[GitHub] ivankelly commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-06 Thread GitBox
ivankelly commented on a change in pull request #1225: Issue #570: getting rid 
of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r172493072
 
 

 ##
 File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 ##
 @@ -391,4 +402,174 @@ public void testGetEntryLogsSet() throws Exception {
 
         assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), logger.getEntryLogsSet());
     }
+
+    static class LedgerStorageWriteTask implements Callable {
+        long ledgerId;
+        int entryId;
+        LedgerStorage ledgerStorage;
+
+        LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) {
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+            this.ledgerStorage = ledgerStorage;
+        }
+
+        @Override
+        public Boolean call() throws IOException {
+            try {
+                ledgerStorage.addEntry(generateEntry(ledgerId, entryId));
+            } catch (IOException e) {
+                LOG.error("Got Exception for AddEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e);
+                throw new IOException("Got Exception for AddEntry call. LedgerId: " + ledgerId + " entryId: " + entryId,
+                        e);
+            }
+            return true;
+        }
+    }
+
+    static class LedgerStorageFlushTask implements Callable {
+        LedgerStorage ledgerStorage;
+
+        LedgerStorageFlushTask(LedgerStorage ledgerStorage) {
+            this.ledgerStorage = ledgerStorage;
+        }
+
+        @Override
+        public Boolean call() throws IOException {
+            try {
+                ledgerStorage.flush();
+            } catch (IOException e) {
+                LOG.error("Got Exception for flush call", e);
+                throw new IOException("Got Exception for Flush call", e);
+            }
+            return true;
+        }
+    }
+
+    static class LedgerStorageReadTask implements Callable {
+        long ledgerId;
+        int entryId;
+        LedgerStorage ledgerStorage;
+
+        LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) {
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+            this.ledgerStorage = ledgerStorage;
+        }
+
+        @Override
+        public Boolean call() throws IOException {
+            try {
+                ByteBuf expectedByteBuf = generateEntry(ledgerId, entryId);
+                ByteBuf actualByteBuf = ledgerStorage.getEntry(ledgerId, entryId);
+                if (!expectedByteBuf.equals(actualByteBuf)) {
+                    LOG.error("Expected Entry: {} Actual Entry: {}", expectedByteBuf.toString(Charset.defaultCharset()),
+                            actualByteBuf.toString(Charset.defaultCharset()));
+                    throw new IOException("Expected Entry: " + expectedByteBuf.toString(Charset.defaultCharset())
+                            + " Actual Entry: " + actualByteBuf.toString(Charset.defaultCharset()));
+                }
+            } catch (IOException e) {
+                LOG.error("Got Exception for GetEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e);
+                throw new IOException("Got Exception for GetEntry call. LedgerId: " + ledgerId + " entryId: " + entryId,
+                        e);
+            }
+            return true;
+        }
+    }
+
+    /**
+     * test concurrent write operations and then concurrent read
+     * operations using InterleavedLedgerStorage.
+     */
+    @Test
+    public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() throws Exception {
+        File ledgerDir = createTempDir("bkTest", ".dir");
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setJournalDirName(ledgerDir.toString());
+        conf.setLedgerDirNames(new String[] { ledgerDir.getAbsolutePath()});
+        conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
+        Bookie bookie = new Bookie(conf);
+        InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) bookie.ledgerStorage);
+        Random rand = new Random();
+
+        int numOfLedgers = 70;
+        int numEntries = 2000;
+        // Create ledgers
+        for (int i = 0; i < numOfLedgers; i++) {
+            ledgerStorage.setMasterKey(i, "key".getBytes());
+        }
+
+        ExecutorService executor = Executors.newFixedThreadPool(10);
+        List writeAndFlushTasks = new ArrayList();
+        for (int j = 0; j < numEntries; j++) {
+            for (int i = 0; i < numOfLedgers; i++) {
+                writeAndFlushTasks.add(new LedgerStorageWriteTask(i, j, ledgerStorage));
+            }
+        }
+
+        /*
+         * add some flush tasks to the list of writetasks list.
+         */
+        for (int i = 0; i < 

[GitHub] ivankelly commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-06 Thread GitBox
ivankelly commented on a change in pull request #1225: Issue #570: getting rid 
of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r172492593
 
 

 ##
 File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 ##
 @@ -391,4 +402,174 @@ public void testGetEntryLogsSet() throws Exception {
 
         assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), logger.getEntryLogsSet());
     }
+
+    static class LedgerStorageWriteTask implements Callable {
+        long ledgerId;
+        int entryId;
+        LedgerStorage ledgerStorage;
+
+        LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) {
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+            this.ledgerStorage = ledgerStorage;
+        }
+
+        @Override
+        public Boolean call() throws IOException {
+            try {
+                ledgerStorage.addEntry(generateEntry(ledgerId, entryId));
+            } catch (IOException e) {
+                LOG.error("Got Exception for AddEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e);
+                throw new IOException("Got Exception for AddEntry call. LedgerId: " + ledgerId + " entryId: " + entryId,
+                        e);
+            }
+            return true;
+        }
+    }
+
+    static class LedgerStorageFlushTask implements Callable {
+        LedgerStorage ledgerStorage;
+
+        LedgerStorageFlushTask(LedgerStorage ledgerStorage) {
+            this.ledgerStorage = ledgerStorage;
+        }
+
+        @Override
+        public Boolean call() throws IOException {
+            try {
+                ledgerStorage.flush();
+            } catch (IOException e) {
+                LOG.error("Got Exception for flush call", e);
+                throw new IOException("Got Exception for Flush call", e);
+            }
+            return true;
+        }
+    }
+
+    static class LedgerStorageReadTask implements Callable {
+        long ledgerId;
+        int entryId;
+        LedgerStorage ledgerStorage;
+
+        LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) {
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+            this.ledgerStorage = ledgerStorage;
+        }
+
+        @Override
+        public Boolean call() throws IOException {
+            try {
+                ByteBuf expectedByteBuf = generateEntry(ledgerId, entryId);
+                ByteBuf actualByteBuf = ledgerStorage.getEntry(ledgerId, entryId);
+                if (!expectedByteBuf.equals(actualByteBuf)) {
+                    LOG.error("Expected Entry: {} Actual Entry: {}", expectedByteBuf.toString(Charset.defaultCharset()),
+                            actualByteBuf.toString(Charset.defaultCharset()));
+                    throw new IOException("Expected Entry: " + expectedByteBuf.toString(Charset.defaultCharset())
+                            + " Actual Entry: " + actualByteBuf.toString(Charset.defaultCharset()));
+                }
+            } catch (IOException e) {
+                LOG.error("Got Exception for GetEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e);
+                throw new IOException("Got Exception for GetEntry call. LedgerId: " + ledgerId + " entryId: " + entryId,
+                        e);
+            }
+            return true;
+        }
+    }
+
+    /**
+     * test concurrent write operations and then concurrent read
+     * operations using InterleavedLedgerStorage.
+     */
+    @Test
+    public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() throws Exception {
+        File ledgerDir = createTempDir("bkTest", ".dir");
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setJournalDirName(ledgerDir.toString());
+        conf.setLedgerDirNames(new String[] { ledgerDir.getAbsolutePath()});
+        conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
+        Bookie bookie = new Bookie(conf);
+        InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) bookie.ledgerStorage);
+        Random rand = new Random();
 
 Review comment:
   But what happens if the test hits a failure because of randomness? I can 
guarantee that 99 times out of 100, no one looks at the failure; they rerun 
until it passes, or just ignore CI. Given that this is how things have always 
been, I'd prefer more determinism in the order things run, and to rely only on 
how the system scheduler schedules threads for randomness.
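
One way to read this suggestion, as a minimal sketch only: place the flush tasks at fixed intervals in the task list instead of at random positions, so the submission order is identical on every run and only thread scheduling varies. The class `DeterministicInterleaving`, the method `buildTasks`, and the string task names are hypothetical stand-ins for illustration; they are not part of the PR.

```java
import java.util.ArrayList;
import java.util.List;

class DeterministicInterleaving {
    // Builds a task list in a fixed order: one "flush" after every
    // flushEvery writes. No Random is involved, so the list is the
    // same on every run.
    static List<String> buildTasks(int numWrites, int flushEvery) {
        List<String> tasks = new ArrayList<>();
        for (int i = 0; i < numWrites; i++) {
            tasks.add("write-" + i);
            if ((i + 1) % flushEvery == 0) {
                tasks.add("flush");
            }
        }
        return tasks;
    }

    public static void main(String[] args) {
        // prints [write-0, write-1, write-2, flush, write-3, write-4, write-5, flush]
        System.out.println(buildTasks(6, 3));
    }
}
```

In the real test the strings would be the write and flush callables; the point is only that the list handed to the executor does not change between runs.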


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] ivankelly commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-06 Thread GitBox
ivankelly commented on a change in pull request #1225: Issue #570: getting rid 
of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r172492072
 
 


[GitHub] ivankelly commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-06 Thread GitBox
ivankelly commented on a change in pull request #1225: Issue #570: getting rid 
of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r172460269
 
 

 ##
 File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 ##
 @@ -391,4 +402,174 @@ public void testGetEntryLogsSet() throws Exception {
 
         assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), logger.getEntryLogsSet());
     }
+
+    static class LedgerStorageWriteTask implements Callable {
+        long ledgerId;
+        int entryId;
+        LedgerStorage ledgerStorage;
+
+        LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) {
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+            this.ledgerStorage = ledgerStorage;
+        }
+
+        @Override
+        public Boolean call() throws IOException {
+            try {
+                ledgerStorage.addEntry(generateEntry(ledgerId, entryId));
+            } catch (IOException e) {
+                LOG.error("Got Exception for AddEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e);
+                throw new IOException("Got Exception for AddEntry call. LedgerId: " + ledgerId + " entryId: " + entryId,
+                        e);
+            }
+            return true;
+        }
+    }
+
+    static class LedgerStorageFlushTask implements Callable {
+        LedgerStorage ledgerStorage;
+
+        LedgerStorageFlushTask(LedgerStorage ledgerStorage) {
+            this.ledgerStorage = ledgerStorage;
+        }
+
+        @Override
+        public Boolean call() throws IOException {
+            try {
+                ledgerStorage.flush();
+            } catch (IOException e) {
+                LOG.error("Got Exception for flush call", e);
+                throw new IOException("Got Exception for Flush call", e);
+            }
+            return true;
+        }
+    }
+
+    static class LedgerStorageReadTask implements Callable {
+        long ledgerId;
+        int entryId;
+        LedgerStorage ledgerStorage;
+
+        LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) {
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+            this.ledgerStorage = ledgerStorage;
+        }
+
+        @Override
+        public Boolean call() throws IOException {
+            try {
+                ByteBuf expectedByteBuf = generateEntry(ledgerId, entryId);
+                ByteBuf actualByteBuf = ledgerStorage.getEntry(ledgerId, entryId);
+                if (!expectedByteBuf.equals(actualByteBuf)) {
+                    LOG.error("Expected Entry: {} Actual Entry: {}", expectedByteBuf.toString(Charset.defaultCharset()),
+                            actualByteBuf.toString(Charset.defaultCharset()));
+                    throw new IOException("Expected Entry: " + expectedByteBuf.toString(Charset.defaultCharset())
+                            + " Actual Entry: " + actualByteBuf.toString(Charset.defaultCharset()));
+                }
+            } catch (IOException e) {
+                LOG.error("Got Exception for GetEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e);
+                throw new IOException("Got Exception for GetEntry call. LedgerId: " + ledgerId + " entryId: " + entryId,
+                        e);
+            }
+            return true;
+        }
+    }
+
+    /**
+     * test concurrent write operations and then concurrent read
+     * operations using InterleavedLedgerStorage.
+     */
+    @Test
+    public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() throws Exception {
+        File ledgerDir = createTempDir("bkTest", ".dir");
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setJournalDirName(ledgerDir.toString());
+        conf.setLedgerDirNames(new String[] { ledgerDir.getAbsolutePath()});
+        conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
+        Bookie bookie = new Bookie(conf);
+        InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) bookie.ledgerStorage);
+        Random rand = new Random();
 
 Review comment:
   Specify a seed, so that the test is more deterministic.
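
A minimal sketch of what a seeded `Random` could look like, under stated assumptions: the class `SeededFlushPositions`, the method `pick`, and the `test.seed` system property are hypothetical names used only for illustration. `java.util.Random` constructed with the same seed yields the same sequence, so printing the seed lets a failing run be replayed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

class SeededFlushPositions {
    // Picks pseudo-random insertion indexes for flush tasks from a fixed seed.
    // The same seed always produces the same list of positions.
    static List<Integer> pick(long seed, int numTasks, int numFlushes) {
        Random rand = new Random(seed);
        List<Integer> positions = new ArrayList<>();
        for (int i = 0; i < numFlushes; i++) {
            positions.add(rand.nextInt(numTasks)); // index in [0, numTasks)
        }
        return positions;
    }

    public static void main(String[] args) {
        // "test.seed" is a hypothetical property name; 12345L is an arbitrary default.
        long seed = Long.getLong("test.seed", 12345L);
        // Log the seed so a failure can be reproduced with the same value.
        System.out.println("seed=" + seed + " positions=" + pick(seed, 140000, 5));
    }
}
```

A seed removes randomness from the test's own choices only; thread scheduling still varies between runs.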




[GitHub] ivankelly commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-06 Thread GitBox
ivankelly commented on a change in pull request #1225: Issue #570: getting rid 
of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r172459536
 
 


[GitHub] ivankelly commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-06 Thread GitBox
ivankelly commented on a change in pull request #1225: Issue #570: getting rid 
of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r172461173
 
 


[GitHub] ivankelly commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-06 Thread GitBox
ivankelly commented on a change in pull request #1225: Issue #570: getting rid 
of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r172460039
 
 


[GitHub] ivankelly commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-05 Thread GitBox
ivankelly commented on a change in pull request #1225: Issue #570: getting rid 
of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r172314429
 
 

 ##
 File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 ##
 @@ -391,4 +400,137 @@ public void testGetEntryLogsSet() throws Exception {
 
         assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), logger.getEntryLogsSet());
     }
+
+    static class LedgerStorageWriteTask implements Callable {
+        long ledgerId;
+        int entryId;
+        LedgerStorage ledgerStorage;
+
+        LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) {
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+            this.ledgerStorage = ledgerStorage;
+        }
+
+        @Override
+        public Boolean call() {
+            try {
+                ledgerStorage.addEntry(generateEntry(ledgerId, entryId));
+            } catch (IOException e) {
+                LOG.error("Got Exception for AddEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e);
+                return false;
+            }
+            return true;
+        }
+    }
+
+    static class LedgerStorageReadTask implements Callable {
+        long ledgerId;
+        int entryId;
+        LedgerStorage ledgerStorage;
+
+        LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) {
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+            this.ledgerStorage = ledgerStorage;
+        }
+
+        @Override
+        public Boolean call() {
+            try {
+                String expectedValue = generateDataString(ledgerId, entryId);
+                ByteBuf byteBuf = ledgerStorage.getEntry(ledgerId, entryId);
+                long actualLedgerId = byteBuf.readLong();
+                long actualEntryId = byteBuf.readLong();
+                byte[] data = new byte[byteBuf.readableBytes()];
+                byteBuf.readBytes(data);
+                if (ledgerId != actualLedgerId) {
+                    LOG.error("For ledgerId: {} entryId: {} readRequest, actual ledgerId: {}", ledgerId, entryId,
+                            actualLedgerId);
+                    return false;
+                }
+                if (entryId != actualEntryId) {
+                    LOG.error("For ledgerId: {} entryId: {} readRequest, actual entryId: {}", ledgerId, entryId,
+                            actualEntryId);
+                    return false;
+                }
+                if (!expectedValue.equals(new String(data))) {
+                    LOG.error("For ledgerId: {} entryId: {} readRequest, actual Data: {}", ledgerId, entryId,
+                            new String(data));
+                    return false;
+                }
+            } catch (IOException e) {
+                LOG.error("Got Exception for GetEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e);
+                return false;
+            }
+            return true;
+        }
+    }
+
+    /**
+     * test concurrent write operations and then concurrent read
+     * operations using InterleavedLedgerStorage.
+     */
+    @Test
+    public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() throws Exception {
 
 Review comment:
   @reddycharan it would be good to have a thread running in a tight loop 
calling flush() on the ledger storage while the adds and reads are happening.
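
The shape of such a flusher thread might look like the sketch below. Assumptions: `Storage` is a hypothetical stand-in interface, not BookKeeper's `LedgerStorage`, and `runWithBackgroundFlush` is an illustrative helper name. The background task calls `flush()` repeatedly until the adds finish, and any flush failure surfaces in the calling thread through `Future.get`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class TightLoopFlusher {
    /** Hypothetical stand-in for the storage under test. */
    interface Storage {
        void add(long entry) throws Exception;
        void flush() throws Exception;
    }

    // Runs numAdds adds on the calling thread while a background thread
    // calls flush() in a tight loop. Returns the number of flushes done.
    static int runWithBackgroundFlush(Storage storage, int numAdds) throws Exception {
        AtomicBoolean done = new AtomicBoolean(false);
        AtomicInteger flushes = new AtomicInteger();
        ExecutorService flusher = Executors.newSingleThreadExecutor();
        Callable<Void> flushLoop = () -> {
            do {
                storage.flush();
                flushes.incrementAndGet();
            } while (!done.get());
            return null;
        };
        Future<Void> flushResult = flusher.submit(flushLoop);
        try {
            for (int i = 0; i < numAdds; i++) {
                storage.add(i);
            }
        } finally {
            done.set(true);     // tell the flusher to stop
            flusher.shutdown(); // let the executor thread exit
        }
        // Rethrows a flush failure as ExecutionException in this thread.
        flushResult.get(10, TimeUnit.SECONDS);
        return flushes.get();
    }
}
```

The do/while form guarantees at least one flush even if the adds finish before the flusher thread is scheduled.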




[GitHub] ivankelly commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-05 Thread GitBox
ivankelly commented on a change in pull request #1225: Issue #570: getting rid 
of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r172307998
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 ##
 @@ -391,4 +400,137 @@ public void testGetEntryLogsSet() throws Exception {
 
 assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), 
logger.getEntryLogsSet());
 }
+
+class LedgerStorageWriteTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
+try {
+ledgerStorage.addEntry(generateEntry(ledgerId, entryId));
+} catch (IOException e) {
+LOG.error("Got Exception for AddEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+return false;
+}
+return true;
+}
+}
+
+class LedgerStorageReadTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
 
 Review comment:
   Because .get() throws checked exceptions, so in the snippet I posted, you 
would have to catch and rethrow as an unchecked exception. 
   
   I was suggesting throwing exceptions in the callable precisely because 
asserts are only meaningful in the main thread (they're java.lang.Error). 
Throwing an exception in the callable will actually be easier to debug, because 
you'll get the stack trace directly on failure, so you can tell what line failed 
without even having to go to the logs. 
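
   A rough sketch of that pattern, not the snippet referred to above: `writeTask` and `await` are hypothetical names, and the failure is simulated with a flag. The callable throws with the ledger and entry IDs in the message, and the main thread unwraps the `ExecutionException` from get() and rethrows it unchecked.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ThrowingCallableSketch {
    /** Hypothetical task: signals failure by throwing instead of returning false. */
    static Callable<Void> writeTask(long ledgerId, long entryId, boolean shouldFail) {
        return () -> {
            if (shouldFail) {
                throw new IOException("addEntry failed. LedgerId: " + ledgerId
                        + " entryId: " + entryId);
            }
            return null;
        };
    }

    /** get() throws checked exceptions, so rethrow the task's failure unchecked. */
    static void await(Future<Void> future) {
        try {
            future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            // The cause is the original exception, with the callable's stack trace.
            throw new RuntimeException(e.getCause());
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            await(executor.submit(writeTask(1L, 0L, false)));
            await(executor.submit(writeTask(1L, 1L, true))); // this one throws
        } catch (RuntimeException e) {
            System.out.println("failed: " + e.getCause().getMessage());
        } finally {
            executor.shutdown();
        }
    }
}
```

   In a JUnit test the rethrown exception would simply fail the test method, with the callable's stack trace attached as the cause.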




[GitHub] ivankelly commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-05 Thread GitBox
ivankelly commented on a change in pull request #1225: Issue #570: getting rid 
of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r172311201
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 ##
 @@ -391,4 +400,137 @@ public void testGetEntryLogsSet() throws Exception {
 
 assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), 
logger.getEntryLogsSet());
 }
+
+static class LedgerStorageWriteTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
+try {
+ledgerStorage.addEntry(generateEntry(ledgerId, entryId));
+} catch (IOException e) {
+LOG.error("Got Exception for AddEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+return false;
+}
+return true;
+}
+}
+
+static class LedgerStorageReadTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
+try {
+String expectedValue = generateDataString(ledgerId, entryId);
+ByteBuf byteBuf = ledgerStorage.getEntry(ledgerId, entryId);
+long actualLedgerId = byteBuf.readLong();
+long actualEntryId = byteBuf.readLong();
+byte[] data = new byte[byteBuf.readableBytes()];
+byteBuf.readBytes(data);
+if (ledgerId != actualLedgerId) {
+LOG.error("For ledgerId: {} entryId: {} readRequest, 
actual ledgerId: {}", ledgerId, entryId,
+actualLedgerId);
+return false;
+}
+if (entryId != actualEntryId) {
+LOG.error("For ledgerId: {} entryId: {} readRequest, 
actual entryId: {}", ledgerId, entryId,
+actualEntryId);
+return false;
+}
+if (!expectedValue.equals(new String(data))) {
+LOG.error("For ledgerId: {} entryId: {} readRequest, 
actual Data: {}", ledgerId, entryId,
+new String(data));
+return false;
+}
+} catch (IOException e) {
+LOG.error("Got Exception for GetEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+return false;
+}
+return true;
+}
+}
+
+/**
+ * test concurrent write operations and then concurrent read
+ * operations using InterleavedLedgerStorage.
+ */
+@Test
+public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() 
throws Exception {
 
 Review comment:
   > whether constructing a full bookie or constructing a ledger storage is 
just a tool to setup a test case for testing EntryLog.
   
   If it's only testing EntryLogger, then only entrylogger should be 
constructed.
   
   But the change has also changed flushing, and removed locking that was 
previously around access to the index, so the whole ledger storage should be 
tested. 
   
   > the whole test suite is using Bookie already. for consistency, it is okay 
to use Bookie here. I don't see a strong reason to block this change just 
because of that.
   
   The rest of the suite is testing badly. We shouldn't propagate bad 
practices. I don't consider consistency a good argument against this.




[GitHub] ivankelly commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-05 Thread GitBox
ivankelly commented on a change in pull request #1225: Issue #570: getting rid 
of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r172144582
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 ##
 @@ -391,4 +400,137 @@ public void testGetEntryLogsSet() throws Exception {
 
 assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), 
logger.getEntryLogsSet());
 }
+
+static class LedgerStorageWriteTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
+try {
+ledgerStorage.addEntry(generateEntry(ledgerId, entryId));
+} catch (IOException e) {
+LOG.error("Got Exception for AddEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+return false;
+}
+return true;
+}
+}
+
+static class LedgerStorageReadTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
+try {
+String expectedValue = generateDataString(ledgerId, entryId);
+ByteBuf byteBuf = ledgerStorage.getEntry(ledgerId, entryId);
+long actualLedgerId = byteBuf.readLong();
+long actualEntryId = byteBuf.readLong();
+byte[] data = new byte[byteBuf.readableBytes()];
+byteBuf.readBytes(data);
+if (ledgerId != actualLedgerId) {
+LOG.error("For ledgerId: {} entryId: {} readRequest, 
actual ledgerId: {}", ledgerId, entryId,
+actualLedgerId);
+return false;
+}
+if (entryId != actualEntryId) {
+LOG.error("For ledgerId: {} entryId: {} readRequest, 
actual entryId: {}", ledgerId, entryId,
+actualEntryId);
+return false;
+}
+if (!expectedValue.equals(new String(data))) {
+LOG.error("For ledgerId: {} entryId: {} readRequest, 
actual Data: {}", ledgerId, entryId,
+new String(data));
+return false;
+}
+} catch (IOException e) {
+LOG.error("Got Exception for GetEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+return false;
+}
+return true;
+}
+}
+
+/**
+ * test concurrent write operations and then concurrent read
+ * operations using InterleavedLedgerStorage.
+ */
+@Test
+public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() 
throws Exception {
 
 Review comment:
   > I don't understand why you need a sorted ledger storage test here. the 
change is in entry log itself.
   
   This change is in InterleavedLedgerStorage, which SortedLedgerStorage 
derives from. It removes synchronization from processEntry, which 
SortedLedgerStorage calls directly. There seems to be no other synchronization 
in SortedLedgerStorage though, so it should be ok.
   
   > the test should focus on testing what it is changing, 
   
   which is _exactly_ why we shouldn't be constructing a full bookie to test it.




[GitHub] ivankelly commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-05 Thread GitBox
ivankelly commented on a change in pull request #1225: Issue #570: getting rid 
of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r172130878
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 ##
 @@ -391,4 +400,137 @@ public void testGetEntryLogsSet() throws Exception {
 
 assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), 
logger.getEntryLogsSet());
 }
+
+static class LedgerStorageWriteTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
+try {
+ledgerStorage.addEntry(generateEntry(ledgerId, entryId));
+} catch (IOException e) {
+LOG.error("Got Exception for AddEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+return false;
+}
+return true;
+}
+}
+
+static class LedgerStorageReadTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
+try {
+String expectedValue = generateDataString(ledgerId, entryId);
+ByteBuf byteBuf = ledgerStorage.getEntry(ledgerId, entryId);
+long actualLedgerId = byteBuf.readLong();
+long actualEntryId = byteBuf.readLong();
+byte[] data = new byte[byteBuf.readableBytes()];
+byteBuf.readBytes(data);
+if (ledgerId != actualLedgerId) {
+LOG.error("For ledgerId: {} entryId: {} readRequest, 
actual ledgerId: {}", ledgerId, entryId,
+actualLedgerId);
+return false;
+}
+if (entryId != actualEntryId) {
+LOG.error("For ledgerId: {} entryId: {} readRequest, 
actual entryId: {}", ledgerId, entryId,
+actualEntryId);
+return false;
+}
+if (!expectedValue.equals(new String(data))) {
+LOG.error("For ledgerId: {} entryId: {} readRequest, 
actual Data: {}", ledgerId, entryId,
+new String(data));
+return false;
+}
+} catch (IOException e) {
+LOG.error("Got Exception for GetEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+return false;
+}
+return true;
+}
+}
+
+/**
+ * test concurrent write operations and then concurrent read
+ * operations using InterleavedLedgerStorage.
+ */
+@Test
+public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() 
throws Exception {
 
 Review comment:
   This change affects SortedLedgerStorage too. Add a variant of the test that 
tests against that.
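
   One way to get such a variant without duplicating the test body, sketched with hypothetical types only: `Storage`, `BaseStorage` and `DerivedStorage` are stand-ins for the storage interface, the interleaved storage and the sorted storage that derives from it, and are not the BookKeeper classes. The same write-then-read check runs once per implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

class StorageVariantsSketch {
    /** Hypothetical stand-in for the storage API; not the BookKeeper interface. */
    interface Storage {
        void addEntry(long ledgerId, long entryId, String data);
        String getEntry(long ledgerId, long entryId);
    }

    /** Plays the role of the base storage class. */
    static class BaseStorage implements Storage {
        private final Map<String, String> entries = new ConcurrentHashMap<>();

        public void addEntry(long ledgerId, long entryId, String data) {
            entries.put(ledgerId + ":" + entryId, data);
        }

        public String getEntry(long ledgerId, long entryId) {
            return entries.get(ledgerId + ":" + entryId);
        }
    }

    /** Plays the role of the subclass that inherits the changed code path. */
    static class DerivedStorage extends BaseStorage {
    }

    /** One test body reused for every implementation; returns the entries verified. */
    static int writeThenRead(Storage storage, int numLedgers, int entriesPerLedger) {
        for (long l = 0; l < numLedgers; l++) {
            for (long e = 0; e < entriesPerLedger; e++) {
                storage.addEntry(l, e, "ledger-" + l + "-" + e);
            }
        }
        int verified = 0;
        for (long l = 0; l < numLedgers; l++) {
            for (long e = 0; e < entriesPerLedger; e++) {
                String expected = "ledger-" + l + "-" + e;
                String actual = storage.getEntry(l, e);
                if (!expected.equals(actual)) {
                    throw new IllegalStateException("LedgerId: " + l + " entryId: " + e
                            + " actual data: " + actual);
                }
                verified++;
            }
        }
        return verified;
    }

    public static void main(String[] args) {
        List<Supplier<Storage>> variants = new ArrayList<>();
        variants.add(BaseStorage::new);
        variants.add(DerivedStorage::new);
        for (Supplier<Storage> variant : variants) {
            Storage storage = variant.get();
            System.out.println(storage.getClass().getSimpleName() + ": "
                    + writeThenRead(storage, 3, 10) + " entries verified");
        }
    }
}
```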




[GitHub] ivankelly commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-05 Thread GitBox
ivankelly commented on a change in pull request #1225: Issue #570: getting rid 
of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r172125347
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 ##
 @@ -391,4 +400,137 @@ public void testGetEntryLogsSet() throws Exception {
 
 assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), 
logger.getEntryLogsSet());
 }
+
+class LedgerStorageWriteTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
+try {
+ledgerStorage.addEntry(generateEntry(ledgerId, entryId));
+} catch (IOException e) {
+LOG.error("Got Exception for AddEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+return false;
+}
+return true;
+}
+}
+
+class LedgerStorageReadTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
 
 Review comment:
   It was for diagnosing the errors that I suggested putting the ledgerid and 
entryid in the message of the thrown exception. You're right about join though. 
Using get would make things messy, so possibly not worth it.




[GitHub] ivankelly commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-05 Thread GitBox
ivankelly commented on a change in pull request #1225: Issue #570: getting rid 
of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r172126626
 
 

 ##
 File path: 
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 ##
 @@ -391,4 +400,137 @@ public void testGetEntryLogsSet() throws Exception {
 
 assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), 
logger.getEntryLogsSet());
 }
+
+class LedgerStorageWriteTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
+try {
+ledgerStorage.addEntry(generateEntry(ledgerId, entryId));
+} catch (IOException e) {
+LOG.error("Got Exception for AddEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+return false;
+}
+return true;
+}
+}
+
+class LedgerStorageReadTask implements Callable {
+long ledgerId;
+int entryId;
+LedgerStorage ledgerStorage;
+
+LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+this.ledgerId = ledgerId;
+this.entryId = entryId;
+this.ledgerStorage = ledgerStorage;
+}
+
+@Override
+public Boolean call() {
+try {
+String expectedValue = generateDataString(ledgerId, entryId);
+ByteBuf byteBuf = ledgerStorage.getEntry(ledgerId, entryId);
+long actualLedgerId = byteBuf.readLong();
+long actualEntryId = byteBuf.readLong();
+byte[] data = new byte[byteBuf.readableBytes()];
+byteBuf.readBytes(data);
+if (ledgerId != actualLedgerId) {
+LOG.error("For ledgerId: {} entryId: {} readRequest, 
actual ledgerId: {}", ledgerId, entryId,
+actualLedgerId);
+return false;
+}
+if (entryId != actualEntryId) {
+LOG.error("For ledgerId: {} entryId: {} readRequest, 
actual entryId: {}", ledgerId, entryId,
+actualEntryId);
+return false;
+}
+if (!expectedValue.equals(new String(data))) {
+LOG.error("For ledgerId: {} entryId: {} readRequest, 
actual Data: {}", ledgerId, entryId,
+new String(data));
+return false;
+}
+} catch (IOException e) {
+LOG.error("Got Exception for GetEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+return false;
+}
+return true;
+}
+}
+
+/**
+ * test concurrent write operations and then concurrent read
+ * operations using InterleavedLedgerStorage.
+ */
+@Test
+public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() 
throws Exception {
+File ledgerDir = createTempDir("bkTest", ".dir");
+ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+conf.setJournalDirName(ledgerDir.toString());
+conf.setLedgerDirNames(new String[] { ledgerDir.getAbsolutePath()});
+conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
+Bookie bookie = new Bookie(conf);
 
 Review comment:
   I've seen this pattern a few times where we construct a whole bookie, and 
everything that goes with it, just to get the ledger dirs manager or ledger 
storage. It makes our tests super heavy, and makes it hard to inject mocked 
behaviour. This is part of the reason we have Thread.sleep all over the place 
in the tests.
   
   Anyhow, I won't block for this, but it's something we should be aware of and 
try to avoid in future. If you're concerned about long argument lists, we can 
create utility methods to hide the complexity.




[GitHub] ivankelly commented on a change in pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

2018-03-05 Thread GitBox
ivankelly commented on a change in pull request #1225: Issue #570: getting rid 
of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225#discussion_r172130455
 
 

 ##
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
 ##
 @@ -360,10 +362,10 @@ public void checkpoint(Checkpoint checkpoint) throws 
IOException {
 
 @Override
 public synchronized void flush() throws IOException {
-if (!somethingWritten) {
+if (!somethingWritten.get()) {
 
 Review comment:
   With the compareAndSet, the only thing protected here is the 
flushOrCheckpoint(), which is called unprotected from checkpoint() above, so I 
think it would be safe to drop the synchronized. 
   
   flush() only seems to be called from the sync thread, which means it's 
effectively synchronized anyhow since it's all on a single thread, and the 
synchronized here was only to protect the somethingWritten flag. 
   
   It's no harm to leave it in, it just seems weird to be mixing atomics and 
synchronization without good reason.
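
   To illustrate the point about the flag, a minimal sketch assuming the flag is an `AtomicBoolean` that flush() tests and clears in one step. The class and method names (`FlushFlagSketch`, `onWrite`, `flushes`) are hypothetical, and the counter stands in for the call to flushOrCheckpoint(); the actual code in the PR may differ.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class FlushFlagSketch {
    private final AtomicBoolean somethingWritten = new AtomicBoolean(false);
    // Only touched by the single flushing thread in this sketch.
    private int flushes = 0;

    /** Called by writers after an entry has been added. */
    void onWrite() {
        somethingWritten.set(true);
    }

    /** Returns true if a flush was actually performed. */
    boolean flush() {
        // Test and clear the flag atomically. A write that races with this call
        // sets the flag again, so the next flush() picks it up.
        if (!somethingWritten.compareAndSet(true, false)) {
            return false;
        }
        flushes++; // stand-in for the real flush work
        return true;
    }

    int flushes() {
        return flushes;
    }

    public static void main(String[] args) {
        FlushFlagSketch storage = new FlushFlagSketch();
        System.out.println("flush before any write: " + storage.flush());
        storage.onWrite();
        System.out.println("flush after a write: " + storage.flush());
        System.out.println("second flush, no new write: " + storage.flush());
    }
}
```

   Because the flag no longer needs a lock, the `synchronized` keyword on the method protects nothing else in this sketch.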

