KYLIN-2627 add simple rollback on ResourceStore
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ea5cabac Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ea5cabac Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ea5cabac Branch: refs/heads/master Commit: ea5cabac95b3d27aa60d6b16263904142e3d2daa Parents: c9dc7cc Author: Li Yang <liy...@apache.org> Authored: Wed May 17 17:46:08 2017 +0800 Committer: Roger Shi <rogershijich...@gmail.com> Committed: Wed May 17 19:40:34 2017 +0800 ---------------------------------------------------------------------- .../kylin/common/persistence/ResourceStore.java | 103 ++++++++++++++++++- .../persistence/LocalFileResourceStoreTest.java | 44 ++++++++ 2 files changed, 145 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/ea5cabac/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java index d5fbc2e..0565c66 100644 --- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java +++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java @@ -18,7 +18,9 @@ package org.apache.kylin.common.persistence; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.Closeable; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; @@ -27,6 +29,7 @@ import java.io.InputStream; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; import java.util.NavigableSet; import java.util.UUID; @@ -237,6 +240,11 @@ abstract public class ResourceStore { final public void putResource(String resPath, InputStream content, long ts) throws IOException { resPath = norm(resPath); logger.trace("Directly saving resource " + resPath + " (Store " + kylinConfig.getMetadataUrl() + ")"); + putResourceCheckpoint(resPath, content, ts); + } + + private void putResourceCheckpoint(String resPath, InputStream content, long ts) throws IOException { + beforeChange(resPath); putResourceImpl(resPath, content, ts); } @@ -266,7 +274,7 @@ abstract public class ResourceStore { dout.close(); buf.close(); - newTS = checkAndPutResourceImpl(resPath, buf.toByteArray(), oldTS, newTS); + newTS = checkAndPutResourceCheckpoint(resPath, buf.toByteArray(), oldTS, newTS); obj.setLastModified(newTS); // update again the confirmed TS return newTS; } catch (IOException e) { @@ -278,6 +286,11 @@ abstract public class ResourceStore { } } + private long checkAndPutResourceCheckpoint(String resPath, byte[] content, long oldTS, long newTS) throws IOException { + beforeChange(resPath); + return checkAndPutResourceImpl(resPath, content, oldTS, newTS); + } + /** * checks old timestamp when overwriting existing */ @@ -288,7 +301,12 @@ abstract public class ResourceStore { */ final public void deleteResource(String resPath) throws IOException { logger.trace("Deleting resource " + resPath + " (Store " + kylinConfig.getMetadataUrl() + ")"); - deleteResourceImpl(norm(resPath)); + deleteResourceCheckpoint(norm(resPath)); + } + + private void deleteResourceCheckpoint(String resPath) throws IOException { + beforeChange(resPath); + deleteResourceImpl(resPath); } abstract protected void deleteResourceImpl(String resPath) throws IOException; @@ -315,6 +333,87 @@ abstract public class ResourceStore { // ============================================================================ + ThreadLocal<Checkpoint> checkpointing = new ThreadLocal<>(); + + public Checkpoint checkpoint() { + Checkpoint cp = checkpointing.get(); + if (cp != null) + throw new IllegalStateException("A checkpoint has been open for this thread: " + cp); + + cp = new Checkpoint(); + checkpointing.set(cp); + return cp; + } + + private void beforeChange(String resPath) throws IOException { + Checkpoint cp = checkpointing.get(); + if (cp != null) + cp.beforeChange(resPath); + } + + public class Checkpoint implements Closeable { + + LinkedHashMap<String, byte[]> origResData = new LinkedHashMap<>(); + LinkedHashMap<String, Long> origResTimestamp = new LinkedHashMap<>(); + + private void beforeChange(String resPath) throws IOException { + if (origResData.containsKey(resPath)) + return; + + RawResource raw = getResourceImpl(resPath); + if (raw == null) { + origResData.put(resPath, null); + origResTimestamp.put(resPath, null); + } else { + origResData.put(resPath, readAll(raw.inputStream)); + origResTimestamp.put(resPath, raw.timestamp); + } + } + + private byte[] readAll(InputStream inputStream) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + IOUtils.copy(inputStream, out); + inputStream.close(); + out.close(); + return out.toByteArray(); + } + + public void rollback() { + checkThread(); + + for (String resPath : origResData.keySet()) { + logger.debug("Rollbacking " + resPath); + try { + byte[] data = origResData.get(resPath); + Long ts = origResTimestamp.get(resPath); + if (data == null || ts == null) + deleteResourceImpl(resPath); + else + putResourceImpl(resPath, new ByteArrayInputStream(data), ts); + } catch (IOException ex) { + logger.error("Failed to rollback " + resPath, ex); + } + } + } + + @Override + public void close() throws IOException { + checkThread(); + + origResData = null; + origResTimestamp = null; + checkpointing.set(null); + } + + private void checkThread() { + Checkpoint cp = checkpointing.get(); + if (this != cp) + throw new IllegalStateException(); + } + } + + // ============================================================================ + public static interface Visitor { void visit(String path) throws IOException; } http://git-wip-us.apache.org/repos/asf/kylin/blob/ea5cabac/core-common/src/test/java/org/apache/kylin/common/persistence/LocalFileResourceStoreTest.java ---------------------------------------------------------------------- diff --git a/core-common/src/test/java/org/apache/kylin/common/persistence/LocalFileResourceStoreTest.java b/core-common/src/test/java/org/apache/kylin/common/persistence/LocalFileResourceStoreTest.java index 17b608d..aca4a0a 100644 --- a/core-common/src/test/java/org/apache/kylin/common/persistence/LocalFileResourceStoreTest.java +++ b/core-common/src/test/java/org/apache/kylin/common/persistence/LocalFileResourceStoreTest.java @@ -18,7 +18,12 @@ package org.apache.kylin.common.persistence; +import static org.junit.Assert.assertEquals; + +import java.io.ByteArrayInputStream; + import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.ResourceStore.Checkpoint; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.junit.After; import org.junit.Before; @@ -41,4 +46,43 @@ public class LocalFileResourceStoreTest extends LocalFileMetadataTestCase { ResourceStoreTest.testAStore("", KylinConfig.getInstanceFromEnv()); } + @Test + public void testRollback() throws Exception { + ResourceStore store = ResourceStore.getStore(KylinConfig.getInstanceFromEnv()); + byte[] bytes = new byte[] { 0, 1, 2 }; + RawResource raw; + Checkpoint cp; + + cp = store.checkpoint(); + try { + store.putResource("/res1", new StringEntity("data1"), 1000, StringEntity.serializer); + } finally { + cp.close(); + } + StringEntity str = store.getResource("/res1", StringEntity.class, StringEntity.serializer); + assertEquals("data1", str.toString()); + + cp = store.checkpoint(); + try { + ByteArrayInputStream is = new ByteArrayInputStream(bytes); + store.putResource("/res2", is, 2000); + is.close(); + + store.putResource("/res1", str, 2000, StringEntity.serializer); + store.deleteResource("/res1"); + + assertEquals(null, store.getResource("/res1")); + assertEquals(2000, (raw = store.getResource("/res2")).timestamp); + raw.inputStream.close(); + + cp.rollback(); + + assertEquals(null, store.getResource("/res2")); + assertEquals(1000, (raw = store.getResource("/res1")).timestamp); + raw.inputStream.close(); + } finally { + cp.close(); + } + } + }