[airavata-mft] 02/05: Local incoming streamer added

2023-03-18 Thread dimuthuupe
This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git

commit e536a7d2f0ba13c0ecf438225a8219f2e2e2a28d
Author: Praneeth Chityala 
AuthorDate: Sat Mar 18 01:35:21 2023 -0400

Local incoming streamer added
---
 .../airavata/mft/core/ConnectorResolver.java   |  6 +--
 .../local/LocalIncomingStreamingConnector.java | 57 ++
 2 files changed, 60 insertions(+), 3 deletions(-)

diff --git 
a/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java 
b/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java
index 13824d3..ea355bf 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java
@@ -45,9 +45,9 @@ public final class ConnectorResolver {
 case "GCS":
 className = 
"org.apache.airavata.mft.transport.gcp.GCSIncomingStreamingConnector";
 break;
-//case "LOCAL":
-//className = 
"org.apache.airavata.mft.transport.local.LocalIncomingStreamingConnector";
-//break;
+case "LOCAL":
+className = 
"org.apache.airavata.mft.transport.local.LocalIncomingStreamingConnector";
+break;
 }
 
 if (className != null) {
diff --git 
a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalIncomingStreamingConnector.java
 
b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalIncomingStreamingConnector.java
new file mode 100644
index 0000000..290cccb
--- /dev/null
+++ 
b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalIncomingStreamingConnector.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.transport.local;
+
+
+import org.apache.airavata.mft.core.api.ConnectorConfig;
+import org.apache.airavata.mft.core.api.IncomingStreamingConnector;
+
+import java.io.InputStream;
+import java.io.FileInputStream;
+import java.io.File;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LocalIncomingStreamingConnector implements 
IncomingStreamingConnector{
+
+private String resourcePath;
+
+private static final Logger logger = 
LoggerFactory.getLogger(LocalIncomingStreamingConnector.class);
+
+@Override
+public void init(ConnectorConfig connectorConfig) throws Exception {
+this.resourcePath = connectorConfig.getResourcePath();
+}
+
+@Override
+public void complete() throws Exception {
+logger.info("File {} successfully received", this.resourcePath);
+}
+
+@Override
+public void failed() throws Exception {
+logger.error("Failed while receiving file {}", this.resourcePath);
+}
+
+@Override
+public InputStream fetchInputStream() throws Exception {
+InputStream from = new FileInputStream(new File(this.resourcePath));
+
+return from;
+}
+}



[airavata-mft] 04/05: S3 Outgoing bufferStreaming update

2023-03-18 Thread dimuthuupe
This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git

commit 29c05e15b5f2c9c57ad916a1de214c2a7d99f9fc
Author: Praneeth Chityala 
AuthorDate: Sat Mar 18 01:36:59 2023 -0400

S3 Outgoing bufferStreaming update
---
 .../org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java | 8 +---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git 
a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
 
b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
index 4803b32..6dfa022 100644
--- 
a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
+++ 
b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
@@ -36,7 +36,9 @@ import 
org.apache.airavata.mft.resource.stubs.s3.storage.S3StorageGetRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedInputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.InputStream;
 import java.nio.file.Files;
 import java.nio.file.Paths;
@@ -61,7 +63,6 @@ public class S3OutgoingConnector implements 
OutgoingChunkedConnector {
 public void init(ConnectorConfig cc) throws Exception {
 
 this.resourcePath = cc.getResourcePath();
-this.resourceLength = cc.getMetadata().getFile().getResourceSize();
 
 s3Storage = cc.getStorage().getS3();
 
@@ -92,7 +93,8 @@ public class S3OutgoingConnector implements 
OutgoingChunkedConnector {
 .withPartNumber(chunkId + 1)
 .withFileOffset(0)
 //.withMD5Digest(Md5Utils.md5AsBase64(new 
File(uploadFile)))
-.withFile(file)
+//.withFile(file)
+.withInputStream(new BufferedInputStream(new 
FileInputStream(file), Math.min(16 * 1024 * 1024, (int) ( endByte - 
startByte))))
 .withPartSize(file.length());
 
 UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest);
@@ -137,6 +139,6 @@ public class S3OutgoingConnector implements 
OutgoingChunkedConnector {
 
 @Override
 public void failed() throws Exception {
-
+logger.error("S3 failed to upload chunk to bucket {} for resource path 
{}", s3Storage.getBucketName(), resourcePath);
 }
 }



[airavata-mft] 05/05: consul update to accept all

2023-03-18 Thread dimuthuupe
This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git

commit badd42c810e3b58a1d266b204b39b12ab77293fe
Author: Praneeth Chityala 
AuthorDate: Sat Mar 18 01:37:36 2023 -0400

consul update to accept all
---
 scripts/start-consul.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/scripts/start-consul.sh b/scripts/start-consul.sh
index 74a79f2..fcc4a4e 100755
--- a/scripts/start-consul.sh
+++ b/scripts/start-consul.sh
@@ -27,7 +27,7 @@ case $1 in
 curl -O 
https://releases.hashicorp.com/consul/1.7.1/consul_1.7.1_darwin_amd64.zip
 unzip -o consul_1.7.1_darwin_amd64.zip -d ../airavata-mft/consul
 rm consul_1.7.1_darwin_amd64.zip
-nohup ../airavata-mft/consul/consul agent -dev > $LOG_FILE 2>&1 &
+nohup ../airavata-mft/consul/consul agent -dev -client 0.0.0.0 > 
$LOG_FILE 2>&1 &
 echo $! > $PID_PATH_NAME
 echo "Consul started"
 else



[airavata-mft] branch master updated (9c9e48c -> badd42c)

2023-03-18 Thread dimuthuupe
This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git


from 9c9e48c  Turning on tcp keep alive for S3 clients
 new 27dca07  gRPC ports config
 new e536a7d  Local incoming streamer added
 new 33be009  Local incoming chunked update
 new 29c05e1  S3 Outgoing bufferStreaming update
 new badd42c  consul update to accept all

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../airavata/mft/core/ConnectorResolver.java   |  6 +-
 python-cli/mft_cli/airavata_mft_cli/config.py  |  7 ++
 python-cli/mft_cli/airavata_mft_cli/operations.py  | 51 +++---
 .../mft_cli/airavata_mft_cli/storage/__init__.py   | 17 +++--
 .../mft_cli/airavata_mft_cli/storage/azure.py  | 17 +++--
 python-cli/mft_cli/airavata_mft_cli/storage/gcs.py | 17 +++--
 .../mft_cli/airavata_mft_cli/storage/local.py  | 17 +++--
 python-cli/mft_cli/airavata_mft_cli/storage/s3.py  | 17 +++--
 .../mft_cli/airavata_mft_cli/storage/swift.py  | 17 +++--
 scripts/start-consul.sh|  2 +-
 .../local/LocalIncomingChunkedConnector.java   | 81 +++---
 ...r.java => LocalIncomingStreamingConnector.java} | 59 ++--
 .../mft/transport/s3/S3OutgoingConnector.java  |  8 ++-
 13 files changed, 147 insertions(+), 169 deletions(-)
 create mode 100644 python-cli/mft_cli/airavata_mft_cli/config.py
 copy 
transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/{LocalIncomingChunkedConnector.java
 => LocalIncomingStreamingConnector.java} (53%)



[airavata-mft] 01/05: gRPC ports config

2023-03-18 Thread dimuthuupe
This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git

commit 27dca078dd7582646c8b9232beb58f526a1603f9
Author: Praneeth Chityala 
AuthorDate: Sat Mar 18 01:32:57 2023 -0400

gRPC ports config
---
 python-cli/mft_cli/airavata_mft_cli/config.py  |  7 +++
 python-cli/mft_cli/airavata_mft_cli/operations.py  | 51 --
 .../mft_cli/airavata_mft_cli/storage/__init__.py   | 17 +---
 .../mft_cli/airavata_mft_cli/storage/azure.py  | 17 +---
 python-cli/mft_cli/airavata_mft_cli/storage/gcs.py | 17 +---
 .../mft_cli/airavata_mft_cli/storage/local.py  | 17 +---
 python-cli/mft_cli/airavata_mft_cli/storage/s3.py  | 17 +---
 .../mft_cli/airavata_mft_cli/storage/swift.py  | 17 +---
 8 files changed, 94 insertions(+), 66 deletions(-)

diff --git a/python-cli/mft_cli/airavata_mft_cli/config.py 
b/python-cli/mft_cli/airavata_mft_cli/config.py
new file mode 100644
index 0000000..b5be669
--- /dev/null
+++ b/python-cli/mft_cli/airavata_mft_cli/config.py
@@ -0,0 +1,7 @@
+transfer_api_port = 7004
+transfer_api_secured = False
+resource_service_host = "localhost"
+resource_service_port = 7002
+resource_service_secured = False
+secret_service_host = "localhost"
+secret_service_port = 7003
\ No newline at end of file
diff --git a/python-cli/mft_cli/airavata_mft_cli/operations.py 
b/python-cli/mft_cli/airavata_mft_cli/operations.py
index e6b9aec..40fbe64 100644
--- a/python-cli/mft_cli/airavata_mft_cli/operations.py
+++ b/python-cli/mft_cli/airavata_mft_cli/operations.py
@@ -23,15 +23,18 @@ from airavata_mft_sdk import MFTTransferApi_pb2
 from rich.console import Console
 from rich.table import Table
 import time
+import sys
+sys.path.append('.')
+from . import config as configcli
 
 def fetch_storage_and_secret_ids(storage_name):
-  client = mft_client.MFTClient(transfer_api_port = 7003,
-transfer_api_secured = False,
-resource_service_host = "localhost",
-resource_service_port = 7003,
-resource_service_secured = False,
-secret_service_host = "localhost",
-secret_service_port = 7003)
+  client = mft_client.MFTClient(transfer_api_port = 
configcli.transfer_api_port,
+transfer_api_secured = 
configcli.transfer_api_secured,
+resource_service_host = 
configcli.resource_service_host,
+resource_service_port = 
configcli.resource_service_port,
+resource_service_secured = 
configcli.resource_service_secured,
+secret_service_host = 
configcli.secret_service_host,
+secret_service_port = 
configcli.secret_service_port)
   search_req = StorageCommon_pb2.StorageSearchRequest(storageName=storage_name)
   storages = client.common_api.searchStorages(search_req)
 
@@ -68,13 +71,13 @@ def get_resource_metadata(storage_path, recursive_search = 
False):
 resourcePath = 
resource_path)
   resource_medata_req = 
MFTTransferApi_pb2.FetchResourceMetadataRequest(idRequest = id_req)
 
-  client = mft_client.MFTClient(transfer_api_port = 7003,
-transfer_api_secured = False,
-resource_service_host = "localhost",
-resource_service_port = 7003,
-resource_service_secured = False,
-secret_service_host = "localhost",
-secret_service_port = 7003)
+  client = mft_client.MFTClient(transfer_api_port = 
configcli.transfer_api_port,
+transfer_api_secured = 
configcli.transfer_api_secured,
+resource_service_host = 
configcli.resource_service_host,
+resource_service_port = 
configcli.resource_service_port,
+resource_service_secured = 
configcli.resource_service_secured,
+secret_service_host = 
configcli.secret_service_host,
+secret_service_port = 
configcli.secret_service_port)
 
   metadata_resp = client.transfer_api.resourceMetadata(resource_medata_req)
   return metadata_resp
@@ -167,18 +170,18 @@ def copy(source, destination):
   " files to be transferred. Total volume is " + 
str(total_volume)
   + " bytes. Do you want to start the transfer? ", 
True)
 
-  client = mft_client.MFTClient(transfer_api_port = 7003,
-transfer_api_secured = False,
-

[airavata-mft] 03/05: Local incoming chunked update

2023-03-18 Thread dimuthuupe
This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git

commit 33be009066c43d32db9c89da96dd6444643fd231
Author: Praneeth Chityala 
AuthorDate: Sat Mar 18 01:36:13 2023 -0400

Local incoming chunked update
---
 .../local/LocalIncomingChunkedConnector.java   | 81 +++---
 1 file changed, 39 insertions(+), 42 deletions(-)

diff --git 
a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalIncomingChunkedConnector.java
 
b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalIncomingChunkedConnector.java
index 91f5de6..dd388ab 100644
--- 
a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalIncomingChunkedConnector.java
+++ 
b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalIncomingChunkedConnector.java
@@ -21,22 +21,24 @@ package org.apache.airavata.mft.transport.local;
 import org.apache.airavata.mft.core.api.ConnectorConfig;
 import org.apache.airavata.mft.core.api.IncomingChunkedConnector;
 
-import java.io.InputStream;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.File;
+import java.io.*;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class LocalIncomingChunkedConnector implements IncomingChunkedConnector 
{
 
 private String resourcePath;
+private long resourceSize;
 
 private static final Logger logger = 
LoggerFactory.getLogger(LocalIncomingChunkedConnector.class);
 
 @Override
 public void init(ConnectorConfig connectorConfig) throws Exception {
 this.resourcePath = connectorConfig.getResourcePath();
+this.resourceSize = 
connectorConfig.getMetadata().getFile().getResourceSize();
 }
 
 @Override
@@ -53,45 +55,40 @@ public class LocalIncomingChunkedConnector implements 
IncomingChunkedConnector {
 @Override
 public void downloadChunk(int chunkId, long startByte, long endByte, 
String downloadFile) throws Exception {
 
-FileInputStream from = new FileInputStream(new 
File(this.resourcePath));
-FileOutputStream to = new FileOutputStream(new File(downloadFile));
-
-final int buffLen = 1024;
-
-byte[] buf = new byte[buffLen];
-
-from.skip(startByte);
-
-long fileSize = endByte - startByte + 1;
-
-while (true) {
-int bufSize = 0;
-
-if (buffLen < fileSize) {
-bufSize = buffLen;
-} else {
-bufSize = (int) fileSize;
-}
-
-bufSize = (int) from.read(buf, 0, bufSize);
-
-if (bufSize < 0) {
-break;
-}
-
-to.write(buf, 0, bufSize);
-to.flush();
-
-fileSize -= bufSize;
-
-if (fileSize == 0L) {
-break;
+logger.info("Downloading chunk {} with start byte {} and end byte {} 
to file {} from resource path {}",
+chunkId, startByte, endByte, downloadFile, this.resourcePath);
+
+//#use this code on a DMA enabled device
+//if (resourceSize <= endByte - startByte) {
+//Files.copy(Path.of(this.resourcePath), Path.of(downloadFile));
+//} else {
+//try (FileInputStream from = new 
FileInputStream(this.resourcePath);
+// FileOutputStream to = new FileOutputStream(downloadFile)) {
+//from.getChannel().transferTo(startByte, endByte - startByte, 
to.getChannel());
+//} catch (Exception e) {
+//logger.error("Unexpected error occurred while downloading 
chunk {} to file {} from resource path {}",
+//chunkId, downloadFile, this.resourcePath, e);
+//throw e;
+//}
+//}
+
+int buffLen = 1024 * 1024 * 16;
+try (BufferedInputStream bis = new BufferedInputStream(new 
FileInputStream(this.resourcePath),buffLen);
+ BufferedOutputStream bos = new BufferedOutputStream(new 
FileOutputStream(downloadFile))) {
+byte[] buffer = new byte[buffLen];
+int read = 0;
+long totalRead = bis.skip(startByte);
+while ((read = bis.read(buffer,0,Math.min(buffLen, (int) (endByte 
- totalRead )))) > 0) {
+bos.write(buffer, 0, read);
+totalRead += read;
 }
+bis.close();
+bos.close();
+} catch (Exception e) {
+logger.error("Unexpected error occurred while downloading chunk {} 
to file {} from resource path {}",
+chunkId, downloadFile, this.resourcePath, e);
+throw e;
 }
-
-from.close();
-to.close();
-
 }
 
 @Override
@@ -101,6 +98,6 @@ public class