abdullah alamoudi has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/805
Change subject: Update the Key Value Reader ...................................................................... Update the Key Value Reader Change-Id: I54f8a313c871428b2468b74c0760e8d28e810699 --- M asterixdb/asterix-external-data/pom.xml M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java 4 files changed, 24 insertions(+), 35 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/05/805/1 diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml index 8c59cc4..3a47ba6 100644 --- a/asterixdb/asterix-external-data/pom.xml +++ b/asterixdb/asterix-external-data/pom.xml @@ -287,7 +287,7 @@ <dependency> <groupId>com.couchbase.client</groupId> <artifactId>core-io</artifactId> - <version>1.2.3</version> + <version>1.3.0-SNAPSHOT</version> </dependency> <dependency> <groupId>io.reactivex</groupId> diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java index 185aea0..3f5b531 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java @@ -30,10 +30,7 @@ import org.apache.log4j.Logger; import com.couchbase.client.core.CouchbaseCore; -import com.couchbase.client.core.dcp.BucketStreamAggregator; -import com.couchbase.client.core.dcp.BucketStreamAggregatorState; -import com.couchbase.client.core.dcp.BucketStreamState; -import com.couchbase.client.core.dcp.BucketStreamStateUpdatedEvent; +import com.couchbase.client.core.endpoint.dcp.DCPConnection; import com.couchbase.client.core.env.DefaultCoreEnvironment; import com.couchbase.client.core.env.DefaultCoreEnvironment.Builder; import com.couchbase.client.core.message.cluster.CloseBucketRequest; @@ -41,27 +38,30 @@ import com.couchbase.client.core.message.cluster.SeedNodesRequest; import com.couchbase.client.core.message.dcp.DCPRequest; import com.couchbase.client.core.message.dcp.MutationMessage; +import com.couchbase.client.core.message.dcp.OpenConnectionRequest; +import com.couchbase.client.core.message.dcp.OpenConnectionResponse; import com.couchbase.client.core.message.dcp.RemoveMessage; import com.couchbase.client.core.message.dcp.SnapshotMarkerMessage; +import rx.Observable; import rx.functions.Action1; public class KVReader implements IRecordReader<DCPRequest> { private static final Logger LOGGER = Logger.getLogger(KVReader.class); - private static final MutationMessage POISON_PILL = new MutationMessage((short) 0, null, null, 0, 0L, 0L, 0, 0, 0L, - null); + private static final MutationMessage POISON_PILL = new MutationMessage(0, (short) 0, null, null, 0, 0L, 0L, 0, 0, + 0L, null); private final String feedName; private final short[] vbuckets; private final String bucket; private final String password; private final String[] sourceNodes; private final Builder builder; - private final BucketStreamAggregator bucketStreamAggregator; private final CouchbaseCore core; private final DefaultCoreEnvironment env; private final GenericRecord<DCPRequest> record; private final ArrayBlockingQueue<DCPRequest> messages; + private final DCPConnection connection; private AbstractFeedDataFlowController controller; private Thread pushThread; private boolean done = false; @@ -78,7 +78,8 @@ .autoreleaseAfter(KVReaderFactory.AUTO_RELEASE_AFTER_MILLISECONDS); this.env = builder.build(); this.core = new CouchbaseCore(env); - this.bucketStreamAggregator = new BucketStreamAggregator(feedName, core, bucket); + connection = core.<OpenConnectionResponse> send(new OpenConnectionRequest(feedName, bucket)).toBlocking() + .single().connection(); this.record = new GenericRecord<>(); connect(); } @@ -98,36 +99,24 @@ this.pushThread = new Thread(new Runnable() { @Override public void run() { - KVReader.this.run(bucketStreamAggregator); + KVReader.this.run(connection); } }, feedName); pushThread.start(); } - private void run(BucketStreamAggregator bucketStreamAggregator) { - BucketStreamAggregatorState state = new BucketStreamAggregatorState(); - for (int i = 0; i < vbuckets.length; i++) { - state.put(new BucketStreamState(vbuckets[i], 0, 0, 0xffffffff, 0, 0xffffffff)); - } - state.updates().subscribe(new Action1<BucketStreamStateUpdatedEvent>() { - @Override - public void call(BucketStreamStateUpdatedEvent event) { - if (event.partialUpdate()) { - } else { - } - } - }); + private void run(DCPConnection connection) { try { - bucketStreamAggregator.feed(state).toBlocking().forEach(new Action1<DCPRequest>() { + for (int i = 0; i < vbuckets.length; i++) { + connection.addStream(vbuckets[i]).toBlocking().single(); + } + connection.subject().takeUntil(Observable.never()).toBlocking().forEach(new Action1<DCPRequest>() { @Override public void call(DCPRequest dcpRequest) { try { if (dcpRequest instanceof SnapshotMarkerMessage) { SnapshotMarkerMessage message = (SnapshotMarkerMessage) dcpRequest; - BucketStreamState oldState = state.get(message.partition()); - state.put(new BucketStreamState(message.partition(), oldState.vbucketUUID(), - message.endSequenceNumber(), oldState.endSequenceNumber(), - message.endSequenceNumber(), oldState.snapshotEndSequenceNumber())); + LOGGER.info("snapshot DCP message received: " + message); } else if ((dcpRequest instanceof MutationMessage) || (dcpRequest instanceof RemoveMessage)) { messages.put(dcpRequest); } else { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java index bc2a980..6c85b20 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java @@ -138,8 +138,8 @@ for (int i = 0; i < vbuckets.length; i++) { vbuckets[i] = listOfAssignedVBuckets.get(i); } - return new KVReader(feedName + ":" + nodeName + ":" + partition, bucket, password, couchbaseNodes, - vbuckets, ExternalDataUtils.getQueueSize(configuration)); + return new KVReader(feedName + ":" + nodeName + ":" + partition, bucket, password, couchbaseNodes, vbuckets, + ExternalDataUtils.getQueueSize(configuration)); } @Override diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java index b75f26c..43132b6 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java @@ -132,7 +132,7 @@ if (nextDeleteKey != null) { final String key = nextDeleteKey; nextDeleteKey = null; - return new RemoveMessage(nextDeletePartition, key, cas++, seq++, 0L, bucket); + return new RemoveMessage(0, nextDeletePartition, key, cas++, seq++, 0L, bucket); } } generateNextDocument(); @@ -141,12 +141,12 @@ final String key = nextUpsertKey; nextUpsertKey = null; upsertCounter++; - return new MutationMessage(nextUpsertPartition, key, byteBuff, expiration++, seq++, 0, 0, lockTime++, - cas++, bucket); + return new MutationMessage(byteBuff.readableBytes(), nextUpsertPartition, key, byteBuff, expiration++, + seq++, 0, 0, lockTime++, cas++, bucket); } } - return new MutationMessage(assigned.get(counter % assigned.size()), generateKey(), byteBuff, expiration++, - seq++, 0, 0, lockTime++, cas++, bucket); + return new MutationMessage(byteBuff.readableBytes(), assigned.get(counter % assigned.size()), generateKey(), + byteBuff, expiration++, seq++, 0, 0, lockTime++, cas++, bucket); } private void generateNextDocument() { -- To view, visit https://asterix-gerrit.ics.uci.edu/805 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I54f8a313c871428b2468b74c0760e8d28e810699 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>