[airavata-mft] 02/05: Local incoming streamer added
This is an automated email from the ASF dual-hosted git repository. dimuthuupe pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airavata-mft.git commit e536a7d2f0ba13c0ecf438225a8219f2e2e2a28d Author: Praneeth Chityala AuthorDate: Sat Mar 18 01:35:21 2023 -0400 Local incoming streamer added --- .../airavata/mft/core/ConnectorResolver.java | 6 +-- .../local/LocalIncomingStreamingConnector.java | 57 ++ 2 files changed, 60 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java b/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java index 13824d3..ea355bf 100644 --- a/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java +++ b/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java @@ -45,9 +45,9 @@ public final class ConnectorResolver { case "GCS": className = "org.apache.airavata.mft.transport.gcp.GCSIncomingStreamingConnector"; break; -//case "LOCAL": -//className = "org.apache.airavata.mft.transport.local.LocalIncomingStreamingConnector"; -//break; +case "LOCAL": +className = "org.apache.airavata.mft.transport.local.LocalIncomingStreamingConnector"; +break; } if (className != null) { diff --git a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalIncomingStreamingConnector.java b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalIncomingStreamingConnector.java new file mode 100644 index 000..290cccb --- /dev/null +++ b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalIncomingStreamingConnector.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.airavata.mft.transport.local; + + +import org.apache.airavata.mft.core.api.ConnectorConfig; +import org.apache.airavata.mft.core.api.IncomingStreamingConnector; + +import java.io.InputStream; +import java.io.FileInputStream; +import java.io.File; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LocalIncomingStreamingConnector implements IncomingStreamingConnector{ + +private String resourcePath; + +private static final Logger logger = LoggerFactory.getLogger(LocalIncomingStreamingConnector.class); + +@Override +public void init(ConnectorConfig connectorConfig) throws Exception { +this.resourcePath = connectorConfig.getResourcePath(); +} + +@Override +public void complete() throws Exception { +logger.info("File {} successfully received", this.resourcePath); +} + +@Override +public void failed() throws Exception { +logger.error("Failed while receiving file {}", this.resourcePath); +} + +@Override +public InputStream fetchInputStream() throws Exception { +InputStream from = new FileInputStream(new File(this.resourcePath)); + +return from; +} +}
[airavata-mft] 04/05: S3 Outgoing bufferStreaming update
This is an automated email from the ASF dual-hosted git repository. dimuthuupe pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airavata-mft.git commit 29c05e15b5f2c9c57ad916a1de214c2a7d99f9fc Author: Praneeth Chityala AuthorDate: Sat Mar 18 01:36:59 2023 -0400 S3 Outgoing bufferStreaming update --- .../org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java | 8 +--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java index 4803b32..6dfa022 100644 --- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java +++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java @@ -36,7 +36,9 @@ import org.apache.airavata.mft.resource.stubs.s3.storage.S3StorageGetRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedInputStream; import java.io.File; +import java.io.FileInputStream; import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Paths; @@ -61,7 +63,6 @@ public class S3OutgoingConnector implements OutgoingChunkedConnector { public void init(ConnectorConfig cc) throws Exception { this.resourcePath = cc.getResourcePath(); -this.resourceLength = cc.getMetadata().getFile().getResourceSize(); s3Storage = cc.getStorage().getS3(); @@ -92,7 +93,8 @@ public class S3OutgoingConnector implements OutgoingChunkedConnector { .withPartNumber(chunkId + 1) .withFileOffset(0) //.withMD5Digest(Md5Utils.md5AsBase64(new File(uploadFile))) -.withFile(file) +//.withFile(file) +.withInputStream(new BufferedInputStream(new FileInputStream(file), Math.min(16 * 1024 * 1024, (int) ( endByte - startByte .withPartSize(file.length()); UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest); @@ -137,6 +139,6 @@ public class S3OutgoingConnector implements OutgoingChunkedConnector { @Override public void failed() throws Exception { - +logger.error("S3 failed to upload chunk to bucket {} for resource path {}", s3Storage.getBucketName(), resourcePath); } }
[airavata-mft] 05/05: consul update to accept all
This is an automated email from the ASF dual-hosted git repository. dimuthuupe pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airavata-mft.git commit badd42c810e3b58a1d266b204b39b12ab77293fe Author: Praneeth Chityala AuthorDate: Sat Mar 18 01:37:36 2023 -0400 consul update to accept all --- scripts/start-consul.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/start-consul.sh b/scripts/start-consul.sh index 74a79f2..fcc4a4e 100755 --- a/scripts/start-consul.sh +++ b/scripts/start-consul.sh @@ -27,7 +27,7 @@ case $1 in curl -O https://releases.hashicorp.com/consul/1.7.1/consul_1.7.1_darwin_amd64.zip unzip -o consul_1.7.1_darwin_amd64.zip -d ../airavata-mft/consul rm consul_1.7.1_darwin_amd64.zip -nohup ../airavata-mft/consul/consul agent -dev > $LOG_FILE 2>&1 & +nohup ../airavata-mft/consul/consul agent -dev -client 0.0.0.0 > $LOG_FILE 2>&1 & echo $! > $PID_PATH_NAME echo "Consul started" else
[airavata-mft] branch master updated (9c9e48c -> badd42c)
This is an automated email from the ASF dual-hosted git repository. dimuthuupe pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/airavata-mft.git from 9c9e48c Turning on tcp keep alive for S3 clients new 27dca07 gRPC ports config new e536a7d Local incoming streamer added new 33be009 Local incoming chunked update new 29c05e1 S3 Outgoing bufferStreaming update new badd42c consul update to accept all The 5 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../airavata/mft/core/ConnectorResolver.java | 6 +- python-cli/mft_cli/airavata_mft_cli/config.py | 7 ++ python-cli/mft_cli/airavata_mft_cli/operations.py | 51 +++--- .../mft_cli/airavata_mft_cli/storage/__init__.py | 17 +++-- .../mft_cli/airavata_mft_cli/storage/azure.py | 17 +++-- python-cli/mft_cli/airavata_mft_cli/storage/gcs.py | 17 +++-- .../mft_cli/airavata_mft_cli/storage/local.py | 17 +++-- python-cli/mft_cli/airavata_mft_cli/storage/s3.py | 17 +++-- .../mft_cli/airavata_mft_cli/storage/swift.py | 17 +++-- scripts/start-consul.sh| 2 +- .../local/LocalIncomingChunkedConnector.java | 81 +++--- ...r.java => LocalIncomingStreamingConnector.java} | 59 ++-- .../mft/transport/s3/S3OutgoingConnector.java | 8 ++- 13 files changed, 147 insertions(+), 169 deletions(-) create mode 100644 python-cli/mft_cli/airavata_mft_cli/config.py copy transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/{LocalIncomingChunkedConnector.java => LocalIncomingStreamingConnector.java} (53%)
[airavata-mft] 01/05: gRPC ports config
This is an automated email from the ASF dual-hosted git repository. dimuthuupe pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airavata-mft.git commit 27dca078dd7582646c8b9232beb58f526a1603f9 Author: Praneeth Chityala AuthorDate: Sat Mar 18 01:32:57 2023 -0400 gRPC ports config --- python-cli/mft_cli/airavata_mft_cli/config.py | 7 +++ python-cli/mft_cli/airavata_mft_cli/operations.py | 51 -- .../mft_cli/airavata_mft_cli/storage/__init__.py | 17 +--- .../mft_cli/airavata_mft_cli/storage/azure.py | 17 +--- python-cli/mft_cli/airavata_mft_cli/storage/gcs.py | 17 +--- .../mft_cli/airavata_mft_cli/storage/local.py | 17 +--- python-cli/mft_cli/airavata_mft_cli/storage/s3.py | 17 +--- .../mft_cli/airavata_mft_cli/storage/swift.py | 17 +--- 8 files changed, 94 insertions(+), 66 deletions(-) diff --git a/python-cli/mft_cli/airavata_mft_cli/config.py b/python-cli/mft_cli/airavata_mft_cli/config.py new file mode 100644 index 000..b5be669 --- /dev/null +++ b/python-cli/mft_cli/airavata_mft_cli/config.py @@ -0,0 +1,7 @@ +transfer_api_port = 7004 +transfer_api_secured = False +resource_service_host = "localhost" +resource_service_port = 7002 +resource_service_secured = False +secret_service_host = "localhost" +secret_service_port = 7003 \ No newline at end of file diff --git a/python-cli/mft_cli/airavata_mft_cli/operations.py b/python-cli/mft_cli/airavata_mft_cli/operations.py index e6b9aec..40fbe64 100644 --- a/python-cli/mft_cli/airavata_mft_cli/operations.py +++ b/python-cli/mft_cli/airavata_mft_cli/operations.py @@ -23,15 +23,18 @@ from airavata_mft_sdk import MFTTransferApi_pb2 from rich.console import Console from rich.table import Table import time +import sys +sys.path.append('.') +from . import config as configcli def fetch_storage_and_secret_ids(storage_name): - client = mft_client.MFTClient(transfer_api_port = 7003, -transfer_api_secured = False, -resource_service_host = "localhost", -resource_service_port = 7003, -resource_service_secured = False, -secret_service_host = "localhost", -secret_service_port = 7003) + client = mft_client.MFTClient(transfer_api_port = configcli.transfer_api_port, +transfer_api_secured = configcli.transfer_api_secured, +resource_service_host = configcli.resource_service_host, +resource_service_port = configcli.resource_service_port, +resource_service_secured = configcli.resource_service_secured, +secret_service_host = configcli.secret_service_host, +secret_service_port = configcli.secret_service_port) search_req = StorageCommon_pb2.StorageSearchRequest(storageName=storage_name) storages = client.common_api.searchStorages(search_req) @@ -68,13 +71,13 @@ def get_resource_metadata(storage_path, recursive_search = False): resourcePath = resource_path) resource_medata_req = MFTTransferApi_pb2.FetchResourceMetadataRequest(idRequest = id_req) - client = mft_client.MFTClient(transfer_api_port = 7003, -transfer_api_secured = False, -resource_service_host = "localhost", -resource_service_port = 7003, -resource_service_secured = False, -secret_service_host = "localhost", -secret_service_port = 7003) + client = mft_client.MFTClient(transfer_api_port = configcli.transfer_api_port, +transfer_api_secured = configcli.transfer_api_secured, +resource_service_host = configcli.resource_service_host, +resource_service_port = configcli.resource_service_port, +resource_service_secured = configcli.resource_service_secured, +secret_service_host = configcli.secret_service_host, +secret_service_port = configcli.secret_service_port) metadata_resp = client.transfer_api.resourceMetadata(resource_medata_req) return metadata_resp @@ -167,18 +170,18 @@ def copy(source, destination): " files to be transferred. Total volume is " + str(total_volume) + " bytes. Do you want to start the transfer? ", True) - client = mft_client.MFTClient(transfer_api_port = 7003, -transfer_api_secured = False, -
[airavata-mft] 03/05: Local incoming chunked update
This is an automated email from the ASF dual-hosted git repository. dimuthuupe pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airavata-mft.git commit 33be009066c43d32db9c89da96dd6444643fd231 Author: Praneeth Chityala AuthorDate: Sat Mar 18 01:36:13 2023 -0400 Local incoming chunked update --- .../local/LocalIncomingChunkedConnector.java | 81 +++--- 1 file changed, 39 insertions(+), 42 deletions(-) diff --git a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalIncomingChunkedConnector.java b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalIncomingChunkedConnector.java index 91f5de6..dd388ab 100644 --- a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalIncomingChunkedConnector.java +++ b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalIncomingChunkedConnector.java @@ -21,22 +21,24 @@ package org.apache.airavata.mft.transport.local; import org.apache.airavata.mft.core.api.ConnectorConfig; import org.apache.airavata.mft.core.api.IncomingChunkedConnector; -import java.io.InputStream; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.File; +import java.io.*; +import java.nio.file.Files; +import java.nio.file.Path; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class LocalIncomingChunkedConnector implements IncomingChunkedConnector { private String resourcePath; +private long resourceSize; private static final Logger logger = LoggerFactory.getLogger(LocalIncomingChunkedConnector.class); @Override public void init(ConnectorConfig connectorConfig) throws Exception { this.resourcePath = connectorConfig.getResourcePath(); +this.resourceSize = connectorConfig.getMetadata().getFile().getResourceSize(); } @Override @@ -53,45 +55,40 @@ public class LocalIncomingChunkedConnector implements IncomingChunkedConnector { @Override public void downloadChunk(int chunkId, long startByte, long endByte, String downloadFile) throws Exception { -FileInputStream from = new FileInputStream(new File(this.resourcePath)); -FileOutputStream to = new FileOutputStream(new File(downloadFile)); - -final int buffLen = 1024; - -byte[] buf = new byte[buffLen]; - -from.skip(startByte); - -long fileSize = endByte - startByte + 1; - -while (true) { -int bufSize = 0; - -if (buffLen < fileSize) { -bufSize = buffLen; -} else { -bufSize = (int) fileSize; -} - -bufSize = (int) from.read(buf, 0, bufSize); - -if (bufSize < 0) { -break; -} - -to.write(buf, 0, bufSize); -to.flush(); - -fileSize -= bufSize; - -if (fileSize == 0L) { -break; +logger.info("Downloading chunk {} with start byte {} and end byte {} to file {} from resource path {}", +chunkId, startByte, endByte, downloadFile, this.resourcePath); + +//#use this code on a DMA enabled device +//if (resourceSize <= endByte - startByte) { +//Files.copy(Path.of(this.resourcePath), Path.of(downloadFile)); +//} else { +//try (FileInputStream from = new FileInputStream(this.resourcePath); +// FileOutputStream to = new FileOutputStream(downloadFile)) { +//from.getChannel().transferTo(startByte, endByte - startByte, to.getChannel()); +//} catch (Exception e) { +//logger.error("Unexpected error occurred while downloading chunk {} to file {} from resource path {}", +//chunkId, downloadFile, this.resourcePath, e); +//throw e; +//} +//} + +int buffLen = 1024 * 1024 * 16; +try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(this.resourcePath),buffLen); + BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(downloadFile))) { +byte[] buffer = new byte[buffLen]; +int read = 0; +long totalRead = bis.skip(startByte); +while ((read = bis.read(buffer,0,Math.min(buffLen, (int) (endByte - totalRead > 0) { +bos.write(buffer, 0, read); +totalRead += read; } +bis.close(); +bos.close(); +} catch (Exception e) { +logger.error("Unexpected error occurred while downloading chunk {} to file {} from resource path {}", +chunkId, downloadFile, this.resourcePath, e); +throw e; } - -from.close(); -to.close(); - } @Override @@ -101,6 +98,6 @@ public class