>From Wail Alkowaileet <[email protected]>:

Wail Alkowaileet has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17631 )


Change subject: [WIP] Introduce cloud client profiler
......................................................................

[WIP] Introduce cloud client profiler

Change-Id: Ic95b766b033f90394ba0081c4a1a6fbd39fb528e
---
A 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java
A 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java
A 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/NoOpRequestProfiler.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
6 files changed, 267 insertions(+), 11 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/31/17631/1

diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
index 6c02bbc..8fdaff4 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
@@ -24,6 +24,7 @@
 import java.util.concurrent.TimeUnit;

 import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -39,18 +40,21 @@
 import software.amazon.awssdk.services.s3.model.UploadPartRequest;

 public class S3BufferedWriter implements ICloudBufferedWriter {
-    private final List<CompletedPart> partQueue;
-    private final String path;
-    private final S3Client s3Client;
-    private final String bucket;
-    private String uploadId;
-    private int partNumber;
     private static final int MAX_RETRIES = 3;

     private static final Logger LOGGER = LogManager.getLogger();
+    private final S3Client s3Client;
+    private final IRequestProfiler profiler;
+    private final String bucket;
+    private final String path;
+    private final List<CompletedPart> partQueue;

-    public S3BufferedWriter(S3Client s3client, String bucket, String path) {
+    private String uploadId;
+    private int partNumber;
+
+    public S3BufferedWriter(S3Client s3client, IRequestProfiler profiler, 
String bucket, String path) {
         this.s3Client = s3client;
+        this.profiler = profiler;
         this.bucket = bucket;
         this.path = path;
         partQueue = new ArrayList<>();
@@ -58,6 +62,7 @@

     @Override
     public int upload(InputStream stream, int length) {
+        profiler.incrementMultipartUpload();
         setUploadId();
         UploadPartRequest upReq =
                 
UploadPartRequest.builder().uploadId(uploadId).partNumber(partNumber).bucket(bucket).key(path).build();
@@ -108,6 +113,7 @@
     }

     private void completeMultipartUpload(CompleteMultipartUploadRequest 
request) throws HyracksDataException {
+        profiler.incrementMultipartUpload();
         try {
             s3Client.completeMultipartUpload(request);
         } catch (Exception e) {
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
index bbe8586..e86b1c6 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.cloud.clients.aws.s3;

+import java.util.concurrent.TimeUnit;
+
 import org.apache.asterix.common.config.CloudProperties;

 import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
@@ -25,17 +27,22 @@
 import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;

 public class S3ClientConfig {
+    // Log in five minutes intervals between requests
+    private static final long DEFAULT_LOG_INTERVAL = 
TimeUnit.MINUTES.toNanos(1);

     private final String region;
     private final String endpoint;
     private final String prefix;
     private final boolean anonymousAuth;
+    private final long profilerLogInterval;

     public S3ClientConfig(String region, String endpoint, String prefix, 
boolean anonymousAuth) {
         this.region = region;
         this.endpoint = endpoint;
         this.prefix = prefix;
         this.anonymousAuth = anonymousAuth;
+        // TODO should be configurable
+        profilerLogInterval = DEFAULT_LOG_INTERVAL;
     }

     public static S3ClientConfig of(CloudProperties cloudProperties) {
@@ -64,6 +71,10 @@
         return anonymousAuth ? AnonymousCredentialsProvider.create() : 
DefaultCredentialsProvider.create();
     }

+    public long getProfilerLogInterval() {
+        return profilerLogInterval;
+    }
+
     private boolean isS3Mock() {
         return endpoint != null && !endpoint.isEmpty();
     }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index 0fb9c08..7efa087 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -41,6 +41,9 @@

 import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
 import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.clients.profiler.CountRequestProfiler;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
+import org.apache.asterix.cloud.clients.profiler.NoOpRequestProfiler;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
@@ -73,17 +76,24 @@
 public class S3CloudClient implements ICloudClient {

     private static final Logger LOGGER = LogManager.getLogger();
+    // TODO(htowaileb): Temporary variables, can we get this from the used 
instance?
+    private static final double MAX_HOST_BANDWIDTH = 10.0; // in Gbps

     private final S3ClientConfig config;
     private final S3Client s3Client;
+    private final IRequestProfiler profiler;
     private S3TransferManager s3TransferManager;

-    // TODO(htowaileb): Temporary variables, can we get this from the used 
instance?
-    private static final double MAX_HOST_BANDWIDTH = 10.0; // in Gbps
-
     public S3CloudClient(S3ClientConfig config) {
         this.config = config;
         s3Client = buildClient();
+        long profilerInterval = config.getProfilerLogInterval();
+        if (profilerInterval > 0) {
+            profiler = new CountRequestProfiler(profilerInterval);
+        } else {
+            profiler = NoOpRequestProfiler.INSTANCE;
+        }
+
     }

     private S3Client buildClient() {
@@ -104,17 +114,19 @@

     @Override
     public ICloudBufferedWriter createBufferedWriter(String bucket, String 
path) {
-        return new S3BufferedWriter(s3Client, bucket, path);
+        return new S3BufferedWriter(s3Client, profiler, bucket, path);
     }

     @Override
     public Set<String> listObjects(String bucket, String path, FilenameFilter 
filter) {
+        profiler.incrementListObjects();
         path = config.isEncodeKeys() ? encodeURI(path) : path;
         return filterAndGet(listS3Objects(s3Client, bucket, path), filter);
     }

     @Override
     public int read(String bucket, String path, long offset, ByteBuffer 
buffer) throws HyracksDataException {
+        profiler.incrementGetObject();
         long readTo = offset + buffer.remaining();
         GetObjectRequest rangeGetObjectRequest =
                 GetObjectRequest.builder().range("bytes=" + offset + "-" + 
readTo).bucket(bucket).key(path).build();
@@ -141,6 +153,7 @@

     @Override
     public byte[] readAllBytes(String bucket, String path) throws 
HyracksDataException {
+        profiler.incrementGetObject();
         GetObjectRequest getReq = 
GetObjectRequest.builder().bucket(bucket).key(path).build();

         try (ResponseInputStream<GetObjectResponse> stream = 
s3Client.getObject(getReq)) {
@@ -154,6 +167,7 @@

     @Override
     public InputStream getObjectStream(String bucket, String path) {
+        profiler.incrementGetObject();
         GetObjectRequest getReq = 
GetObjectRequest.builder().bucket(bucket).key(path).build();
         try {
             return s3Client.getObject(getReq);
@@ -165,6 +179,7 @@

     @Override
     public void write(String bucket, String path, byte[] data) {
+        profiler.incrementWriteObject();
         PutObjectRequest putReq = 
PutObjectRequest.builder().bucket(bucket).key(path).build();

         // TODO(htowaileb): add retry logic here
@@ -175,7 +190,10 @@
     public void copy(String bucket, String srcPath, FileReference destPath) {
         srcPath = config.isEncodeKeys() ? encodeURI(srcPath) : srcPath;
         List<S3Object> objects = listS3Objects(s3Client, bucket, srcPath);
+
+        profiler.incrementListObjects();
         for (S3Object object : objects) {
+            profiler.incrementCopyObject();
             String srcKey = object.key();
             String destKey = 
destPath.getChildPath(IoUtil.getFileNameFromPath(srcKey));
             CopyObjectRequest copyReq = 
CopyObjectRequest.builder().sourceBucket(bucket).sourceKey(srcKey)
@@ -191,6 +209,7 @@
             return;
         }

+        profiler.incrementDeleteObject();
         List<ObjectIdentifier> objectIdentifiers = new ArrayList<>();
         for (String file : fileList) {
             
objectIdentifiers.add(ObjectIdentifier.builder().key(file).build());
@@ -202,6 +221,7 @@

     @Override
     public long getObjectSize(String bucket, String path) throws 
HyracksDataException {
+        profiler.incrementGetObject();
         try {
             return 
s3Client.headObject(HeadObjectRequest.builder().bucket(bucket).key(path).build()).contentLength();
         } catch (NoSuchKeyException ex) {
@@ -213,6 +233,7 @@

     @Override
     public boolean exists(String bucket, String path) throws 
HyracksDataException {
+        profiler.incrementGetObject();
         try {
             
s3Client.headObject(HeadObjectRequest.builder().bucket(bucket).key(path).build());
             return true;
@@ -255,6 +276,8 @@

         try {
             for (CompletableFuture<CompletedDirectoryDownload> download : 
downloads) {
+                // multipart download
+                profiler.incrementMultipartDownload();
                 download.join();
                 CompletedDirectoryDownload completedDirectoryDownload = 
download.get();

diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java
new file mode 100644
index 0000000..6b5e20e
--- /dev/null
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.profiler;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class CountRequestProfiler implements IRequestProfiler {
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final long logInterval;
+    private final AtomicLong listObjectsCounter;
+    private final AtomicLong getObjectCounter;
+    private final AtomicLong writeObjectCounter;
+    private final AtomicLong deleteObjectCounter;
+    private final AtomicLong copyObjectCounter;
+    private final AtomicLong multipartUploadCounter;
+    private final AtomicLong multipartDownloadCounter;
+    private long lastLogTimestamp;
+
+    public CountRequestProfiler(long logIntervalNanoSec) {
+        this.logInterval = logIntervalNanoSec;
+        listObjectsCounter = new AtomicLong();
+        getObjectCounter = new AtomicLong();
+        writeObjectCounter = new AtomicLong();
+        deleteObjectCounter = new AtomicLong();
+        copyObjectCounter = new AtomicLong();
+        multipartUploadCounter = new AtomicLong();
+        multipartDownloadCounter = new AtomicLong();
+        lastLogTimestamp = System.nanoTime();
+    }
+
+    @Override
+    public void incrementListObjects() {
+        listObjectsCounter.incrementAndGet();
+        log();
+    }
+
+    @Override
+    public void incrementGetObject() {
+        getObjectCounter.incrementAndGet();
+        log();
+    }
+
+    @Override
+    public void incrementWriteObject() {
+        writeObjectCounter.incrementAndGet();
+        log();
+    }
+
+    @Override
+    public void incrementDeleteObject() {
+        deleteObjectCounter.incrementAndGet();
+        log();
+    }
+
+    @Override
+    public void incrementCopyObject() {
+        copyObjectCounter.incrementAndGet();
+        log();
+    }
+
+    @Override
+    public void incrementMultipartUpload() {
+        multipartUploadCounter.incrementAndGet();
+        log();
+    }
+
+    @Override
+    public void incrementMultipartDownload() {
+        multipartDownloadCounter.incrementAndGet();
+        log();
+    }
+
+    private void log() {
+        long currentTime = System.nanoTime();
+        if (currentTime - lastLogTimestamp >= logInterval) {
+            // Might log multiple times
+            lastLogTimestamp = currentTime;
+            ObjectNode countersNode = OBJECT_MAPPER.createObjectNode();
+            countersNode.put("listObjectsCounter", listObjectsCounter.get());
+            countersNode.put("getObjectCounter", getObjectCounter.get());
+            countersNode.put("writeObjectCounter", writeObjectCounter.get());
+            countersNode.put("copyObjectCounter", copyObjectCounter.get());
+            countersNode.put("multipartUploadCounter", 
multipartUploadCounter.get());
+            countersNode.put("multipartDownloadCounter", 
multipartDownloadCounter.get());
+            LOGGER.info("Request counters: {}", countersNode.toString());
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java
new file mode 100644
index 0000000..b54e18c4
--- /dev/null
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.profiler;
+
+public interface IRequestProfiler {
+    void incrementListObjects();
+
+    void incrementGetObject();
+
+    void incrementWriteObject();
+
+    void incrementDeleteObject();
+
+    void incrementCopyObject();
+
+    void incrementMultipartUpload();
+
+    void incrementMultipartDownload();
+}
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/NoOpRequestProfiler.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/NoOpRequestProfiler.java
new file mode 100644
index 0000000..63f7dd0
--- /dev/null
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/NoOpRequestProfiler.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.profiler;
+
+public class NoOpRequestProfiler implements IRequestProfiler {
+    public static final IRequestProfiler INSTANCE = new NoOpRequestProfiler();
+
+    private NoOpRequestProfiler() {
+    }
+
+    @Override
+    public void incrementListObjects() {
+        // NoOp
+    }
+
+    @Override
+    public void incrementGetObject() {
+        // NoOp
+    }
+
+    @Override
+    public void incrementWriteObject() {
+        // NoOp
+    }
+
+    @Override
+    public void incrementDeleteObject() {
+        // NoOp
+    }
+
+    @Override
+    public void incrementCopyObject() {
+        // NoOp
+    }
+
+    @Override
+    public void incrementMultipartUpload() {
+        // NoOp
+    }
+
+    @Override
+    public void incrementMultipartDownload() {
+        // NoOp
+    }
+}

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17631
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Ic95b766b033f90394ba0081c4a1a6fbd39fb528e
Gerrit-Change-Number: 17631
Gerrit-PatchSet: 1
Gerrit-Owner: Wail Alkowaileet <[email protected]>
Gerrit-MessageType: newchange

Reply via email to