Author: alexparvulescu Date: Tue Jul 1 10:17:22 2014 New Revision: 1607031
URL: http://svn.apache.org/r1607031 Log: OAK-1915 TarMK failover 2.0 - added checksum verification to the segment transfer Modified: jackrabbit/oak/trunk/oak-tarmk-failover/README.md jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentDecoder.java jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentEncoder.java Modified: jackrabbit/oak/trunk/oak-tarmk-failover/README.md URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/README.md?rev=1607031&r1=1607030&r2=1607031&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/README.md (original) +++ jackrabbit/oak/trunk/oak-tarmk-failover/README.md Tue Jul 1 10:17:22 2014 @@ -7,8 +7,8 @@ Failover The component should be installed when failover support is needed. The setup is expected to be: one master to one/many slaves nodes. -The slave will periodically poll the master for the head state over http -on a custom port, if it changed, it should pull in all the new segments. +The slave will periodically poll the master for the head state, if this +changed, it will pull in all the new segments since the last sync. Setup in OSGi ------------- @@ -24,7 +24,7 @@ Master host represents the master host i Interval represents how often the sync thread should run, in seconds. See examples in the osgi-conf folder for each run mode. To install a new OSGI config in the sling launcher, -you only need to create a new folder called 'install' in the sling.home folder and copy the configs there. +you only need to create a new folder called 'install' in the sling.home folder and copy the specific config there. TODO ---- @@ -32,7 +32,6 @@ TODO - timeout handling doesn't cover everything on both server and slave - error handling on the slave still has some issues (the slave hangs) - maybe enable compression of the segments over the wire - - maybe add a checksum to the segment encoder/decoder to verify the integrity of the transfer - slave runmode could possibly be a read-only mode (no writes permitted) License Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java?rev=1607031&r1=1607030&r2=1607031&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java (original) +++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java Tue Jul 1 10:17:22 2014 @@ -80,12 +80,6 @@ public final class FailoverClient implem @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); - - // p.addLast(new LoggingHandler(LogLevel.INFO)); - // Enable stream compression - // p.addLast(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP)); - // p.addLast(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP)); - // WriteTimeoutHandler & ReadTimeoutHandler p.addLast("readTimeoutHandler", new ReadTimeoutHandler( readTimeoutMs, TimeUnit.MILLISECONDS)); Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentDecoder.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentDecoder.java?rev=1607031&r1=1607030&r2=1607031&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentDecoder.java (original) +++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentDecoder.java Tue Jul 1 10:17:22 2014 @@ -23,12 +23,18 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import java.nio.ByteBuffer; +import java.util.UUID; + import org.apache.jackrabbit.oak.plugins.segment.Segment; import org.apache.jackrabbit.oak.plugins.segment.SegmentId; import org.apache.jackrabbit.oak.plugins.segment.SegmentStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; + public class SegmentDecoder extends LengthFieldBasedFrameDecoder { private static final Logger log = LoggerFactory @@ -37,7 +43,7 @@ public class SegmentDecoder extends Leng private final SegmentStore store; public SegmentDecoder(SegmentStore store) { - super(Segment.MAX_SEGMENT_SIZE + 21, 0, 4, 0, 4); + super(Segment.MAX_SEGMENT_SIZE + 21, 0, 4, 0, 0); this.store = store; } @@ -48,14 +54,26 @@ public class SegmentDecoder extends Leng if (frame == null) { return null; } + int len = frame.readInt(); byte type = frame.readByte(); long msb = frame.readLong(); long lsb = frame.readLong(); - frame.discardReadBytes(); - SegmentId id = new SegmentId(store.getTracker(), msb, lsb); - Segment s = new Segment(store.getTracker(), id, frame.nioBuffer()); - log.debug("received type {} with id {} and size {}", type, id, s.size()); - return s; + long hash = frame.readLong(); + byte[] segment = new byte[len - 25]; + frame.getBytes(29, segment); + Hasher hasher = Hashing.murmur3_32().newHasher(); + long check = hasher.putBytes(segment).hash().padToLong(); + if (hash == check) { + SegmentId id = new SegmentId(store.getTracker(), msb, lsb); + Segment s = new Segment(store.getTracker(), id, + ByteBuffer.wrap(segment)); + log.debug("received type {} with id {} and size {}", type, id, + s.size()); + return s; + } + log.debug("received corrupted segment {}, ignoring", new UUID(msb, lsb)); + return null; + } @Override Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentEncoder.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentEncoder.java?rev=1607031&r1=1607030&r2=1607031&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentEncoder.java (original) +++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentEncoder.java Tue Jul 1 10:17:22 2014 @@ -20,27 +20,36 @@ package org.apache.jackrabbit.oak.plugins.segment.failover.codec; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufOutputStream; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; +import java.io.ByteArrayOutputStream; + import org.apache.jackrabbit.oak.plugins.segment.Segment; import org.apache.jackrabbit.oak.plugins.segment.SegmentId; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; + public class SegmentEncoder extends MessageToByteEncoder<Segment> { @Override protected void encode(ChannelHandlerContext ctx, Segment s, ByteBuf out) throws Exception { SegmentId id = s.getSegmentId(); - int len = s.size() + 17; + ByteArrayOutputStream baos = new ByteArrayOutputStream(s.size()); + s.writeTo(baos); + byte[] segment = baos.toByteArray(); + + Hasher hasher = Hashing.murmur3_32().newHasher(); + long hash = hasher.putBytes(segment).hash().padToLong(); + + int len = segment.length + 25; out.writeInt(len); out.writeByte(Messages.HEADER_SEGMENT); out.writeLong(id.getMostSignificantBits()); out.writeLong(id.getLeastSignificantBits()); - ByteBufOutputStream bout = new ByteBufOutputStream(out); - s.writeTo(bout); - bout.flush(); - bout.close(); + out.writeLong(hash); + out.writeBytes(segment); } }
