>From Wail Alkowaileet <[email protected]>:
Wail Alkowaileet has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17631 )
Change subject: [WIP] Introduce cloud client profiler
......................................................................
[WIP] Introduce cloud client profiler
Change-Id: Ic95b766b033f90394ba0081c4a1a6fbd39fb528e
---
A
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java
A
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java
A
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/NoOpRequestProfiler.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
6 files changed, 267 insertions(+), 11 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/31/17631/1
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
index 6c02bbc..8fdaff4 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
@@ -24,6 +24,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -39,18 +40,21 @@
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
public class S3BufferedWriter implements ICloudBufferedWriter {
- private final List<CompletedPart> partQueue;
- private final String path;
- private final S3Client s3Client;
- private final String bucket;
- private String uploadId;
- private int partNumber;
private static final int MAX_RETRIES = 3;
private static final Logger LOGGER = LogManager.getLogger();
+ private final S3Client s3Client;
+ private final IRequestProfiler profiler;
+ private final String bucket;
+ private final String path;
+ private final List<CompletedPart> partQueue;
- public S3BufferedWriter(S3Client s3client, String bucket, String path) {
+ private String uploadId;
+ private int partNumber;
+
+ public S3BufferedWriter(S3Client s3client, IRequestProfiler profiler,
String bucket, String path) {
this.s3Client = s3client;
+ this.profiler = profiler;
this.bucket = bucket;
this.path = path;
partQueue = new ArrayList<>();
@@ -58,6 +62,7 @@
@Override
public int upload(InputStream stream, int length) {
+ profiler.incrementMultipartUpload();
setUploadId();
UploadPartRequest upReq =
UploadPartRequest.builder().uploadId(uploadId).partNumber(partNumber).bucket(bucket).key(path).build();
@@ -108,6 +113,7 @@
}
private void completeMultipartUpload(CompleteMultipartUploadRequest
request) throws HyracksDataException {
+ profiler.incrementMultipartUpload();
try {
s3Client.completeMultipartUpload(request);
} catch (Exception e) {
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
index bbe8586..e86b1c6 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.cloud.clients.aws.s3;
+import java.util.concurrent.TimeUnit;
+
import org.apache.asterix.common.config.CloudProperties;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
@@ -25,17 +27,22 @@
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
public class S3ClientConfig {
+ // Log in five minutes intervals between requests
+ private static final long DEFAULT_LOG_INTERVAL =
TimeUnit.MINUTES.toNanos(1);
private final String region;
private final String endpoint;
private final String prefix;
private final boolean anonymousAuth;
+ private final long profilerLogInterval;
public S3ClientConfig(String region, String endpoint, String prefix,
boolean anonymousAuth) {
this.region = region;
this.endpoint = endpoint;
this.prefix = prefix;
this.anonymousAuth = anonymousAuth;
+ // TODO should be configurable
+ profilerLogInterval = DEFAULT_LOG_INTERVAL;
}
public static S3ClientConfig of(CloudProperties cloudProperties) {
@@ -64,6 +71,10 @@
return anonymousAuth ? AnonymousCredentialsProvider.create() :
DefaultCredentialsProvider.create();
}
+ public long getProfilerLogInterval() {
+ return profilerLogInterval;
+ }
+
private boolean isS3Mock() {
return endpoint != null && !endpoint.isEmpty();
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index 0fb9c08..7efa087 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -41,6 +41,9 @@
import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.clients.profiler.CountRequestProfiler;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
+import org.apache.asterix.cloud.clients.profiler.NoOpRequestProfiler;
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
@@ -73,17 +76,24 @@
public class S3CloudClient implements ICloudClient {
private static final Logger LOGGER = LogManager.getLogger();
+ // TODO(htowaileb): Temporary variables, can we get this from the used
instance?
+ private static final double MAX_HOST_BANDWIDTH = 10.0; // in Gbps
private final S3ClientConfig config;
private final S3Client s3Client;
+ private final IRequestProfiler profiler;
private S3TransferManager s3TransferManager;
- // TODO(htowaileb): Temporary variables, can we get this from the used
instance?
- private static final double MAX_HOST_BANDWIDTH = 10.0; // in Gbps
-
public S3CloudClient(S3ClientConfig config) {
this.config = config;
s3Client = buildClient();
+ long profilerInterval = config.getProfilerLogInterval();
+ if (profilerInterval > 0) {
+ profiler = new CountRequestProfiler(profilerInterval);
+ } else {
+ profiler = NoOpRequestProfiler.INSTANCE;
+ }
+
}
private S3Client buildClient() {
@@ -104,17 +114,19 @@
@Override
public ICloudBufferedWriter createBufferedWriter(String bucket, String
path) {
- return new S3BufferedWriter(s3Client, bucket, path);
+ return new S3BufferedWriter(s3Client, profiler, bucket, path);
}
@Override
public Set<String> listObjects(String bucket, String path, FilenameFilter
filter) {
+ profiler.incrementListObjects();
path = config.isEncodeKeys() ? encodeURI(path) : path;
return filterAndGet(listS3Objects(s3Client, bucket, path), filter);
}
@Override
public int read(String bucket, String path, long offset, ByteBuffer
buffer) throws HyracksDataException {
+ profiler.incrementGetObject();
long readTo = offset + buffer.remaining();
GetObjectRequest rangeGetObjectRequest =
GetObjectRequest.builder().range("bytes=" + offset + "-" +
readTo).bucket(bucket).key(path).build();
@@ -141,6 +153,7 @@
@Override
public byte[] readAllBytes(String bucket, String path) throws
HyracksDataException {
+ profiler.incrementGetObject();
GetObjectRequest getReq =
GetObjectRequest.builder().bucket(bucket).key(path).build();
try (ResponseInputStream<GetObjectResponse> stream =
s3Client.getObject(getReq)) {
@@ -154,6 +167,7 @@
@Override
public InputStream getObjectStream(String bucket, String path) {
+ profiler.incrementGetObject();
GetObjectRequest getReq =
GetObjectRequest.builder().bucket(bucket).key(path).build();
try {
return s3Client.getObject(getReq);
@@ -165,6 +179,7 @@
@Override
public void write(String bucket, String path, byte[] data) {
+ profiler.incrementWriteObject();
PutObjectRequest putReq =
PutObjectRequest.builder().bucket(bucket).key(path).build();
// TODO(htowaileb): add retry logic here
@@ -175,7 +190,10 @@
public void copy(String bucket, String srcPath, FileReference destPath) {
srcPath = config.isEncodeKeys() ? encodeURI(srcPath) : srcPath;
List<S3Object> objects = listS3Objects(s3Client, bucket, srcPath);
+
+ profiler.incrementListObjects();
for (S3Object object : objects) {
+ profiler.incrementCopyObject();
String srcKey = object.key();
String destKey =
destPath.getChildPath(IoUtil.getFileNameFromPath(srcKey));
CopyObjectRequest copyReq =
CopyObjectRequest.builder().sourceBucket(bucket).sourceKey(srcKey)
@@ -191,6 +209,7 @@
return;
}
+ profiler.incrementDeleteObject();
List<ObjectIdentifier> objectIdentifiers = new ArrayList<>();
for (String file : fileList) {
objectIdentifiers.add(ObjectIdentifier.builder().key(file).build());
@@ -202,6 +221,7 @@
@Override
public long getObjectSize(String bucket, String path) throws
HyracksDataException {
+ profiler.incrementGetObject();
try {
return
s3Client.headObject(HeadObjectRequest.builder().bucket(bucket).key(path).build()).contentLength();
} catch (NoSuchKeyException ex) {
@@ -213,6 +233,7 @@
@Override
public boolean exists(String bucket, String path) throws
HyracksDataException {
+ profiler.incrementGetObject();
try {
s3Client.headObject(HeadObjectRequest.builder().bucket(bucket).key(path).build());
return true;
@@ -255,6 +276,8 @@
try {
for (CompletableFuture<CompletedDirectoryDownload> download :
downloads) {
+ // multipart download
+ profiler.incrementMultipartDownload();
download.join();
CompletedDirectoryDownload completedDirectoryDownload =
download.get();
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java
new file mode 100644
index 0000000..6b5e20e
--- /dev/null
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.profiler;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class CountRequestProfiler implements IRequestProfiler {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final Logger LOGGER = LogManager.getLogger();
+ private final long logInterval;
+ private final AtomicLong listObjectsCounter;
+ private final AtomicLong getObjectCounter;
+ private final AtomicLong writeObjectCounter;
+ private final AtomicLong deleteObjectCounter;
+ private final AtomicLong copyObjectCounter;
+ private final AtomicLong multipartUploadCounter;
+ private final AtomicLong multipartDownloadCounter;
+ private long lastLogTimestamp;
+
+ public CountRequestProfiler(long logIntervalNanoSec) {
+ this.logInterval = logIntervalNanoSec;
+ listObjectsCounter = new AtomicLong();
+ getObjectCounter = new AtomicLong();
+ writeObjectCounter = new AtomicLong();
+ deleteObjectCounter = new AtomicLong();
+ copyObjectCounter = new AtomicLong();
+ multipartUploadCounter = new AtomicLong();
+ multipartDownloadCounter = new AtomicLong();
+ lastLogTimestamp = System.nanoTime();
+ }
+
+ @Override
+ public void incrementListObjects() {
+ listObjectsCounter.incrementAndGet();
+ log();
+ }
+
+ @Override
+ public void incrementGetObject() {
+ getObjectCounter.incrementAndGet();
+ log();
+ }
+
+ @Override
+ public void incrementWriteObject() {
+ writeObjectCounter.incrementAndGet();
+ log();
+ }
+
+ @Override
+ public void incrementDeleteObject() {
+ deleteObjectCounter.incrementAndGet();
+ log();
+ }
+
+ @Override
+ public void incrementCopyObject() {
+ copyObjectCounter.incrementAndGet();
+ log();
+ }
+
+ @Override
+ public void incrementMultipartUpload() {
+ multipartUploadCounter.incrementAndGet();
+ log();
+ }
+
+ @Override
+ public void incrementMultipartDownload() {
+ multipartDownloadCounter.incrementAndGet();
+ log();
+ }
+
+ private void log() {
+ long currentTime = System.nanoTime();
+ if (currentTime - lastLogTimestamp >= logInterval) {
+ // Might log multiple times
+ lastLogTimestamp = currentTime;
+ ObjectNode countersNode = OBJECT_MAPPER.createObjectNode();
+ countersNode.put("listObjectsCounter", listObjectsCounter.get());
+ countersNode.put("getObjectCounter", getObjectCounter.get());
+ countersNode.put("writeObjectCounter", writeObjectCounter.get());
+ countersNode.put("copyObjectCounter", copyObjectCounter.get());
+ countersNode.put("multipartUploadCounter",
multipartUploadCounter.get());
+ countersNode.put("multipartDownloadCounter",
multipartDownloadCounter.get());
+ LOGGER.info("Request counters: {}", countersNode.toString());
+ }
+ }
+}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java
new file mode 100644
index 0000000..b54e18c4
--- /dev/null
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.profiler;
+
+public interface IRequestProfiler {
+ void incrementListObjects();
+
+ void incrementGetObject();
+
+ void incrementWriteObject();
+
+ void incrementDeleteObject();
+
+ void incrementCopyObject();
+
+ void incrementMultipartUpload();
+
+ void incrementMultipartDownload();
+}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/NoOpRequestProfiler.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/NoOpRequestProfiler.java
new file mode 100644
index 0000000..63f7dd0
--- /dev/null
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/NoOpRequestProfiler.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.profiler;
+
+public class NoOpRequestProfiler implements IRequestProfiler {
+ public static final IRequestProfiler INSTANCE = new NoOpRequestProfiler();
+
+ private NoOpRequestProfiler() {
+ }
+
+ @Override
+ public void incrementListObjects() {
+ // NoOp
+ }
+
+ @Override
+ public void incrementGetObject() {
+ // NoOp
+ }
+
+ @Override
+ public void incrementWriteObject() {
+ // NoOp
+ }
+
+ @Override
+ public void incrementDeleteObject() {
+ // NoOp
+ }
+
+ @Override
+ public void incrementCopyObject() {
+ // NoOp
+ }
+
+ @Override
+ public void incrementMultipartUpload() {
+ // NoOp
+ }
+
+ @Override
+ public void incrementMultipartDownload() {
+ // NoOp
+ }
+}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17631
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Ic95b766b033f90394ba0081c4a1a6fbd39fb528e
Gerrit-Change-Number: 17631
Gerrit-PatchSet: 1
Gerrit-Owner: Wail Alkowaileet <[email protected]>
Gerrit-MessageType: newchange