abdullah alamoudi has submitted this change and it was merged.

Change subject: Update the Key Value Reader
......................................................................
Update the Key Value Reader Change-Id: I54f8a313c871428b2468b74c0760e8d28e810699 Reviewed-on: https://asterix-gerrit.ics.uci.edu/805 Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Till Westmann <ti...@apache.org> --- M asterixdb/asterix-external-data/pom.xml M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPRequestToRecordWithMetadataAndPKConverter.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java 5 files changed, 70 insertions(+), 75 deletions(-) Approvals: Till Westmann: Looks good to me, approved Jenkins: Verified diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml index 8c59cc4..f7ac6a9 100644 --- a/asterixdb/asterix-external-data/pom.xml +++ b/asterixdb/asterix-external-data/pom.xml @@ -287,12 +287,7 @@ <dependency> <groupId>com.couchbase.client</groupId> <artifactId>core-io</artifactId> - <version>1.2.3</version> - </dependency> - <dependency> - <groupId>io.reactivex</groupId> - <artifactId>rxjava</artifactId> - <version>1.0.15</version> + <version>1.2.7</version> </dependency> </dependencies> </project> diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPRequestToRecordWithMetadataAndPKConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPRequestToRecordWithMetadataAndPKConverter.java index 379bbf2..724699c 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPRequestToRecordWithMetadataAndPKConverter.java +++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPRequestToRecordWithMetadataAndPKConverter.java @@ -37,6 +37,7 @@ import com.couchbase.client.core.message.dcp.MutationMessage; import com.couchbase.client.core.message.dcp.RemoveMessage; import com.couchbase.client.deps.io.netty.buffer.ByteBuf; +import com.couchbase.client.deps.io.netty.util.ReferenceCountUtil; public class DCPRequestToRecordWithMetadataAndPKConverter implements IRecordToRecordWithMetadataAndPKConverter<DCPRequest, char[]> { @@ -88,6 +89,7 @@ recordWithMetadata.setMetadata(8, revSeqNumber); recordWithMetadata.setMetadata(9, lockTime); DCPRequestToRecordWithMetadataAndPKConverter.set(message.content(), decoder, bytes, chars, value); + ReferenceCountUtil.release(message.content()); } else if (dcpRequest instanceof RemoveMessage) { final RemoveMessage message = (RemoveMessage) dcpRequest; final String key = message.key(); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java index 185aea0..41bcc46 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java @@ -30,36 +30,30 @@ import org.apache.log4j.Logger; import com.couchbase.client.core.CouchbaseCore; -import com.couchbase.client.core.dcp.BucketStreamAggregator; -import com.couchbase.client.core.dcp.BucketStreamAggregatorState; -import com.couchbase.client.core.dcp.BucketStreamState; -import com.couchbase.client.core.dcp.BucketStreamStateUpdatedEvent; -import com.couchbase.client.core.env.DefaultCoreEnvironment; -import com.couchbase.client.core.env.DefaultCoreEnvironment.Builder; +import com.couchbase.client.core.endpoint.dcp.DCPConnection; 
import com.couchbase.client.core.message.cluster.CloseBucketRequest; import com.couchbase.client.core.message.cluster.OpenBucketRequest; import com.couchbase.client.core.message.cluster.SeedNodesRequest; import com.couchbase.client.core.message.dcp.DCPRequest; import com.couchbase.client.core.message.dcp.MutationMessage; +import com.couchbase.client.core.message.dcp.OpenConnectionRequest; +import com.couchbase.client.core.message.dcp.OpenConnectionResponse; import com.couchbase.client.core.message.dcp.RemoveMessage; import com.couchbase.client.core.message.dcp.SnapshotMarkerMessage; -import rx.functions.Action1; +import rx.Subscriber; public class KVReader implements IRecordReader<DCPRequest> { private static final Logger LOGGER = Logger.getLogger(KVReader.class); - private static final MutationMessage POISON_PILL = new MutationMessage((short) 0, null, null, 0, 0L, 0L, 0, 0, 0L, - null); + private static final MutationMessage POISON_PILL = new MutationMessage(0, (short) 0, null, null, 0, 0L, 0L, 0, 0, + 0L, null); private final String feedName; private final short[] vbuckets; private final String bucket; private final String password; private final String[] sourceNodes; - private final Builder builder; - private final BucketStreamAggregator bucketStreamAggregator; private final CouchbaseCore core; - private final DefaultCoreEnvironment env; private final GenericRecord<DCPRequest> record; private final ArrayBlockingQueue<DCPRequest> messages; private AbstractFeedDataFlowController controller; @@ -67,20 +61,22 @@ private boolean done = false; public KVReader(String feedName, String bucket, String password, String[] sourceNodes, short[] vbuckets, - int queueSize) throws HyracksDataException { + int queueSize, CouchbaseCore core) throws HyracksDataException { this.feedName = feedName; this.bucket = bucket; this.password = password; this.sourceNodes = sourceNodes; this.vbuckets = vbuckets; this.messages = new ArrayBlockingQueue<DCPRequest>(queueSize); - this.builder = 
DefaultCoreEnvironment.builder().dcpEnabled(KVReaderFactory.DCP_ENABLED) - .autoreleaseAfter(KVReaderFactory.AUTO_RELEASE_AFTER_MILLISECONDS); - this.env = builder.build(); - this.core = new CouchbaseCore(env); - this.bucketStreamAggregator = new BucketStreamAggregator(feedName, core, bucket); + this.core = core; this.record = new GenericRecord<>(); - connect(); + this.pushThread = new Thread(new Runnable() { + @Override + public void run() { + KVReader.this.run(); + } + }, feedName); + pushThread.start(); } @Override @@ -90,44 +86,33 @@ } } - private void connect() { + private void run() { core.send(new SeedNodesRequest(sourceNodes)).timeout(KVReaderFactory.TIMEOUT, KVReaderFactory.TIME_UNIT) .toBlocking().single(); core.send(new OpenBucketRequest(bucket, password)).timeout(KVReaderFactory.TIMEOUT, KVReaderFactory.TIME_UNIT) .toBlocking().single(); - this.pushThread = new Thread(new Runnable() { - @Override - public void run() { - KVReader.this.run(bucketStreamAggregator); - } - }, feedName); - pushThread.start(); - } - - private void run(BucketStreamAggregator bucketStreamAggregator) { - BucketStreamAggregatorState state = new BucketStreamAggregatorState(); + DCPConnection connection = core.<OpenConnectionResponse> send(new OpenConnectionRequest(feedName, bucket)) + .toBlocking().single().connection(); for (int i = 0; i < vbuckets.length; i++) { - state.put(new BucketStreamState(vbuckets[i], 0, 0, 0xffffffff, 0, 0xffffffff)); + connection.addStream(vbuckets[i]).toBlocking().single(); } - state.updates().subscribe(new Action1<BucketStreamStateUpdatedEvent>() { - @Override - public void call(BucketStreamStateUpdatedEvent event) { - if (event.partialUpdate()) { - } else { - } - } - }); try { - bucketStreamAggregator.feed(state).toBlocking().forEach(new Action1<DCPRequest>() { + connection.subject().toBlocking().subscribe(new Subscriber<DCPRequest>() { @Override - public void call(DCPRequest dcpRequest) { + public void onCompleted() { + } + + @Override + public void 
onError(Throwable e) { + e.printStackTrace(); + } + + @Override + public void onNext(DCPRequest dcpRequest) { try { if (dcpRequest instanceof SnapshotMarkerMessage) { SnapshotMarkerMessage message = (SnapshotMarkerMessage) dcpRequest; - BucketStreamState oldState = state.get(message.partition()); - state.put(new BucketStreamState(message.partition(), oldState.vbucketUUID(), - message.endSequenceNumber(), oldState.endSequenceNumber(), - message.endSequenceNumber(), oldState.snapshotEndSequenceNumber())); + LOGGER.info("snapshot DCP message received: " + message); } else if ((dcpRequest instanceof MutationMessage) || (dcpRequest instanceof RemoveMessage)) { messages.put(dcpRequest); } else { @@ -139,13 +124,7 @@ } }); } catch (Throwable th) { - if (th.getCause() instanceof InterruptedException) { - LOGGER.warn("dcp thread was interrupted", th); - synchronized (this) { - KVReader.this.close(); - notifyAll(); - } - } + th.printStackTrace(); throw th; } } @@ -172,6 +151,7 @@ public boolean stop() { done = true; core.send(new CloseBucketRequest(bucket)).toBlocking(); + pushThread.interrupt(); try { messages.put(KVReader.POISON_PILL); } catch (InterruptedException e) { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java index bc2a980..70d53f6 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java @@ -19,7 +19,10 @@ package org.apache.asterix.external.input.record.reader.kv; import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import 
org.apache.asterix.common.exceptions.AsterixException; @@ -62,13 +65,19 @@ private int[] schedule; private String feedName; // Transient fields - private transient CouchbaseCore core; + private static transient CouchbaseCore core; private transient Builder builder; - private transient DefaultCoreEnvironment env; + private static transient DefaultCoreEnvironment env; + private transient AlgebricksAbsolutePartitionConstraint locationConstraints; @Override public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() { - return AsterixClusterProperties.INSTANCE.getClusterLocations(); + if (locationConstraints == null) { + String[] allPartitions = AsterixClusterProperties.INSTANCE.getClusterLocations().getLocations(); + Set<String> ncs = new HashSet<String>(Arrays.asList(allPartitions)); + locationConstraints = new AlgebricksAbsolutePartitionConstraint(ncs.toArray(new String[ncs.size()])); + } + return locationConstraints; } @Override @@ -90,12 +99,20 @@ bucket = configuration.get(ExternalDataConstants.KEY_BUCKET); couchbaseNodes = configuration.get(ExternalDataConstants.KEY_NODES).split(","); feedName = configuration.get(ExternalDataConstants.KEY_FEED_NAME); - builder = DefaultCoreEnvironment.builder().dcpEnabled(DCP_ENABLED) - .autoreleaseAfter(AUTO_RELEASE_AFTER_MILLISECONDS); - env = builder.build(); - core = new CouchbaseCore(env); + createEnvironment("CC"); getNumberOfVbuckets(); schedule(); + } + + private void createEnvironment(String connectionName) { + synchronized (TIME_UNIT) { + if (core == null) { + builder = DefaultCoreEnvironment.builder().dcpEnabled(DCP_ENABLED).dcpConnectionName(connectionName) + .autoreleaseAfter(AUTO_RELEASE_AFTER_MILLISECONDS); + env = builder.build(); + core = new CouchbaseCore(env); + } + } } /* @@ -104,7 +121,7 @@ */ private void schedule() { schedule = new int[numOfVBuckets]; - String[] locations = AsterixClusterProperties.INSTANCE.getClusterLocations().getLocations(); + String[] locations = 
getPartitionConstraint().getLocations(); for (int i = 0; i < numOfVBuckets; i++) { schedule[i] = i % locations.length; } @@ -128,6 +145,7 @@ public IRecordReader<? extends DCPRequest> createRecordReader(IHyracksTaskContext ctx, int partition) throws HyracksDataException { String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId(); + createEnvironment(nodeName); ArrayList<Short> listOfAssignedVBuckets = new ArrayList<Short>(); for (int i = 0; i < schedule.length; i++) { if (schedule[i] == partition) { @@ -138,8 +156,8 @@ for (int i = 0; i < vbuckets.length; i++) { vbuckets[i] = listOfAssignedVBuckets.get(i); } - return new KVReader(feedName + ":" + nodeName + ":" + partition, bucket, password, couchbaseNodes, - vbuckets, ExternalDataUtils.getQueueSize(configuration)); + return new KVReader(feedName + ":" + nodeName + ":" + partition, bucket, password, couchbaseNodes, vbuckets, + ExternalDataUtils.getQueueSize(configuration), core); } @Override diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java index b75f26c..f7fe77f 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java @@ -80,7 +80,6 @@ } } this.byteBuff = ByteBufAllocator.DEFAULT.buffer(ExternalDataConstants.DEFAULT_BUFFER_SIZE); - byteBuff.retain(); this.record = new GenericRecord<DCPRequest>(); this.counter = counterStart; } @@ -132,7 +131,7 @@ if (nextDeleteKey != null) { final String key = nextDeleteKey; nextDeleteKey = null; - return new RemoveMessage(nextDeletePartition, key, cas++, seq++, 0L, bucket); + return new RemoveMessage(0, nextDeletePartition, key, cas++, seq++, 0L, bucket); } } 
generateNextDocument(); @@ -141,15 +140,16 @@ final String key = nextUpsertKey; nextUpsertKey = null; upsertCounter++; - return new MutationMessage(nextUpsertPartition, key, byteBuff, expiration++, seq++, 0, 0, lockTime++, - cas++, bucket); + return new MutationMessage(byteBuff.readableBytes(), nextUpsertPartition, key, byteBuff, expiration++, + seq++, 0, 0, lockTime++, cas++, bucket); } } - return new MutationMessage(assigned.get(counter % assigned.size()), generateKey(), byteBuff, expiration++, - seq++, 0, 0, lockTime++, cas++, bucket); + return new MutationMessage(byteBuff.readableBytes(), assigned.get(counter % assigned.size()), generateKey(), + byteBuff, expiration++, seq++, 0, 0, lockTime++, cas++, bucket); } private void generateNextDocument() { + byteBuff.retain(); // reset the string strBuilder.setLength(0); strBuilder.append("{\"id\":" + (counter + upsertCounter) + ",\"name\":\"" -- To view, visit https://asterix-gerrit.ics.uci.edu/805 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I54f8a313c871428b2468b74c0760e8d28e810699 Gerrit-PatchSet: 12 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com> Gerrit-Reviewer: Chris Hillery <c...@lambda.nu> Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Gerrit-Reviewer: Till Westmann <ti...@apache.org> Gerrit-Reviewer: abdullah alamoudi <bamou...@gmail.com>