Hello Nir Soffer, Dan Kenigsberg,
I'd like you to do a code review. Please visit
http://gerrit.ovirt.org/24597
to review the following change.
Change subject: introducing capability to stream data to image
......................................................................
introducing capability to stream data to image
This patch introduces a new capability to vdsm
which allows uploading content to vdsm images
by streaming it.
Previously we sent OVF data using XML-RPC (the UpdateVM verb),
which limits the size of the data, requires encoding the
payload into the XML, and makes it hard and inefficient
to upload large amounts of data and store them on an image.
This patch adds the capability to stream data to an image,
allowing efficient upload of data of any size and format
and storing it directly on the image. Since the XML-RPC
spec doesn't support streaming, and to avoid requiring
another port for a dedicated HTTP server, this patch uses
the existing XML-RPC server to handle upload requests.
General upload information:
-----------------------------------------------------------
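PUT requests arriving at the server with a content type of
application/octet-stream on the default paths that we use
today for request handling ('/', '/RPC2') are treated
as upload requests. The upload target is identified by the
Storage-Pool-Id, Storage-Domain-Id and Image-Id headers;
the Volume-Id header is optional.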
The upload itself is executed within a task; this is
needed to indicate that an operation is running on
the host.
Change-Id: I768b84799ed9fb2769c6d4240519d036f8988b99
Signed-off-by: Liron Aravot <[email protected]>
Reviewed-on: http://gerrit.ovirt.org/23281
Reviewed-by: Nir Soffer <[email protected]>
Reviewed-by: Dan Kenigsberg <[email protected]>
---
M vdsm/API.py
M vdsm/BindingXMLRPC.py
M vdsm/storage/hsm.py
M vdsm/storage/image.py
M vdsm/storage/imageSharing.py
M vdsm/storage/sp.py
6 files changed, 211 insertions(+), 3 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/97/24597/1
diff --git a/vdsm/API.py b/vdsm/API.py
index 6103ea2..d9d8c9c 100644
--- a/vdsm/API.py
+++ b/vdsm/API.py
@@ -854,6 +854,11 @@
return self._irs.downloadImage(
methodArgs, self._spUUID, self._sdUUID, self._UUID, volUUID)
+    def downloadFromStream(self, methodArgs, callback, volUUID=None):
+        """
+        Write data read from a stream into this image.
+
+        All arguments are forwarded unchanged, together with this image's
+        pool, domain and image ids, to self._irs.downloadImageFromStream;
+        its result is returned as is.
+
+        methodArgs -- dict describing the stream; the stream upload code
+                      reads 'fileObj' and 'contentLength' from it.
+        callback   -- callable forwarded to the storage layer, which is
+                      expected to call it when the upload ends.
+        volUUID    -- optional target volume id (None is passed through).
+        """
+        return self._irs.downloadImageFromStream(
+            methodArgs, callback, self._spUUID, self._sdUUID, self._UUID,
+            volUUID)
+
class LVMVolumeGroup(APIBase):
ctorArgs = ['lvmvolumegroupID']
diff --git a/vdsm/BindingXMLRPC.py b/vdsm/BindingXMLRPC.py
index 03859d3..9b56ea1 100644
--- a/vdsm/BindingXMLRPC.py
+++ b/vdsm/BindingXMLRPC.py
@@ -21,9 +21,12 @@
from errno import EINTR
import SimpleXMLRPCServer
from vdsm import SecureXMLRPCServer
+import json
+import httplib
import logging
import libvirt
import threading
+import socket
from vdsm import constants
from vdsm import utils
@@ -113,11 +116,95 @@
else:
basehandler = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler
-        class LoggingHandler(basehandler):
+        class RequestHandler(basehandler):
+            # Serves the XML-RPC requests handled by basehandler and, through
+            # do_PUT, streaming uploads of data into storage images.
+
+            # Timeout for the request socket
+            timeout = 60
+            log = logging.getLogger("BindingXMLRPC.RequestHandler")
+
+            # Request headers read by do_PUT to locate the upload target.
+            # Pool, domain and image ids are required; volume id is optional.
+            HEADER_POOL = 'Storage-Pool-Id'
+            HEADER_DOMAIN = 'Storage-Domain-Id'
+            HEADER_IMAGE = 'Image-Id'
+            HEADER_VOLUME = 'Volume-Id'
+            HEADER_CONTENT_LENGTH = 'content-length'
+            HEADER_CONTENT_TYPE = 'content-type'
+
             def setup(self):
                 threadLocal.client = self.client_address[0]
                 threadLocal.server = self.request.getsockname()[0]
                 return basehandler.setup(self)
+
+ def do_PUT(self):
+ try:
+ contentLength = self.headers.getheader(
+ self.HEADER_CONTENT_LENGTH)
+ if not contentLength:
+ self.send_error(httplib.LENGTH_REQUIRED,
+ "missing content length")
+ return
+
+ try:
+ contentLength = int(contentLength)
+ except ValueError:
+ self.send_error(httplib.BAD_REQUEST,
+ "invalid content length %r" %
+ contentLength)
+ return
+
+ # Required headers
+ spUUID = self.headers.getheader(self.HEADER_POOL)
+ sdUUID = self.headers.getheader(self.HEADER_DOMAIN)
+ imgUUID = self.headers.getheader(self.HEADER_IMAGE)
+ if not all((spUUID, sdUUID, imgUUID)):
+ self.send_error(httplib.BAD_REQUEST,
+ "missing or empty required header(s):"
+ " spUUID=%s sdUUID=%s imgUUID=%s"
+ % (spUUID, sdUUID, imgUUID))
+ return
+
+ # Optional headers
+ volUUID = self.headers.getheader(self.HEADER_VOLUME)
+
+ uploadFinishedEvent = threading.Event()
+
+ def upload_finished():
+ uploadFinishedEvent.set()
+
+ methodArgs = {'fileObj': self.rfile,
+ 'contentLength': contentLength}
+ image = API.Image(imgUUID, spUUID, sdUUID)
+ response = image.downloadFromStream(methodArgs,
+ upload_finished,
+ volUUID)
+
+ while not uploadFinishedEvent.is_set():
+ uploadFinishedEvent.wait()
+
+ json_response = json.dumps(response)
+ self.send_response(httplib.OK)
+ self.send_header(self.HEADER_CONTENT_TYPE,
+ 'application/json')
+ self.send_header(self.HEADER_CONTENT_LENGTH,
+ len(json_response))
+ self.end_headers()
+ self.wfile.write(json_response)
+
+ except socket.timeout:
+ self.send_error(httplib.REQUEST_TIMEOUT,
+ "request timeout")
+
+ except Exception:
+ self.send_error(httplib.INTERNAL_SERVER_ERROR,
+ "error during execution", exc_info=True)
+
+ def send_error(self, error, message, exc_info=False):
+ try:
+ self.log.error(message, exc_info=exc_info)
+ self.send_response(error)
+ self.end_headers()
+ except Exception:
+ self.log.error("failed to return response",
+ exc_info=True)
def parse_request(self):
r = (SecureXMLRPCServer.SecureXMLRPCRequestHandler.
@@ -131,11 +218,11 @@
server_address,
keyfile=KEYFILE, certfile=CERTFILE, ca_certs=CACERT,
timeout=self.serverRespTimeout,
- requestHandler=LoggingHandler)
+ requestHandler=RequestHandler)
else:
server = utils.SimpleThreadedXMLRPCServer(
server_address,
- requestHandler=LoggingHandler, logRequests=True)
+ requestHandler=RequestHandler, logRequests=True)
utils.closeOnExec(server.socket.fileno())
return server
diff --git a/vdsm/storage/hsm.py b/vdsm/storage/hsm.py
index 53c9dd0..5c7df0b 100644
--- a/vdsm/storage/hsm.py
+++ b/vdsm/storage/hsm.py
@@ -1673,6 +1673,22 @@
self._spmSchedule(spUUID, "downloadImage", pool.downloadImage,
methodArgs, sdUUID, imgUUID, volUUID)
+    @public
+    def downloadImageFromStream(self, methodArgs, callback, spUUID, sdUUID,
+                                imgUUID, volUUID=None):
+        """
+        Download an image from a stream.
+
+        Hands pool.downloadImageFromStream and its arguments to
+        self._spmSchedule (presumably run later as an SPM task, so the data
+        transfer does not happen in this call).
+
+        methodArgs -- passed through to the pool; the stream upload code
+                      reads 'fileObj' and 'contentLength' from it.
+        callback   -- passed through to the pool. This method never calls
+                      it itself, so it is not invoked if an error is raised
+                      here before scheduling.
+
+        Warning: Internal use only.
+        """
+        # NOTE(review): the produced domain is unused; presumably this call
+        # only validates that sdUUID is known - confirm.
+        sdCache.produce(sdUUID)
+        pool = self.getPool(spUUID)
+        # NOTE: this could become an hsm task, in such case the LV extension
+        # required to prepare the destination should go through the mailbox.
+        self._spmSchedule(spUUID, "downloadImageFromStream",
+                          pool.downloadImageFromStream, methodArgs, callback,
+                          sdUUID, imgUUID, volUUID)
+
@deprecated
@public
def moveMultipleImages(self, spUUID, srcDomUUID, dstDomUUID, imgDict,
diff --git a/vdsm/storage/image.py b/vdsm/storage/image.py
index 70f21c4..a24bcc3 100644
--- a/vdsm/storage/image.py
+++ b/vdsm/storage/image.py
@@ -1173,3 +1173,22 @@
imageSharing.download(vol.getVolumePath(), methodArgs)
finally:
domain.deactivateImage(imgUUID)
+
+    def downloadFromStream(self, methodArgs, callback, sdUUID, imgUUID,
+                           volUUID=None):
+        """
+        Copy the stream described by methodArgs into the image, then call
+        callback.
+
+        callback() runs in a finally clause, so it is called whether the
+        copy succeeds or raises; exceptions still propagate to the caller.
+        """
+        try:
+            self._downloadFromStream(methodArgs, sdUUID, imgUUID, volUUID)
+        finally:
+            callback()
+
+ def _downloadFromStream(self, methodArgs, sdUUID, imgUUID, volUUID=None):
+ domain = sdCache.produce(sdUUID)
+
+ vol = self._activateVolumeForImportExport(domain, imgUUID, volUUID)
+ try:
+ # Extend the volume (if relevant) to the image size
+ vol.extend(imageSharing.
+ streamGetSize(methodArgs) / volume.BLOCK_SIZE)
+ imageSharing.streamDownloadImage(vol.getVolumePath(), methodArgs)
+ finally:
+ domain.deactivateImage(imgUUID)
diff --git a/vdsm/storage/imageSharing.py b/vdsm/storage/imageSharing.py
index 26f299a..7166293 100644
--- a/vdsm/storage/imageSharing.py
+++ b/vdsm/storage/imageSharing.py
@@ -18,11 +18,26 @@
#
import logging
+import signal
+import socket
import curlImgWrap
+from vdsm import constants
+from vdsm import utils
+import storage_exception as se
 log = logging.getLogger("Storage.ImageSharing")
+# Time to wait, after we finish writing data to dd, until dd exits
+# (passed to p.wait() in streamDownloadImage; presumably seconds).
+# Ensures that we don't keep the task active forever if dd cannot
+# access the storage.
+WAIT_TIMEOUT = 30
+# Number of bytes to read from the socket and write
+# to dd stdin through the pipe. Based on the default socket buffer
+# size (~80KB) and the default pipe buffer size (64K), this should
+# minimize system call overhead without consuming too much
+# memory.
+BUFFER_SIZE = 65536
def httpGetSize(methodArgs):
@@ -46,6 +61,10 @@
return size
+def streamGetSize(methodArgs):
+    """
+    Return the number of bytes to transfer, i.e.
+    methodArgs['contentLength'] (KeyError if it is missing).
+    """
+    return methodArgs['contentLength']
+
+
def httpDownloadImage(dstImgPath, methodArgs):
curlImgWrap.download(methodArgs.get('url'), dstImgPath,
methodArgs.get("headers", {}))
@@ -56,6 +75,54 @@
methodArgs.get("headers", {}))
+def streamDownloadImage(dstImgPath, methodArgs):
+    """
+    Copy streamGetSize(methodArgs) bytes from methodArgs['fileObj'] to
+    dstImgPath by piping them into a dd child process.
+
+    Raises:
+        se.MiscFileReadException -- socket timeout, or the stream hit EOF
+            before all bytes were read.
+        se.StorageException -- dd did not exit within WAIT_TIMEOUT after
+            its stdin was closed.
+        se.MiscFileWriteException -- dd exited with a non-zero code.
+    Other exceptions propagate unchanged. On any error, dd is killed if it
+    is still running.
+    """
+    bytes_left = streamGetSize(methodArgs)
+    stream = methodArgs['fileObj']
+
+    cmd = [constants.EXT_DD, "of=%s" % dstImgPath, "bs=%s" % constants.MEGAB]
+    p = utils.execCmd(cmd, sudo=False, sync=False,
+                      deathSignal=signal.SIGKILL)
+    try:
+        while bytes_left > 0:
+            to_read = min(BUFFER_SIZE, bytes_left)
+
+            try:
+                data = stream.read(to_read)
+            except socket.timeout:
+                log.error("socket timeout")
+                raise se.MiscFileReadException()
+
+            # An empty read means the stream reached EOF before the
+            # announced number of bytes was received.
+            if not data:
+                total_size = streamGetSize(methodArgs)
+                log.error("partial data %s from %s",
+                          total_size - bytes_left, total_size)
+                raise se.MiscFileReadException()
+
+            p.stdin.write(data)
+            # Process stdin is not a real file object but a wrapper using a
+            # StringIO buffer. To ensure that we don't use more memory if we
+            # get data faster than dd reads it from the pipe, we flush on
+            # every write. We can remove flush() if we can limit the buffer
+            # size used by this stdin wrapper.
+            p.stdin.flush()
+            bytes_left = bytes_left - len(data)
+
+        # Closing stdin gives dd EOF so it can finish and exit.
+        p.stdin.close()
+        if not p.wait(WAIT_TIMEOUT):
+            log.error("timeout waiting for dd process")
+            raise se.StorageException()
+
+        if p.returncode != 0:
+            log.error("dd error - code %s, stderr %s",
+                      p.returncode, p.stderr.read(1000))
+            raise se.MiscFileWriteException()
+
+    except Exception:
+        # Don't leave dd running after a failure.
+        if p.returncode is None:
+            p.kill()
+        raise
+
_METHOD_IMPLEMENTATIONS = {
'http': (httpGetSize, httpDownloadImage, httpUploadImage),
}
diff --git a/vdsm/storage/sp.py b/vdsm/storage/sp.py
index a020e1e..550419c 100644
--- a/vdsm/storage/sp.py
+++ b/vdsm/storage/sp.py
@@ -1618,6 +1618,20 @@
return image.Image(self.poolPath) \
.download(methodArgs, sdUUID, imgUUID, volUUID)
+ def downloadImageFromStream(self, methodArgs, callback, sdUUID, imgUUID,
+ volUUID=None):
+ """
+ Download an image from a stream.
+ """
+ imgResourceLock = rmanager.acquireResource(
+ sd.getNamespace(sdUUID, IMAGE_NAMESPACE), imgUUID,
+ rm.LockType.exclusive)
+
+ with imgResourceLock:
+ return image.Image(self.poolPath) \
+ .downloadFromStream(methodArgs, callback, sdUUID, imgUUID,
+ volUUID)
+
def moveMultipleImages(self, srcDomUUID, dstDomUUID, imgDict, vmUUID,
force):
"""
--
To view, visit http://gerrit.ovirt.org/24597
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I768b84799ed9fb2769c6d4240519d036f8988b99
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: ovirt-3.4
Gerrit-Owner: Liron Ar <[email protected]>
Gerrit-Reviewer: Dan Kenigsberg <[email protected]>
Gerrit-Reviewer: Nir Soffer <[email protected]>
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches