abdullah alamoudi has submitted this change and it was merged.

Change subject: Update the Key Value Reader
......................................................................


Update the Key Value Reader

Change-Id: I54f8a313c871428b2468b74c0760e8d28e810699
Reviewed-on: https://asterix-gerrit.ics.uci.edu/805
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <ti...@apache.org>
---
M asterixdb/asterix-external-data/pom.xml
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPRequestToRecordWithMetadataAndPKConverter.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
5 files changed, 70 insertions(+), 75 deletions(-)

Approvals:
  Till Westmann: Looks good to me, approved
  Jenkins: Verified



diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index 8c59cc4..f7ac6a9 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -287,12 +287,7 @@
         <dependency>
             <groupId>com.couchbase.client</groupId>
             <artifactId>core-io</artifactId>
-            <version>1.2.3</version>
-        </dependency>
-        <dependency>
-            <groupId>io.reactivex</groupId>
-            <artifactId>rxjava</artifactId>
-            <version>1.0.15</version>
+            <version>1.2.7</version>
         </dependency>
     </dependencies>
 </project>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPRequestToRecordWithMetadataAndPKConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPRequestToRecordWithMetadataAndPKConverter.java
index 379bbf2..724699c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPRequestToRecordWithMetadataAndPKConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPRequestToRecordWithMetadataAndPKConverter.java
@@ -37,6 +37,7 @@
 import com.couchbase.client.core.message.dcp.MutationMessage;
 import com.couchbase.client.core.message.dcp.RemoveMessage;
 import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
+import com.couchbase.client.deps.io.netty.util.ReferenceCountUtil;
 
 public class DCPRequestToRecordWithMetadataAndPKConverter
         implements IRecordToRecordWithMetadataAndPKConverter<DCPRequest, char[]> {
@@ -88,6 +89,7 @@
             recordWithMetadata.setMetadata(8, revSeqNumber);
             recordWithMetadata.setMetadata(9, lockTime);
             DCPRequestToRecordWithMetadataAndPKConverter.set(message.content(), decoder, bytes, chars, value);
+            ReferenceCountUtil.release(message.content());
         } else if (dcpRequest instanceof RemoveMessage) {
             final RemoveMessage message = (RemoveMessage) dcpRequest;
             final String key = message.key();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
index 185aea0..41bcc46 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
@@ -30,36 +30,30 @@
 import org.apache.log4j.Logger;
 
 import com.couchbase.client.core.CouchbaseCore;
-import com.couchbase.client.core.dcp.BucketStreamAggregator;
-import com.couchbase.client.core.dcp.BucketStreamAggregatorState;
-import com.couchbase.client.core.dcp.BucketStreamState;
-import com.couchbase.client.core.dcp.BucketStreamStateUpdatedEvent;
-import com.couchbase.client.core.env.DefaultCoreEnvironment;
-import com.couchbase.client.core.env.DefaultCoreEnvironment.Builder;
+import com.couchbase.client.core.endpoint.dcp.DCPConnection;
 import com.couchbase.client.core.message.cluster.CloseBucketRequest;
 import com.couchbase.client.core.message.cluster.OpenBucketRequest;
 import com.couchbase.client.core.message.cluster.SeedNodesRequest;
 import com.couchbase.client.core.message.dcp.DCPRequest;
 import com.couchbase.client.core.message.dcp.MutationMessage;
+import com.couchbase.client.core.message.dcp.OpenConnectionRequest;
+import com.couchbase.client.core.message.dcp.OpenConnectionResponse;
 import com.couchbase.client.core.message.dcp.RemoveMessage;
 import com.couchbase.client.core.message.dcp.SnapshotMarkerMessage;
 
-import rx.functions.Action1;
+import rx.Subscriber;
 
 public class KVReader implements IRecordReader<DCPRequest> {
 
     private static final Logger LOGGER = Logger.getLogger(KVReader.class);
-    private static final MutationMessage POISON_PILL = new MutationMessage((short) 0, null, null, 0, 0L, 0L, 0, 0, 0L,
-            null);
+    private static final MutationMessage POISON_PILL = new MutationMessage(0, (short) 0, null, null, 0, 0L, 0L, 0, 0,
+            0L, null);
     private final String feedName;
     private final short[] vbuckets;
     private final String bucket;
     private final String password;
     private final String[] sourceNodes;
-    private final Builder builder;
-    private final BucketStreamAggregator bucketStreamAggregator;
     private final CouchbaseCore core;
-    private final DefaultCoreEnvironment env;
     private final GenericRecord<DCPRequest> record;
     private final ArrayBlockingQueue<DCPRequest> messages;
     private AbstractFeedDataFlowController controller;
@@ -67,20 +61,22 @@
     private boolean done = false;
 
     public KVReader(String feedName, String bucket, String password, String[] 
sourceNodes, short[] vbuckets,
-            int queueSize) throws HyracksDataException {
+            int queueSize, CouchbaseCore core) throws HyracksDataException {
         this.feedName = feedName;
         this.bucket = bucket;
         this.password = password;
         this.sourceNodes = sourceNodes;
         this.vbuckets = vbuckets;
         this.messages = new ArrayBlockingQueue<DCPRequest>(queueSize);
-        this.builder = 
DefaultCoreEnvironment.builder().dcpEnabled(KVReaderFactory.DCP_ENABLED)
-                
.autoreleaseAfter(KVReaderFactory.AUTO_RELEASE_AFTER_MILLISECONDS);
-        this.env = builder.build();
-        this.core = new CouchbaseCore(env);
-        this.bucketStreamAggregator = new BucketStreamAggregator(feedName, 
core, bucket);
+        this.core = core;
         this.record = new GenericRecord<>();
-        connect();
+        this.pushThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                KVReader.this.run();
+            }
+        }, feedName);
+        pushThread.start();
     }
 
     @Override
@@ -90,44 +86,33 @@
         }
     }
 
-    private void connect() {
+    private void run() {
         core.send(new 
SeedNodesRequest(sourceNodes)).timeout(KVReaderFactory.TIMEOUT, 
KVReaderFactory.TIME_UNIT)
                 .toBlocking().single();
         core.send(new OpenBucketRequest(bucket, 
password)).timeout(KVReaderFactory.TIMEOUT, KVReaderFactory.TIME_UNIT)
                 .toBlocking().single();
-        this.pushThread = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                KVReader.this.run(bucketStreamAggregator);
-            }
-        }, feedName);
-        pushThread.start();
-    }
-
-    private void run(BucketStreamAggregator bucketStreamAggregator) {
-        BucketStreamAggregatorState state = new BucketStreamAggregatorState();
+        DCPConnection connection = core.<OpenConnectionResponse> send(new 
OpenConnectionRequest(feedName, bucket))
+                .toBlocking().single().connection();
         for (int i = 0; i < vbuckets.length; i++) {
-            state.put(new BucketStreamState(vbuckets[i], 0, 0, 0xffffffff, 0, 
0xffffffff));
+            connection.addStream(vbuckets[i]).toBlocking().single();
         }
-        state.updates().subscribe(new Action1<BucketStreamStateUpdatedEvent>() 
{
-            @Override
-            public void call(BucketStreamStateUpdatedEvent event) {
-                if (event.partialUpdate()) {
-                } else {
-                }
-            }
-        });
         try {
-            bucketStreamAggregator.feed(state).toBlocking().forEach(new 
Action1<DCPRequest>() {
+            connection.subject().toBlocking().subscribe(new 
Subscriber<DCPRequest>() {
                 @Override
-                public void call(DCPRequest dcpRequest) {
+                public void onCompleted() {
+                }
+
+                @Override
+                public void onError(Throwable e) {
+                    e.printStackTrace();
+                }
+
+                @Override
+                public void onNext(DCPRequest dcpRequest) {
                     try {
                         if (dcpRequest instanceof SnapshotMarkerMessage) {
                             SnapshotMarkerMessage message = 
(SnapshotMarkerMessage) dcpRequest;
-                            BucketStreamState oldState = 
state.get(message.partition());
-                            state.put(new 
BucketStreamState(message.partition(), oldState.vbucketUUID(),
-                                    message.endSequenceNumber(), 
oldState.endSequenceNumber(),
-                                    message.endSequenceNumber(), 
oldState.snapshotEndSequenceNumber()));
+                            LOGGER.info("snapshot DCP message received: " + 
message);
                         } else if ((dcpRequest instanceof MutationMessage) || 
(dcpRequest instanceof RemoveMessage)) {
                             messages.put(dcpRequest);
                         } else {
@@ -139,13 +124,7 @@
                 }
             });
         } catch (Throwable th) {
-            if (th.getCause() instanceof InterruptedException) {
-                LOGGER.warn("dcp thread was interrupted", th);
-                synchronized (this) {
-                    KVReader.this.close();
-                    notifyAll();
-                }
-            }
+            th.printStackTrace();
             throw th;
         }
     }
@@ -172,6 +151,7 @@
     public boolean stop() {
         done = true;
         core.send(new CloseBucketRequest(bucket)).toBlocking();
+        pushThread.interrupt();
         try {
             messages.put(KVReader.POISON_PILL);
         } catch (InterruptedException e) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java
index bc2a980..70d53f6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java
@@ -19,7 +19,10 @@
 package org.apache.asterix.external.input.record.reader.kv;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -62,13 +65,19 @@
     private int[] schedule;
     private String feedName;
     // Transient fields
-    private transient CouchbaseCore core;
+    private static transient CouchbaseCore core;
     private transient Builder builder;
-    private transient DefaultCoreEnvironment env;
+    private static transient DefaultCoreEnvironment env;
+    private transient AlgebricksAbsolutePartitionConstraint 
locationConstraints;
 
     @Override
     public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
-        return AsterixClusterProperties.INSTANCE.getClusterLocations();
+        if (locationConstraints == null) {
+            String[] allPartitions = 
AsterixClusterProperties.INSTANCE.getClusterLocations().getLocations();
+            Set<String> ncs = new 
HashSet<String>(Arrays.asList(allPartitions));
+            locationConstraints = new 
AlgebricksAbsolutePartitionConstraint(ncs.toArray(new String[ncs.size()]));
+        }
+        return locationConstraints;
     }
 
     @Override
@@ -90,12 +99,20 @@
         bucket = configuration.get(ExternalDataConstants.KEY_BUCKET);
         couchbaseNodes = 
configuration.get(ExternalDataConstants.KEY_NODES).split(",");
         feedName = configuration.get(ExternalDataConstants.KEY_FEED_NAME);
-        builder = DefaultCoreEnvironment.builder().dcpEnabled(DCP_ENABLED)
-                .autoreleaseAfter(AUTO_RELEASE_AFTER_MILLISECONDS);
-        env = builder.build();
-        core = new CouchbaseCore(env);
+        createEnvironment("CC");
         getNumberOfVbuckets();
         schedule();
+    }
+
+    private void createEnvironment(String connectionName) {
+        synchronized (TIME_UNIT) {
+            if (core == null) {
+                builder = 
DefaultCoreEnvironment.builder().dcpEnabled(DCP_ENABLED).dcpConnectionName(connectionName)
+                        .autoreleaseAfter(AUTO_RELEASE_AFTER_MILLISECONDS);
+                env = builder.build();
+                core = new CouchbaseCore(env);
+            }
+        }
     }
 
     /*
@@ -104,7 +121,7 @@
      */
     private void schedule() {
         schedule = new int[numOfVBuckets];
-        String[] locations = 
AsterixClusterProperties.INSTANCE.getClusterLocations().getLocations();
+        String[] locations = getPartitionConstraint().getLocations();
         for (int i = 0; i < numOfVBuckets; i++) {
             schedule[i] = i % locations.length;
         }
@@ -128,6 +145,7 @@
     public IRecordReader<? extends DCPRequest> 
createRecordReader(IHyracksTaskContext ctx, int partition)
             throws HyracksDataException {
         String nodeName = 
ctx.getJobletContext().getApplicationContext().getNodeId();
+        createEnvironment(nodeName);
         ArrayList<Short> listOfAssignedVBuckets = new ArrayList<Short>();
         for (int i = 0; i < schedule.length; i++) {
             if (schedule[i] == partition) {
@@ -138,8 +156,8 @@
         for (int i = 0; i < vbuckets.length; i++) {
             vbuckets[i] = listOfAssignedVBuckets.get(i);
         }
-        return new KVReader(feedName + ":" + nodeName + ":" + partition, 
bucket, password, couchbaseNodes,
-                vbuckets, ExternalDataUtils.getQueueSize(configuration));
+        return new KVReader(feedName + ":" + nodeName + ":" + partition, 
bucket, password, couchbaseNodes, vbuckets,
+                ExternalDataUtils.getQueueSize(configuration), core);
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
index b75f26c..f7fe77f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
@@ -80,7 +80,6 @@
             }
         }
         this.byteBuff = 
ByteBufAllocator.DEFAULT.buffer(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
-        byteBuff.retain();
         this.record = new GenericRecord<DCPRequest>();
         this.counter = counterStart;
     }
@@ -132,7 +131,7 @@
             if (nextDeleteKey != null) {
                 final String key = nextDeleteKey;
                 nextDeleteKey = null;
-                return new RemoveMessage(nextDeletePartition, key, cas++, 
seq++, 0L, bucket);
+                return new RemoveMessage(0, nextDeletePartition, key, cas++, 
seq++, 0L, bucket);
             }
         }
         generateNextDocument();
@@ -141,15 +140,16 @@
                 final String key = nextUpsertKey;
                 nextUpsertKey = null;
                 upsertCounter++;
-                return new MutationMessage(nextUpsertPartition, key, byteBuff, 
expiration++, seq++, 0, 0, lockTime++,
-                        cas++, bucket);
+                return new MutationMessage(byteBuff.readableBytes(), 
nextUpsertPartition, key, byteBuff, expiration++,
+                        seq++, 0, 0, lockTime++, cas++, bucket);
             }
         }
-        return new MutationMessage(assigned.get(counter % assigned.size()), 
generateKey(), byteBuff, expiration++,
-                seq++, 0, 0, lockTime++, cas++, bucket);
+        return new MutationMessage(byteBuff.readableBytes(), 
assigned.get(counter % assigned.size()), generateKey(),
+                byteBuff, expiration++, seq++, 0, 0, lockTime++, cas++, 
bucket);
     }
 
     private void generateNextDocument() {
+        byteBuff.retain();
         // reset the string
         strBuilder.setLength(0);
         strBuilder.append("{\"id\":" + (counter + upsertCounter) + 
",\"name\":\""

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/805
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I54f8a313c871428b2468b74c0760e8d28e810699
Gerrit-PatchSet: 12
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>
Gerrit-Reviewer: Chris Hillery <c...@lambda.nu>
Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: abdullah alamoudi <bamou...@gmail.com>

Reply via email to