The following pull request was submitted through Github.
It can be accessed and reviewed at: https://github.com/lxc/lxd/pull/6741

This e-mail was sent by the LXC bot, direct replies will not reach the author
unless they happen to be subscribed to this list.

=== Description (from pull-request) ===

From 7b75c0d99dcf8ee752541ae72d9791d4a7dab35a Mon Sep 17 00:00:00 2001
From: Thomas Hipp <thomas.h...@canonical.com>
Date: Thu, 19 Dec 2019 13:46:47 +0100
Subject: [PATCH 1/2] lxd/storage/drivers: Add Ceph driver

Signed-off-by: Thomas Hipp <thomas.h...@canonical.com>
---
 lxd/storage/drivers/driver_ceph.go         |  286 +++
 lxd/storage/drivers/driver_ceph_utils.go   | 1990 ++++++++++++++++++++
 lxd/storage/drivers/driver_ceph_volumes.go |  857 +++++++++
 lxd/storage/drivers/load.go                |    1 +
 4 files changed, 3134 insertions(+)
 create mode 100644 lxd/storage/drivers/driver_ceph.go
 create mode 100644 lxd/storage/drivers/driver_ceph_utils.go
 create mode 100644 lxd/storage/drivers/driver_ceph_volumes.go

diff --git a/lxd/storage/drivers/driver_ceph.go b/lxd/storage/drivers/driver_ceph.go
new file mode 100644
index 0000000000..c76f50f18e
--- /dev/null
+++ b/lxd/storage/drivers/driver_ceph.go
@@ -0,0 +1,286 @@
+package drivers
+
+import (
+       "fmt"
+       "os/exec"
+       "strings"
+
+       "github.com/lxc/lxd/lxd/migration"
+       "github.com/lxc/lxd/lxd/operations"
+       "github.com/lxc/lxd/shared"
+       "github.com/lxc/lxd/shared/api"
+       "github.com/lxc/lxd/shared/logger"
+       "github.com/lxc/lxd/shared/units"
+)
+
+var cephAllowedFilesystems = []string{"btrfs", "ext4"}
+var cephVersion string
+var cephLoaded bool
+
+type ceph struct {
+       common
+}
+
+func (d *ceph) load() error {
+       // Register the patches.
+       d.patches = map[string]func() error{
+               "storage_create_vm": nil,
+       }
+
+       // Done if previously loaded.
+       if cephLoaded {
+               return nil
+       }
+
+       // Validate the required binaries.
+       for _, tool := range []string{"ceph", "rbd"} {
+               _, err := exec.LookPath(tool)
+               if err != nil {
+                       return fmt.Errorf("Required tool '%s' is missing", tool)
+               }
+       }
+
+       // Detect and record the version.
+       if cephVersion == "" {
+               out, err := shared.RunCommand("rbd", "--version")
+               if err != nil {
+                       return err
+               }
+
+               cephVersion = strings.TrimSpace(out)
+       }
+
+       cephLoaded = true
+       return nil
+}
+
+// Info returns info about the driver and its environment.
+func (d *ceph) Info() Info {
+       return Info{
+               Name:                  "ceph",
+               Version:               cephVersion,
+               OptimizedImages:       true,
+               PreservesInodes:       false,
+               Remote:                true,
+               VolumeTypes:           []VolumeType{VolumeTypeCustom, VolumeTypeImage, VolumeTypeContainer, VolumeTypeVM},
+               BlockBacking:          false,
+               RunningQuotaResize:    false,
+               RunningSnapshotFreeze: true,
+       }
+}
+
+func (d *ceph) Create() error {
+       revert := true
+
+       d.config["volatile.initial_source"] = d.config["source"]
+
+       // Set default properties if missing.
+       if d.config["ceph.cluster_name"] == "" {
+               d.config["ceph.cluster_name"] = "ceph"
+       }
+
+       if d.config["ceph.user_name"] == "" {
+               d.config["ceph.user_name"] = "admin"
+       }
+
+       if d.config["ceph.osd.pg_num"] == "" {
+               d.config["ceph.osd.pg_num"] = "32"
+       } else {
+               // Validate
+               _, err := units.ParseByteSizeString(d.config["ceph.osd.pg_num"])
+               if err != nil {
+                       return err
+               }
+       }
+
+       // sanity check
+       if d.config["source"] != "" &&
+               d.config["ceph.osd.pool_name"] != "" &&
+               d.config["source"] != d.config["ceph.osd.pool_name"] {
+               return fmt.Errorf(`The "source" and "ceph.osd.pool_name" property must not differ for CEPH OSD storage pools`)
+       }
+
+       // use an existing OSD pool
+       if d.config["source"] != "" {
+               d.config["ceph.osd.pool_name"] = d.config["source"]
+       }
+
+       if d.config["ceph.osd.pool_name"] == "" {
+               d.config["ceph.osd.pool_name"] = d.name
+               d.config["source"] = d.name
+       }
+
+       if !d.osdPoolExists() {
+               // Create new osd pool
+               err := d.osdCreatePool()
+               if err != nil {
+                       return fmt.Errorf("Failed to create Ceph OSD pool %q in cluster %q: %s", d.config["ceph.osd.pool_name"], d.config["ceph.cluster_name"], err)
+               }
+
+               defer func() {
+                       if !revert {
+                               return
+                       }
+
+                       err := d.osdDeletePool()
+                       if err != nil {
+                               logger.Warnf("Failed to delete Ceph OSD pool %q in cluster %q: %s", d.config["ceph.osd.pool_name"], d.config["ceph.cluster_name"], err)
+                       }
+               }()
+
+               // Create dummy storage volume. Other LXD instances will use this to detect whether this osd pool is already in use by another LXD instance.
+               err = d.rbdCreateVolume(d.config["ceph.osd.pool_name"], "lxd", "0")
+               if err != nil {
+                       logger.Errorf("Failed to create RBD volume %q on OSD pool %q: %s", d.config["ceph.osd.pool_name"], d.config["ceph.osd.pool_name"], err)
+                       return err
+               }
+               d.config["volatile.pool.pristine"] = "true"
+       } else {
+               ok := d.rbdVolumeExists(d.config["ceph.osd.pool_name"], "lxd")
+               d.config["volatile.pool.pristine"] = "false"
+               if ok {
+                       if d.config["ceph.osd.force_reuse"] == "" || !shared.IsTrue(d.config["ceph.osd.force_reuse"]) {
+                               return fmt.Errorf("Ceph OSD pool %q in cluster %q seems to be in use by another LXD instance. Use \"ceph.osd.force_reuse=true\" to force", d.config["ceph.osd.pool_name"], d.config["ceph.cluster_name"])
+                       }
+               }
+
+               // Use existing osd pool
+               msg, err := d.osdGetPoolPGNum()
+               if err != nil {
+                       return fmt.Errorf("Failed to retrieve number of placement groups for Ceph OSD pool %q in cluster %q: %s", d.config["ceph.osd.pool_name"], d.config["ceph.cluster_name"], err)
+               }
+
+               logger.Debugf("Retrieved number of placement groups for Ceph OSD pool %q in cluster %q", d.config["ceph.osd.pool_name"], d.config["ceph.cluster_name"])
+
+               idx := strings.Index(msg, "pg_num:")
+               if idx == -1 {
+                       return fmt.Errorf("Failed to parse number of placement groups for Ceph OSD pool %q in cluster %q: %s", d.config["ceph.osd.pool_name"], d.config["ceph.cluster_name"], msg)
+               }
+
+               msg = msg[(idx + len("pg_num:")):]
+               msg = strings.TrimSpace(msg)
+
+               // It is ok to update the pool configuration since storage pool
+               // creation via API is implemented such that the storage pool is
+               // checked for a changed config after this function returns and
+               // if so the db for it is updated.
+               d.config["ceph.osd.pg_num"] = msg
+       }
+
+       if d.config["source"] == "" {
+               d.config["source"] = d.config["ceph.osd.pool_name"]
+       }
+
+       // set immutable ceph.cluster_name property
+       if d.config["ceph.cluster_name"] == "" {
+               d.config["ceph.cluster_name"] = "ceph"
+       }
+
+       // set immutable ceph.osd.pool_name property
+       if d.config["ceph.osd.pool_name"] == "" {
+               d.config["ceph.osd.pool_name"] = d.name
+       }
+
+       if d.config["ceph.osd.pg_num"] == "" {
+               d.config["ceph.osd.pg_num"] = "32"
+       }
+
+       revert = false
+
+       return nil
+}
+
+func (d *ceph) Delete(op *operations.Operation) error {
+       // test if pool exists
+       poolExists := d.osdPoolExists()
+       if !poolExists {
+               logger.Warnf("Ceph OSD pool %q does not exist in cluster %q", d.config["ceph.osd.pool_name"], d.config["ceph.cluster_name"])
+       }
+
+       // Check whether we own the pool and only remove in this case.
+       if d.config["volatile.pool.pristine"] != "" &&
+               shared.IsTrue(d.config["volatile.pool.pristine"]) {
+               logger.Debugf("Detected that this LXD instance is the owner of the Ceph OSD pool %q in cluster %q", d.config["ceph.osd.pool_name"], d.config["ceph.cluster_name"])
+
+               // Delete the osd pool.
+               if poolExists {
+                       err := d.osdDeletePool()
+                       if err != nil {
+                               return fmt.Errorf("Failed to delete Ceph OSD pool %q in cluster %q: %s", d.config["ceph.osd.pool_name"], d.config["ceph.cluster_name"], err)
+                       }
+               }
+       }
+
+       // If the user completely destroyed it, call it done.
+       if !shared.PathExists(GetPoolMountPath(d.name)) {
+               return nil
+       }
+
+       // On delete, wipe everything in the directory.
+       err := wipeDirectory(GetPoolMountPath(d.name))
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+func (d *ceph) Mount() (bool, error) {
+       // Nothing to do here.
+       return true, nil
+}
+func (d *ceph) Unmount() (bool, error) {
+       // Nothing to do here.
+       return true, nil
+}
+
+func (d *ceph) GetResources() (*api.ResourcesStoragePool, error) {
+       return d.getPoolUsage()
+}
+
+func (d *ceph) Validate(config map[string]string) error {
+       rules := map[string]func(value string) error{
+               "ceph.cluster_name":       shared.IsAny,
+               "ceph.osd.force_reuse":    shared.IsBool,
+               "ceph.osd.pg_num":         shared.IsAny,
+               "ceph.osd.pool_name":      shared.IsAny,
+               "ceph.osd.data_pool_name": shared.IsAny,
+               "ceph.rbd.clone_copy":     shared.IsBool,
+               "ceph.user_name":          shared.IsAny,
+               "volatile.pool.pristine":  shared.IsAny,
+               "volume.block.filesystem": func(value string) error {
+                       if value == "" {
+                               return nil
+                       }
+                       return shared.IsOneOf(value, cephAllowedFilesystems)
+               },
+               "volume.block.mount_options": shared.IsAny,
+       }
+
+       return d.validatePool(config, rules)
+}
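
For reference, a minimal sketch (not part of the patch) of a pool definition that the Validate() rules above would accept; the key names come from the rules map, the values are illustrative only. Create() above fills in the same defaults ("ceph", "admin", "32") for ceph.cluster_name, ceph.user_name and ceph.osd.pg_num when they are empty.

// examplePoolConfig is a hypothetical pool definition accepted by the
// Validate() rules above; all values are made-up examples.
var examplePoolConfig = map[string]string{
        "ceph.cluster_name":       "ceph",
        "ceph.user_name":          "admin",
        "ceph.osd.pool_name":      "lxd",
        "ceph.osd.pg_num":         "32",
        "ceph.osd.force_reuse":    "false",
        "ceph.rbd.clone_copy":     "true",
        "volume.block.filesystem": "ext4",
}
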
+
+func (d *ceph) Update(changedConfig map[string]string) error {
+       return nil
+}
+
+func (d *ceph) MigrationTypes(contentType ContentType, refresh bool) []migration.Type {
+       if refresh {
+               return []migration.Type{
+                       {
+                               FSType:   migration.MigrationFSType_RSYNC,
+                               Features: []string{"delete", "compress", "bidirectional"},
+                       },
+               }
+       }
+
+       return []migration.Type{
+               {
+                       FSType: migration.MigrationFSType_RBD,
+               },
+               {
+                       FSType:   migration.MigrationFSType_RSYNC,
+                       Features: []string{"delete", "compress", "bidirectional"},
+               },
+       }
+}
diff --git a/lxd/storage/drivers/driver_ceph_utils.go b/lxd/storage/drivers/driver_ceph_utils.go
new file mode 100644
index 0000000000..412e2d4b6e
--- /dev/null
+++ b/lxd/storage/drivers/driver_ceph_utils.go
@@ -0,0 +1,1990 @@
+package drivers
+
+import (
+       "bytes"
+       "encoding/json"
+       "fmt"
+       "io"
+       "io/ioutil"
+       "os"
+       "os/exec"
+       "strconv"
+       "strings"
+       "syscall"
+       "time"
+
+       "github.com/pborman/uuid"
+       "golang.org/x/sys/unix"
+
+       "github.com/lxc/lxd/lxd/db"
+       "github.com/lxc/lxd/lxd/operations"
+       "github.com/lxc/lxd/lxd/rsync"
+       "github.com/lxc/lxd/shared"
+       "github.com/lxc/lxd/shared/api"
+       "github.com/lxc/lxd/shared/ioprogress"
+       "github.com/lxc/lxd/shared/units"
+)
+
+// osdCreatePool creates an OSD pool.
+func (d *ceph) osdCreatePool() error {
+       // Create new osd pool
+       _, err := shared.TryRunCommand("ceph",
+               "--name", fmt.Sprintf("client.%s", d.config["ceph.user_name"]),
+               "--cluster", d.config["ceph.cluster_name"],
+               "osd",
+               "pool",
+               "create",
+               d.config["ceph.osd.pool_name"],
+               d.config["ceph.osd.pg_num"])
+
+       return err
+}
+
+// osdPoolExists checks whether a given OSD pool exists.
+func (d *ceph) osdPoolExists() bool {
+       _, err := shared.RunCommand(
+               "ceph",
+               "--name", fmt.Sprintf("client.%s", d.config["ceph.user_name"]),
+               "--cluster", d.config["ceph.cluster_name"],
+               "osd",
+               "pool",
+               "get",
+               d.config["ceph.osd.pool_name"],
+               "size")
+       if err != nil {
+               return false
+       }
+
+       return true
+}
+
+// osdDeletePool destroys an OSD pool.
+// - A call to osdDeletePool will destroy a pool including any storage
+//   volumes that still exist in the pool.
+// - In case the OSD pool that is supposed to be deleted does not exist this
+//   command will still exit 0. This means that if the caller wants to be sure
+//   that this call actually deleted an OSD pool it needs to check for the
+//   existence of the pool first.
+func (d *ceph) osdDeletePool() error {
+       _, err := shared.RunCommand("ceph",
+               "--name", fmt.Sprintf("client.%s", d.config["ceph.user_name"]),
+               "--cluster", d.config["ceph.cluster_name"],
+               "osd",
+               "pool",
+               "delete",
+               d.config["ceph.osd.pool_name"],
+               d.config["ceph.osd.pool_name"],
+               "--yes-i-really-really-mean-it")
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
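
Since osdDeletePool exits 0 even when the pool is absent, a caller that needs to know a pool was actually removed has to pair it with osdPoolExists, as the comment above notes; Delete() in driver_ceph.go does this inline. A minimal sketch of that pattern, using a hypothetical helper name:

// osdDeletePoolChecked combines the existence check with the deletion, as the
// comment on osdDeletePool recommends.
func (d *ceph) osdDeletePoolChecked() error {
        if !d.osdPoolExists() {
                return fmt.Errorf("Ceph OSD pool %q does not exist in cluster %q", d.config["ceph.osd.pool_name"], d.config["ceph.cluster_name"])
        }

        return d.osdDeletePool()
}
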
+
+func (d *ceph) osdGetPoolPGNum() (string, error) {
+       out, err := shared.RunCommand("ceph",
+               "--name", fmt.Sprintf("client.%s", d.config["ceph.user_name"]),
+               "--cluster", d.config["ceph.cluster_name"],
+               "osd",
+               "pool",
+               "get",
+               d.config["ceph.osd.pool_name"],
+               "pg_num")
+
+       return out, err
+}
+
+func (d *ceph) getPoolUsage() (*api.ResourcesStoragePool, error) {
+       var stdout bytes.Buffer
+
+       err := shared.RunCommandWithFds(nil, &stdout,
+               "ceph",
+               "--name", fmt.Sprintf("client.%s", d.config["ceph.user_name"]),
+               "--cluster", d.config["ceph.cluster_name"],
+               "df",
+               "-f", "json")
+       if err != nil {
+               return nil, err
+       }
+
+       // Temporary structs for parsing
+       type cephDfPoolStats struct {
+               BytesUsed      int64 `json:"bytes_used"`
+               BytesAvailable int64 `json:"max_avail"`
+       }
+
+       type cephDfPool struct {
+               Name  string          `json:"name"`
+               Stats cephDfPoolStats `json:"stats"`
+       }
+
+       type cephDf struct {
+               Pools []cephDfPool `json:"pools"`
+       }
+
+       // Parse the JSON output
+       df := cephDf{}
+       err = json.NewDecoder(&stdout).Decode(&df)
+       if err != nil {
+               return nil, err
+       }
+
+       var pool *cephDfPool
+       for _, entry := range df.Pools {
+               if entry.Name == d.config["ceph.osd.pool_name"] {
+                       pool = &entry
+                       break
+               }
+       }
+
+       if pool == nil {
+               return nil, fmt.Errorf("OSD pool missing in df output")
+       }
+
+       spaceUsed := uint64(pool.Stats.BytesUsed)
+       spaceAvailable := uint64(pool.Stats.BytesAvailable)
+
+       res := api.ResourcesStoragePool{}
+       res.Space.Total = spaceAvailable + spaceUsed
+       res.Space.Used = spaceUsed
+
+       return &res, nil
+}
+
+// rbdCreateVolume creates an RBD storage volume.
+// Note that the set of features is intentionally limited by passing
+// --image-feature explicitly. This is done to ensure that
+// the chances of a conflict between the features supported by the userspace
+// library and the kernel module are minimized. Otherwise random panics might
+// occur.
+func (d *ceph) rbdCreateVolume(volumeName string,
+       volumeType string, size string) error {
+       cmd := []string{
+               "--id", d.config["ceph.user_name"],
+               "--image-feature", "layering",
+               "--cluster", d.config["ceph.cluster_name"],
+               "--pool", d.config["ceph.osd.pool_name"],
+       }
+
+       if d.config["ceph.osd.data_pool_name"] != "" {
+               cmd = append(cmd, "--data-pool", d.config["ceph.osd.data_pool_name"])
+       }
+
+       cmd = append(cmd,
+               "--size", size,
+               "create",
+               d.getRBDVolumeName(volumeName, "", volumeType, false, false))
+
+       _, err := shared.RunCommand("rbd", cmd...)
+       return err
+}
+
+// rbdVolumeExists checks whether a given RBD storage volume exists.
+func (d *ceph) rbdVolumeExists(volumeName string, volumeType string) bool {
+       _, err := shared.RunCommand(
+               "rbd",
+               "--id", d.config["ceph.user_name"],
+               "--cluster", d.config["ceph.cluster_name"],
+               "--pool", d.config["ceph.osd.pool_name"],
+               "image-meta",
+               "list",
+               d.getRBDVolumeName(volumeName, "", volumeType, false, false))
+       if err != nil {
+               return false
+       }
+       return true
+}
+
+// rbdVolumeSnapshotExists checks whether a given RBD snapshot exists.
+func (d *ceph) rbdVolumeSnapshotExists(volumeName string, volumeType string, snapshotName string) bool {
+       _, err := shared.RunCommand(
+               "rbd",
+               "--id", d.config["ceph.user_name"],
+               "--cluster", d.config["ceph.cluster_name"],
+               "--pool", d.config["ceph.osd.pool_name"],
+               "info",
+               d.getRBDVolumeName(volumeName, snapshotName, volumeType, false, false))
+       if err != nil {
+               return false
+       }
+       return true
+}
+
+// rbdDeleteVolume deletes an RBD storage volume.
+// - In case the RBD storage volume that is supposed to be deleted does not
+//   exist this command will still exit 0. This means that if the caller wants
+//   to be sure that this call actually deleted an RBD storage volume it needs
+//   to check for the existence of the storage volume first.
+func (d *ceph) rbdDeleteVolume(volumeName string, volumeType string) error {
+       _, err := shared.RunCommand(
+               "rbd",
+               "--id", d.config["ceph.user_name"],
+               "--cluster", d.config["ceph.cluster_name"],
+               "--pool", d.config["ceph.osd.pool_name"],
+               "rm",
+               d.getRBDVolumeName(volumeName, "", volumeType, false, false))
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+// rbdMapVolume maps a given RBD storage volume
+// This will ensure that the RBD storage volume is accessible as a block device
+// in the /dev directory and is therefore necessary in order to mount it.
+func (d *ceph) rbdMapVolume(volumeName string, volumeType string) (string, error) {
+       devPath, err := shared.RunCommand(
+               "rbd",
+               "--id", d.config["ceph.user_name"],
+               "--cluster", d.config["ceph.cluster_name"],
+               "--pool", d.config["ceph.osd.pool_name"],
+               "map",
+               d.getRBDVolumeName(volumeName, "", volumeType, false, false))
+       if err != nil {
+               return "", err
+       }
+
+       idx := strings.Index(devPath, "/dev/rbd")
+       if idx < 0 {
+               return "", fmt.Errorf("Failed to detect mapped device path")
+       }
+
+       devPath = devPath[idx:]
+       return strings.TrimSpace(devPath), nil
+}
+
+// rbdUnmapVolume unmaps a given RBD storage volume
+// This is a precondition for being able to delete an RBD storage volume.
+func (d *ceph) rbdUnmapVolume(volumeName string, volumeType string, unmapUntilEINVAL bool) error {
+       busyCount := 0
+
+again:
+       _, err := shared.RunCommand(
+               "rbd",
+               "--id", d.config["ceph.user_name"],
+               "--cluster", d.config["ceph.cluster_name"],
+               "--pool", d.config["ceph.osd.pool_name"],
+               "unmap",
+               d.getRBDVolumeName(volumeName, "", volumeType, false, false))
+       if err != nil {
+               runError, ok := err.(shared.RunError)
+               if ok {
+                       exitError, ok := runError.Err.(*exec.ExitError)
+                       if ok {
+                               waitStatus := exitError.Sys().(syscall.WaitStatus)
+                               if waitStatus.ExitStatus() == 22 {
+                                       // EINVAL (already unmapped)
+                                       return nil
+                               }
+
+                               if waitStatus.ExitStatus() == 16 {
+                                       // EBUSY (currently in use)
+                                       busyCount++
+                                       if busyCount == 10 {
+                                               return err
+                                       }
+
+                                       // Wait a second and try again
+                                       time.Sleep(time.Second)
+                                       goto again
+                               }
+                       }
+               }
+
+               return err
+       }
+
+       if unmapUntilEINVAL {
+               goto again
+       }
+
+       return nil
+}
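
The exit-status handling above is repeated in several of the helpers below (rbdUnmapVolumeSnapshot, rbdProtectVolumeSnapshot, rbdUnprotectVolumeSnapshot). A sketch of how it could be factored out, assuming, as the code does, that shared.RunError wraps an *exec.ExitError; the helper name is made up and it is not part of the patch:

// rbdExitStatus extracts the exit status of a failed rbd invocation, returning
// false when the error does not carry one. The exit codes of interest here are
// 22 (EINVAL, already unmapped/unprotected) and 16 (EBUSY, still in use).
func rbdExitStatus(err error) (int, bool) {
        runError, ok := err.(shared.RunError)
        if !ok {
                return 0, false
        }

        exitError, ok := runError.Err.(*exec.ExitError)
        if !ok {
                return 0, false
        }

        return exitError.Sys().(syscall.WaitStatus).ExitStatus(), true
}
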
+
+// rbdUnmapVolumeSnapshot unmaps a given RBD snapshot
+// This is a precondition for being able to delete an RBD snapshot.
+func (d *ceph) rbdUnmapVolumeSnapshot(volumeName string, volumeType string, snapshotName string, unmapUntilEINVAL bool) error {
+again:
+       _, err := shared.RunCommand(
+               "rbd",
+               "--id", d.config["ceph.user_name"],
+               "--cluster", d.config["ceph.cluster_name"],
+               "--pool", d.config["ceph.osd.pool_name"],
+               "unmap",
+               d.getRBDVolumeName(volumeName, snapshotName, volumeType, false, false))
+       if err != nil {
+               runError, ok := err.(shared.RunError)
+               if ok {
+                       exitError, ok := runError.Err.(*exec.ExitError)
+                       if ok {
+                               waitStatus := exitError.Sys().(syscall.WaitStatus)
+                               if waitStatus.ExitStatus() == 22 {
+                                       // EINVAL (already unmapped)
+                                       return nil
+                               }
+                       }
+               }
+               return err
+       }
+
+       if unmapUntilEINVAL {
+               goto again
+       }
+
+       return nil
+}
+
+// rbdCreateVolumeSnapshot creates a read-write snapshot of a given RBD storage
+// volume
+func (d *ceph) rbdCreateVolumeSnapshot(volumeName string, volumeType string, snapshotName string) error {
+       _, err := shared.RunCommand(
+               "rbd",
+               "--id", d.config["ceph.user_name"],
+               "--cluster", d.config["ceph.cluster_name"],
+               "--pool", d.config["ceph.osd.pool_name"],
+               "snap",
+               "create",
+               "--snap", snapshotName,
+               d.getRBDVolumeName(volumeName, "", volumeType, false, false))
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+// rbdPurgeVolumeSnapshots deletes all snapshots of a given RBD storage volume
+// Note that this will only succeed if none of the snapshots are protected.
+func (d *ceph) rbdPurgeVolumeSnapshots(volumeName string, volumeType string) error {
+       _, err := shared.RunCommand(
+               "rbd",
+               "--id", d.config["ceph.user_name"],
+               "--cluster", d.config["ceph.cluster_name"],
+               "--pool", d.config["ceph.osd.pool_name"],
+               "snap",
+               "purge",
+               d.getRBDVolumeName(volumeName, "", volumeType, false, false))
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+// rbdProtectVolumeSnapshot protects a given snapshot from being deleted
+// This is a precondition to be able to create RBD clones from a given snapshot.
+func (d *ceph) rbdProtectVolumeSnapshot(volumeName string, volumeType string, snapshotName string) error {
+       _, err := shared.RunCommand(
+               "rbd",
+               "--id", d.config["ceph.user_name"],
+               "--cluster", d.config["ceph.cluster_name"],
+               "--pool", d.config["ceph.osd.pool_name"],
+               "snap",
+               "protect",
+               "--snap", snapshotName,
+               d.getRBDVolumeName(volumeName, "", volumeType, false, false))
+       if err != nil {
+               runError, ok := err.(shared.RunError)
+               if ok {
+                       exitError, ok := runError.Err.(*exec.ExitError)
+                       if ok {
+                               waitStatus := exitError.Sys().(syscall.WaitStatus)
+                               if waitStatus.ExitStatus() == 16 {
+                                       // EBUSY (snapshot already protected)
+                                       return nil
+                               }
+                       }
+               }
+               return err
+       }
+
+       return nil
+}
+
+// rbdUnprotectVolumeSnapshot unprotects a given snapshot
+// - This is a precondition to be able to delete an RBD snapshot.
+// - This command will only succeed if the snapshot does not have any clones.
+func (d *ceph) rbdUnprotectVolumeSnapshot(volumeName string, volumeType string, snapshotName string) error {
+       _, err := shared.RunCommand(
+               "rbd",
+               "--id", d.config["ceph.user_name"],
+               "--cluster", d.config["ceph.cluster_name"],
+               "--pool", d.config["ceph.osd.pool_name"],
+               "snap",
+               "unprotect",
+               "--snap", snapshotName,
+               d.getRBDVolumeName(volumeName, "", volumeType, false, false))
+       if err != nil {
+               runError, ok := err.(shared.RunError)
+               if ok {
+                       exitError, ok := runError.Err.(*exec.ExitError)
+                       if ok {
+                               waitStatus := exitError.Sys().(syscall.WaitStatus)
+                               if waitStatus.ExitStatus() == 22 {
+                                       // EINVAL (snapshot already unprotected)
+                                       return nil
+                               }
+                       }
+               }
+               return err
+       }
+
+       return nil
+}
+
+// rbdCreateClone creates a clone from a protected RBD snapshot
+func (d *ceph) rbdCreateClone(sourceVolumeName string, sourceVolumeType string, sourceSnapshotName string, targetVolumeName string, targetVolumeType string) error {
+       cmd := []string{
+               "--id", d.config["ceph.user_name"],
+               "--cluster", d.config["ceph.cluster_name"],
+               "--image-feature", "layering",
+       }
+
+       if d.config["ceph.osd.data_pool_name"] != "" {
+               cmd = append(cmd, "--data-pool", d.config["ceph.osd.data_pool_name"])
+       }
+
+       cmd = append(cmd,
+               "clone",
+               d.getRBDVolumeName(sourceVolumeName, sourceSnapshotName, sourceVolumeType, false, true),
+               d.getRBDVolumeName(targetVolumeName, "", targetVolumeType, false, true))
+
+       _, err := shared.RunCommand("rbd", cmd...)
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+// rbdListSnapshotClones lists all clones of an RBD snapshot
+func (d *ceph) rbdListSnapshotClones(volumeName string, volumeType string, snapshotName string) ([]string, error) {
+       msg, err := shared.RunCommand(
+               "rbd",
+               "--id", d.config["ceph.user_name"],
+               "--cluster", d.config["ceph.cluster_name"],
+               "--pool", d.config["ceph.osd.pool_name"],
+               "children",
+               "--image", d.getRBDVolumeName(volumeName, "", volumeType, false, false),
+               "--snap", snapshotName)
+       if err != nil {
+               return nil, err
+       }
+
+       msg = strings.TrimSpace(msg)
+       clones := strings.Fields(msg)
+       if len(clones) == 0 {
+               return nil, db.ErrNoSuchObject
+       }
+
+       return clones, nil
+}
+
+// rbdMarkVolumeDeleted marks an RBD storage volume as being in "zombie"
+// state
+// An RBD storage volume that is in zombie state is not tracked in LXD's
+// database anymore but still needs to be kept around for the sake of any
+// dependent storage entities in the storage pool. This usually happens when an
+// RBD storage volume has protected snapshots; a scenario most common when
+// creating a sparse copy of a container or when LXD updated an image and the
+// image still has dependent container clones.
+func (d *ceph) rbdMarkVolumeDeleted(volumeType string, oldVolumeName string, newVolumeName string, suffix string) error {
+       deletedName := d.getRBDVolumeName(newVolumeName, "", volumeType, true, true)
+       if suffix != "" {
+               deletedName = fmt.Sprintf("%s_%s", deletedName, suffix)
+       }
+       _, err := shared.RunCommand(
+               "rbd",
+               "--id", d.config["ceph.user_name"],
+               "--cluster", d.config["ceph.cluster_name"],
+               "mv",
+               d.getRBDVolumeName(oldVolumeName, "", volumeType, false, true),
+               deletedName)
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+// rbdUnmarkVolumeDeleted unmarks an RBD storage volume as being in "zombie"
+// state
+// - An RBD storage volume that is in zombie state is not tracked in LXD's database
+//   anymore but still needs to be kept around for the sake of any dependent
+//   storage entities in the storage pool.
+// - This function is mostly used when a user has deleted the storage volume of
+//   an image from the storage pool and then triggers a container creation. If
+//   LXD detects that the storage volume for the given hash already exists in
+//   the pool but is marked as "zombie" it will unmark it as a zombie instead of
+//   creating another storage volume for the image.
+func (d *ceph) rbdUnmarkVolumeDeleted(volumeName string, volumeType string, oldSuffix string, newSuffix string) error {
+       oldName := d.getRBDVolumeName(volumeName, "", volumeType, true, true)
+       if oldSuffix != "" {
+               oldName = fmt.Sprintf("%s_%s", oldName, oldSuffix)
+       }
+
+       newName := d.getRBDVolumeName(volumeName, "", volumeType, false, true)
+       if newSuffix != "" {
+               newName = fmt.Sprintf("%s_%s", newName, newSuffix)
+       }
+
+       _, err := shared.RunCommand(
+               "rbd",
+               "--id", d.config["ceph.user_name"],
+               "--cluster", d.config["ceph.cluster_name"],
+               "mv",
+               oldName,
+               newName)
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+// rbdRenameVolume renames a given RBD storage volume
+// Note that this usually requires that the image be unmapped under its original
+// name, then renamed, and finally remapped again. If it is not unmapped
+// under its original name and the caller maps it under its new name the image
+// will be mapped twice. This will prevent it from being deleted.
+func (d *ceph) rbdRenameVolume(oldVolumeName string, newVolumeName string, volumeType string) error {
+       _, err := shared.RunCommand(
+               "rbd",
+               "--id", d.config["ceph.user_name"],
+               "--cluster", d.config["ceph.cluster_name"],
+               "mv",
+               d.getRBDVolumeName(oldVolumeName, "", volumeType, false, true),
+               d.getRBDVolumeName(newVolumeName, "", volumeType, false, true))
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+// rbdRenameVolumeSnapshot renames a given RBD storage volume snapshot
+// Note that if the snapshot is mapped - which it usually shouldn't be - this
+// usually requires that the snapshot be unmapped under its original name, then
+// renamed, and finally remapped again. If it is not unmapped under its
+// original name and the caller maps it under its new name the snapshot will be
+// mapped twice. This will prevent it from being deleted.
+func (d *ceph) rbdRenameVolumeSnapshot(volumeName string, volumeType string, oldSnapshotName string, newSnapshotName string) error {
+       _, err := shared.RunCommand(
+               "rbd",
+               "--id", d.config["ceph.user_name"],
+               "--cluster", d.config["ceph.cluster_name"],
+               "snap",
+               "rename",
+               d.getRBDVolumeName(volumeName, oldSnapshotName, volumeType, false, true),
+               d.getRBDVolumeName(volumeName, newSnapshotName, volumeType, false, true))
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+// rbdGetVolumeParent will return the snapshot the RBD clone was created
+// from
+// - If the RBD storage volume is not a clone then this function will return
+//   db.ErrNoSuchObject.
+// - The snapshot will be returned as
+//   <osd-pool-name>/<rbd-volume-name>@<rbd-snapshot-name>
+//   The caller will usually want to parse this according to its needs. This
+//   helper library provides two small functions to do this but see below.
+func (d *ceph) rbdGetVolumeParent(volumeName string, volumeType string) (string, error) {
+       msg, err := shared.RunCommand(
+               "rbd",
+               "--id", d.config["ceph.user_name"],
+               "--cluster", d.config["ceph.cluster_name"],
+               "--pool", d.config["ceph.osd.pool_name"],
+               "info",
+               d.getRBDVolumeName(volumeName, "", volumeType, false, false))
+       if err != nil {
+               return "", err
+       }
+
+       idx := strings.Index(msg, "parent: ")
+       if idx == -1 {
+               return "", db.ErrNoSuchObject
+       }
+
+       msg = msg[(idx + len("parent: ")):]
+       msg = strings.TrimSpace(msg)
+
+       idx = strings.Index(msg, "\n")
+       if idx == -1 {
+               return "", fmt.Errorf("Unexpected parsing error")
+       }
+
+       msg = msg[:idx]
+       msg = strings.TrimSpace(msg)
+
+       return msg, nil
+}
+
+// rbdDeleteVolumeSnapshot deletes an RBD snapshot
+// This requires that the snapshot does not have any clones and is unmapped and
+// unprotected.
+func (d *ceph) rbdDeleteVolumeSnapshot(volumeName string, volumeType string, snapshotName string) error {
+       _, err := shared.RunCommand(
+               "rbd",
+               "--id", d.config["ceph.user_name"],
+               "--cluster", d.config["ceph.cluster_name"],
+               "--pool", d.config["ceph.osd.pool_name"],
+               "snap",
+               "rm",
+               d.getRBDVolumeName(volumeName, snapshotName, volumeType, false, false))
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+// rbdCopyVolume copies an RBD storage volume
+// This is a non-sparse copy which doesn't introduce any dependency relationship
+// between the source RBD storage volume and the target RBD storage volume. The
+// operation is similar to creating an empty RBD storage volume and rsyncing
+// the contents of the source RBD storage volume into it.
+func (d *ceph) rbdCopyVolume(oldVolumeName string, newVolumeName string) error {
+       _, err := shared.RunCommand(
+               "rbd",
+               "--id", d.config["ceph.user_name"],
+               "--cluster", d.config["ceph.cluster_name"],
+               "cp",
+               oldVolumeName,
+               newVolumeName)
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+// rbdListVolumeSnapshots retrieves the snapshots of an RBD storage volume
+// The format of the snapshot names is simply the part after the @. So given a
+// valid RBD path relative to a pool
+// <osd-pool-name>/<rbd-storage-volume>@<rbd-snapshot-name>
+// this will only return
+// <rbd-snapshot-name>
+func (d *ceph) rbdListVolumeSnapshots(volumeName string, volumeType string) ([]string, error) {
+       msg, err := shared.RunCommand(
+               "rbd",
+               "--id", d.config["ceph.user_name"],
+               "--format", "json",
+               "--cluster", d.config["ceph.cluster_name"],
+               "--pool", d.config["ceph.osd.pool_name"],
+               "snap",
+               "ls",
+               d.getRBDVolumeName(volumeName, "", volumeType, false, false))
+       if err != nil {
+               return []string{}, err
+       }
+
+       var data []map[string]interface{}
+       err = json.Unmarshal([]byte(msg), &data)
+       if err != nil {
+               return []string{}, err
+       }
+
+       snapshots := []string{}
+       for _, v := range data {
+               _, ok := v["name"]
+               if !ok {
+                       return []string{}, fmt.Errorf("No \"name\" property found")
+               }
+
+               name, ok := v["name"].(string)
+               if !ok {
+                       return []string{}, fmt.Errorf("\"name\" property did not have string type")
+               }
+
+               name = strings.TrimSpace(name)
+               snapshots = append(snapshots, name)
+       }
+
+       if len(snapshots) == 0 {
+               return []string{}, db.ErrNoSuchObject
+       }
+
+       return snapshots, nil
+}
+
+// rbdRestoreVolume restores an RBD storage volume to the state of one of
+// its snapshots
+func (d *ceph) rbdRestoreVolume(volumeName string, volumeType string, snapshotName string) error {
+       _, err := shared.RunCommand(
+               "rbd",
+               "--id", d.config["ceph.user_name"],
+               "--cluster", d.config["ceph.cluster_name"],
+               "--pool", d.config["ceph.osd.pool_name"],
+               "snap",
+               "rollback",
+               "--snap", snapshotName,
+               d.getRBDVolumeName(volumeName, "", volumeType, false, false))
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+// getRBDSize returns the size the RBD storage volume is supposed to be created
+// with
+func (d *ceph) getRBDSize(vol Volume) (string, error) {
+       size, ok := vol.config["size"]
+       if !ok {
+               size = vol.poolConfig["volume.size"]
+       }
+
+       sz, err := units.ParseByteSizeString(size)
+       if err != nil {
+               return "", err
+       }
+
+       // Safety net: Set to default value.
+       if sz == 0 {
+               sz, _ = units.ParseByteSizeString("10GB")
+       }
+
+       return fmt.Sprintf("%dB", sz), nil
+}
+
+// getRBDFilesystem returns the filesystem the RBD storage volume is supposed to
+// be created with
+func (d *ceph) getRBDFilesystem(vol Volume) string {
+       if vol.config["block.filesystem"] != "" {
+               return vol.config["block.filesystem"]
+       }
+
+       if vol.poolConfig["volume.block.filesystem"] != "" {
+               return vol.poolConfig["volume.block.filesystem"]
+       }
+
+       return "ext4"
+}
+
+// getRBDMountOptions returns the mount options the storage volume is supposed
+// to be mounted with
+// The option string that is returned needs to be passed to the appropriate
+// helper (currently named "LXDResolveMountoptions") which will take on the job
+// of splitting it into appropriate flags and string options.
+func (d *ceph) getRBDMountOptions(vol Volume) string {
+       if vol.config["block.mount_options"] != "" {
+               return vol.config["block.mount_options"]
+       }
+
+       if vol.poolConfig["volume.block.mount_options"] != "" {
+               return vol.poolConfig["volume.block.mount_options"]
+       }
+
+       if d.getRBDFilesystem(vol) == "btrfs" {
+               return "user_subvol_rm_allowed,discard"
+       }
+
+       return "discard"
+}
+
+// copyWithoutSnapshotsFull creates a non-sparse copy of a container
+// This does not introduce a dependency relation between the source RBD storage
+// volume and the target RBD storage volume.
+func (d *ceph) copyWithoutSnapshotsFull(source Volume, target Volume) error {
+       targetContainerName := target.name
+       oldVolumeName := d.getRBDVolumeName(source.name, "", string(source.volType), false, true)
+       newVolumeName := d.getRBDVolumeName(target.name, "", string(target.volType), false, true)
+
+       err := d.rbdCopyVolume(oldVolumeName, newVolumeName)
+       if err != nil {
+               return err
+       }
+
+       _, err = d.rbdMapVolume(targetContainerName, string(target.volType))
+       if err != nil {
+               return err
+       }
+
+       // Re-generate the UUID
+       err = d.generateUUID(target)
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+// copyWithoutSnapshotsSparse creates a sparse copy of a container
+// This introduces a dependency relation between the source RBD storage volume
+// and the target RBD storage volume.
+func (d *ceph) copyWithoutSnapshotsSparse(source Volume, target Volume) error {
+       sourceIsSnapshot := source.IsSnapshot()
+       sourceContainerName := source.name
+       targetContainerName := target.name
+       sourceContainerOnlyName := sourceContainerName
+       sourceSnapshotOnlyName := ""
+       snapshotName := fmt.Sprintf("zombie_snapshot_%s",
+               uuid.NewRandom().String())
+
+       if sourceIsSnapshot {
+               sourceContainerOnlyName, sourceSnapshotOnlyName, _ =
+                       shared.InstanceGetParentAndSnapshotName(sourceContainerName)
+               snapshotName = fmt.Sprintf("snapshot_%s", sourceSnapshotOnlyName)
+       } else {
+               // create snapshot
+               err := d.rbdCreateVolumeSnapshot(
+                       sourceContainerName, string(source.volType),
+                       snapshotName)
+               if err != nil {
+                       return err
+               }
+       }
+
+       // protect volume so we can create clones of it
+       err := d.rbdProtectVolumeSnapshot(
+               sourceContainerOnlyName, string(source.volType),
+               snapshotName)
+       if err != nil {
+               return err
+       }
+
+       err = d.rbdCreateClone(
+               sourceContainerOnlyName, string(source.volType),
+               snapshotName, targetContainerName,
+               string(target.volType))
+       if err != nil {
+               return err
+       }
+
+       // Re-generate the UUID
+       err = d.generateUUID(target)
+       if err != nil {
+               return err
+       }
+
+       // Create mountpoint
+       // targetContainerMountPoint := driver.GetContainerMountPoint(target.Project(), s.pool.Name, target.Name())
+       // err = driver.CreateContainerMountpoint(targetContainerMountPoint, target.Path(), target.IsPrivileged())
+       // if err != nil {
+       //      return err
+       // }
+
+       // ourMount, err := target.StorageStart()
+       // if err != nil {
+       //      return err
+       // }
+       // if ourMount {
+       //      defer target.StorageStop()
+       // }
+
+       // err = target.DeferTemplateApply("copy")
+       // if err != nil {
+
+       //      return err
+       // }
+
+       return nil
+}
+
+// copyWithSnapshots creates a non-sparse copy of a container including its
+// snapshots
+// This does not introduce a dependency relation between the source RBD storage
+// volume and the target RBD storage volume.
+func (d *ceph) copyWithSnapshots(sourceVolumeName string,
+       targetVolumeName string, sourceParentSnapshot string) error {
+       args := []string{
+               "export-diff",
+               "--id", d.config["ceph.user_name"],
+               "--cluster", d.config["ceph.cluster_name"],
+               sourceVolumeName,
+       }
+
+       if sourceParentSnapshot != "" {
+               args = append(args, "--from-snap", sourceParentSnapshot)
+       }
+
+       // redirect output to stdout
+       args = append(args, "-")
+
+       rbdSendCmd := exec.Command("rbd", args...)
+       rbdRecvCmd := exec.Command(
+               "rbd",
+               "--id", d.config["ceph.user_name"],
+               "import-diff",
+               "--cluster", d.config["ceph.cluster_name"],
+               "-",
+               targetVolumeName)
+
+       rbdRecvCmd.Stdin, _ = rbdSendCmd.StdoutPipe()
+       rbdRecvCmd.Stdout = os.Stdout
+       rbdRecvCmd.Stderr = os.Stderr
+
+       err := rbdRecvCmd.Start()
+       if err != nil {
+               return err
+       }
+
+       err = rbdSendCmd.Run()
+       if err != nil {
+               return err
+       }
+
+       err = rbdRecvCmd.Wait()
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+// deleteVolume deletes the RBD storage volume of a container including
+// any dependencies
+// - This function takes care to delete any RBD storage entities that are marked
+//   as zombie and whose existence is solely dependent on the RBD storage volume
+//   for the container to be deleted.
+// - This function will mark any storage entities of the container to be deleted
+//   as zombies in case any RBD storage entities in the storage pool have a
+//   dependency relation with it.
+// - This function uses a C-style convention to return error or success simply
+//   because it is more elegant and simple than the go way.
+//   The function will return
+//   -1 on error
+//    0 if the RBD storage volume has been deleted
+//    1 if the RBD storage volume has been marked as a zombie
+// - deleteVolume in conjunction with deleteVolumeSnapshot
+//   recurses through an OSD storage pool to find and delete any storage
+//   entities that were kept around because of dependency relations but are not
+//   deletable.
+func (d *ceph) deleteVolume(volumeName string,
+       volumeType string) int {
+       snaps, err := d.rbdListVolumeSnapshots(
+               volumeName, volumeType)
+       if err == nil {
+               var zombies int
+               for _, snap := range snaps {
+                       ret := d.deleteVolumeSnapshot(volumeName, volumeType, snap)
+                       if ret < 0 {
+                               return -1
+                       } else if ret == 1 {
+                               zombies++
+                       }
+               }
+
+               if zombies > 0 {
+                       // unmap
+                       err = d.rbdUnmapVolume(
+                               volumeName, volumeType, true)
+                       if err != nil {
+                               return -1
+                       }
+
+                       if strings.HasPrefix(volumeName, "zombie_") ||
+                               strings.HasPrefix(volumeType, "zombie_") {
+                               return 1
+                       }
+
+                       newVolumeName := fmt.Sprintf("%s_%s", volumeName,
+                               uuid.NewRandom().String())
+                       err := d.rbdMarkVolumeDeleted(
+                               volumeType, volumeName, newVolumeName,
+                               "")
+                       if err != nil {
+                               return -1
+                       }
+
+                       return 1
+               }
+       } else {
+               if err != db.ErrNoSuchObject {
+                       return -1
+               }
+
+               parent, err := d.rbdGetVolumeParent(
+                       volumeName, volumeType)
+               if err == nil {
+                       _, parentVolumeType, parentVolumeName,
+                               parentSnapshotName, err := d.parseParent(parent)
+                       if err != nil {
+                               return -1
+                       }
+
+                       // unmap
+                       err = d.rbdUnmapVolume(
+                               volumeName, volumeType, true)
+                       if err != nil {
+                               return -1
+                       }
+
+                       // delete
+                       err = d.rbdDeleteVolume(
+                               volumeName, volumeType)
+                       if err != nil {
+                               return -1
+                       }
+
+                       // Only delete the parent snapshot of the container if
+                       // it is a zombie. If it is not we know that LXD is
+                       // still using it.
+                       if strings.HasPrefix(parentVolumeType, "zombie_") ||
+                               strings.HasPrefix(parentSnapshotName, "zombie_") {
+                               ret := d.deleteVolumeSnapshot(
+                                       parentVolumeName,
+                                       parentVolumeType, parentSnapshotName)
+                               if ret < 0 {
+                                       return -1
+                               }
+                       }
+               } else {
+                       if err != db.ErrNoSuchObject {
+                               return -1
+                       }
+
+                       // unmap
+                       err = d.rbdUnmapVolume(
+                               volumeName, volumeType, true)
+                       if err != nil {
+                               return -1
+                       }
+
+                       // delete
+                       err = d.rbdDeleteVolume(
+                               volumeName, volumeType)
+                       if err != nil {
+                               return -1
+                       }
+               }
+       }
+
+       return 0
+}
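
A brief sketch of how the -1/0/1 convention documented above would typically be consumed by a caller; the wrapper below is hypothetical and not part of the patch:

// removeVolumeOrZombie illustrates the C-style return convention of
// deleteVolume: -1 is an error, 0 a full deletion, 1 a volume kept as zombie.
func (d *ceph) removeVolumeOrZombie(volumeName string, volumeType string) error {
        switch d.deleteVolume(volumeName, volumeType) {
        case -1:
                return fmt.Errorf("Failed to delete RBD storage volume %q", volumeName)
        case 1:
                // Kept around as a zombie because dependent snapshots or
                // clones still reference it.
                return nil
        default:
                // Fully deleted.
                return nil
        }
}
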
+
+// deleteVolumeSnapshot deletes an RBD snapshot of a container including
+// any dependencies
+// - This function takes care to delete any RBD storage entities that are marked
+//   as zombie and whose existence is solely dependent on the RBD snapshot for
+//   the container to be deleted.
+// - This function will mark any storage entities of the container to be deleted
+//   as zombies in case any RBD storage entities in the storage pool have a
+//   dependency relation with it.
+// - This function uses a C-style convention to return error or success simply
+//   because it is more elegant and simple than the go way.
+//   The function will return
+//   -1 on error
+//    0 if the RBD storage volume has been deleted
+//    1 if the RBD storage volume has been marked as a zombie
+// - deleteVolumeSnapshot in conjunction with deleteVolume
+//   recurses through an OSD storage pool to find and delete any storage
+//   entities that were kept around because of dependency relations but are not
+//   deletable.
+func (d *ceph) deleteVolumeSnapshot(
+       volumeName string, volumeType string, snapshotName string) int {
+       clones, err := d.rbdListSnapshotClones(
+               volumeName, volumeType, snapshotName)
+       if err != nil {
+               if err != db.ErrNoSuchObject {
+                       return -1
+               }
+
+               // unprotect
+               err = d.rbdUnprotectVolumeSnapshot(volumeName,
+                       volumeType, snapshotName)
+               if err != nil {
+                       return -1
+               }
+
+               // unmap
+               err = d.rbdUnmapVolumeSnapshot(
+                       volumeName, volumeType, snapshotName, true)
+               if err != nil {
+                       return -1
+               }
+
+               // delete
+               err = d.rbdDeleteVolumeSnapshot(volumeName,
+                       volumeType, snapshotName)
+               if err != nil {
+                       return -1
+               }
+
+               // Only delete the parent image if it is a zombie. If it is not
+               // we know that LXD is still using it.
+               if strings.HasPrefix(volumeType, "zombie_") {
+                       ret := d.deleteVolume(
+                               volumeName, volumeType)
+                       if ret < 0 {
+                               return -1
+                       }
+               }
+
+               return 0
+       }
+
+       canDelete := true
+       for _, clone := range clones {
+               _, cloneType, cloneName, err := d.parseClone(clone)
+               if err != nil {
+                       return -1
+               }
+
+               if !strings.HasPrefix(cloneType, "zombie_") {
+                       canDelete = false
+                       continue
+               }
+
+               ret := d.deleteVolume(
+                       cloneName, cloneType)
+               if ret < 0 {
+                       return -1
+               } else if ret == 1 {
+                       // Only marked as zombie
+                       canDelete = false
+               }
+       }
+
+       if canDelete {
+               // unprotect
+               err = d.rbdUnprotectVolumeSnapshot(
+                       volumeName, volumeType, snapshotName)
+               if err != nil {
+                       return -1
+               }
+
+               // unmap
+               err = d.rbdUnmapVolumeSnapshot(
+                       volumeName, volumeType, snapshotName,
+                       true)
+               if err != nil {
+                       return -1
+               }
+
+               // delete
+               err = d.rbdDeleteVolumeSnapshot(
+                       volumeName, volumeType, snapshotName)
+               if err != nil {
+                       return -1
+               }
+
+               // Only delete the parent image if it is a zombie. If it
+               // is not we know that LXD is still using it.
+               if strings.HasPrefix(volumeType, "zombie_") {
+                       ret := d.deleteVolume(
+                               volumeName, volumeType)
+                       if ret < 0 {
+                               return -1
+                       }
+               }
+       } else {
+               if strings.HasPrefix(snapshotName, "zombie_") {
+                       return 1
+               }
+
+               err := d.rbdUnmapVolumeSnapshot(
+                       volumeName, volumeType, snapshotName,
+                       true)
+               if err != nil {
+                       return -1
+               }
+
+               newSnapshotName := fmt.Sprintf("zombie_%s", snapshotName)
+
+               err = d.rbdRenameVolumeSnapshot(
+                       volumeName, volumeType, snapshotName,
+                       newSnapshotName)
+               if err != nil {
+                       return -1
+               }
+       }
+
+       return 1
+}
+
+// parseParent splits a string describing an RBD storage entity into its
+// components
+// This can be used on strings like
+// <osd-pool-name>/<lxd-specific-prefix>_<rbd-storage-volume>@<rbd-snapshot-name>
+// and will split it into
+// <osd-pool-name>, <lxd-specific-prefix>, <rbd-storage-volume>, <rbd-snapshot-name>
+func (d *ceph) parseParent(parent string) (string, string, string, string, error) {
+       idx := strings.Index(parent, "/")
+       if idx == -1 {
+               return "", "", "", "", fmt.Errorf("Unexpected parsing error")
+       }
+       slider := parent[(idx + 1):]
+       poolName := parent[:idx]
+
+       volumeType := slider
+       idx = strings.Index(slider, "zombie_")
+       if idx == 0 {
+               idx += len("zombie_")
+               volumeType = slider
+               slider = slider[idx:]
+       }
+
+       idxType := strings.Index(slider, "_")
+       if idxType == -1 {
+               return "", "", "", "", fmt.Errorf("Unexpected parsing error")
+       }
+
+       if idx == len("zombie_") {
+               idxType += idx
+       }
+       volumeType = volumeType[:idxType]
+
+       idx = strings.Index(slider, "_")
+       if idx == -1 {
+               return "", "", "", "", fmt.Errorf("Unexpected parsing error")
+       }
+
+       volumeName := slider
+       idx = strings.Index(volumeName, "_")
+       if idx == -1 {
+               return "", "", "", "", fmt.Errorf("Unexpected parsing error")
+       }
+       volumeName = volumeName[(idx + 1):]
+
+       idx = strings.Index(volumeName, "@")
+       if idx == -1 {
+               return "", "", "", "", fmt.Errorf("Unexpected parsing error")
+       }
+       snapshotName := volumeName[(idx + 1):]
+       volumeName = volumeName[:idx]
+
+       return poolName, volumeType, volumeName, snapshotName, nil
+}
+
+// parseClone splits a string describing an RBD storage volume
+// For example a string like
+// <osd-pool-name>/<lxd-specific-prefix>_<rbd-storage-volume>
+// will be split into
+// <osd-pool-name>, <lxd-specific-prefix>, <rbd-storage-volume>
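+// For illustration only (hypothetical names), a clone string such as
+// "pool1/zombie_container_c1" would be split into
+// ("pool1", "zombie_container", "c1").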
+func (d *ceph) parseClone(clone string) (string, string, string, error) {
+       idx := strings.Index(clone, "/")
+       if idx == -1 {
+               return "", "", "", fmt.Errorf("Unexpected parsing error")
+       }
+       slider := clone[(idx + 1):]
+       poolName := clone[:idx]
+
+       volumeType := slider
+       idx = strings.Index(slider, "zombie_")
+       if idx == 0 {
+               idx += len("zombie_")
+               volumeType = slider
+               slider = slider[idx:]
+       }
+
+       idxType := strings.Index(slider, "_")
+       if idxType == -1 {
+               return "", "", "", fmt.Errorf("Unexpected parsing error")
+       }
+
+       if idx == len("zombie_") {
+               idxType += idx
+       }
+       volumeType = volumeType[:idxType]
+
+       idx = strings.Index(slider, "_")
+       if idx == -1 {
+               return "", "", "", fmt.Errorf("Unexpected parsing error")
+       }
+
+       volumeName := slider
+       idx = strings.Index(volumeName, "_")
+       if idx == -1 {
+               return "", "", "", fmt.Errorf("Unexpected parsing error")
+       }
+       volumeName = volumeName[(idx + 1):]
+
+       return poolName, volumeType, volumeName, nil
+}
+
+// getRBDMappedDevPath looks at sysfs to retrieve the device path
+// "/dev/rbd<idx>" for an RBD image. If it doesn't find one, it will map the
+// image itself when told to do so.
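+// Its integer return value encodes the outcome (derived from the logic below):
+// -1 on error, 0 if the image is not mapped (and doMap is false), 1 if the
+// image was already mapped, and 2 if this call mapped it.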
+func (d *ceph) getRBDMappedDevPath(poolName string, volumeName string, volumeType string,
+       doMap bool) (string, int) {
+       files, err := ioutil.ReadDir("/sys/devices/rbd")
+       if err != nil {
+               if os.IsNotExist(err) {
+                       if doMap {
+                               goto mapImage
+                       }
+
+                       return "", 0
+               }
+
+               return "", -1
+       }
+
+       for _, f := range files {
+               if !f.IsDir() {
+                       continue
+               }
+
+               fName := f.Name()
+               idx, err := strconv.ParseUint(fName, 10, 64)
+               if err != nil {
+                       continue
+               }
+
+               tmp := fmt.Sprintf("/sys/devices/rbd/%s/pool", fName)
+               contents, err := ioutil.ReadFile(tmp)
+               if err != nil {
+                       if os.IsNotExist(err) {
+                               continue
+                       }
+
+                       return "", -1
+               }
+
+               detectedPoolName := strings.TrimSpace(string(contents))
+               if detectedPoolName != poolName {
+                       continue
+               }
+
+               tmp = fmt.Sprintf("/sys/devices/rbd/%s/name", fName)
+               contents, err = ioutil.ReadFile(tmp)
+               if err != nil {
+                       if os.IsNotExist(err) {
+                               continue
+                       }
+
+                       return "", -1
+               }
+
+               typedVolumeName := fmt.Sprintf("%s_%s", volumeType, volumeName)
+               detectedVolumeName := strings.TrimSpace(string(contents))
+               if detectedVolumeName != typedVolumeName {
+                       continue
+               }
+
+               tmp = fmt.Sprintf("/sys/devices/rbd/%s/snap", fName)
+               contents, err = ioutil.ReadFile(tmp)
+               if err != nil {
+                       if os.IsNotExist(err) {
+                               return fmt.Sprintf("/dev/rbd%d", idx), 1
+                       }
+
+                       return "", -1
+               }
+
+               detectedSnapName := strings.TrimSpace(string(contents))
+               if detectedSnapName != "-" {
+                       continue
+               }
+
+               return fmt.Sprintf("/dev/rbd%d", idx), 1
+       }
+
+       if !doMap {
+               return "", 0
+       }
+
+mapImage:
+       devPath, err := d.rbdMapVolume(volumeName, volumeType)
+       if err != nil {
+               return "", -1
+       }
+
+       return strings.TrimSpace(devPath), 2
+}
+
+func (d *ceph) rbdShrink(path string, size int64, fsType string,
+       vol Volume) error {
+       var msg string
+
+       err := shrinkFileSystem(fsType, path, vol, size)
+       if err != nil {
+               return err
+       }
+
+       msg, err = shared.TryRunCommand(
+               "rbd",
+               "resize",
+               "--allow-shrink",
+               "--id", d.config["ceph.user_name"],
+               "--cluster", d.config["ceph.cluster_name"],
+               "--pool", d.config["ceph.osd.pool_name"],
+               "--size", fmt.Sprintf("%dM", (size/1024/1024)),
+               d.getRBDVolumeName(vol.name, "", string(vol.volType), false, false))
+       if err != nil {
+               return fmt.Errorf(`Could not shrink RBD storage volume "%s": %s`, path, msg)
+       }
+
+       return nil
+}
+
+func (d *ceph) rbdGrow(path string, size int64, fsType string,
+       vol Volume) error {
+
+       // Grow the block device
+       msg, err := shared.TryRunCommand(
+               "rbd",
+               "resize",
+               "--id", d.config["ceph.user_name"],
+               "--cluster", d.config["ceph.cluster_name"],
+               "--pool", d.config["ceph.osd.pool_name"],
+               "--size", fmt.Sprintf("%dM", (size/1024/1024)),
+               d.getRBDVolumeName(vol.name, "", string(vol.volType), false, false))
+       if err != nil {
+               return fmt.Errorf(`Could not extend RBD storage volume "%s": %s`, path, msg)
+       }
+
+       // Grow the filesystem
+       return growFileSystem(fsType, path, vol)
+}
+
+func (d *ceph) rbdExportVolumeToFile(sourceVolumeName string, file string) error {
+       args := []string{
+               "export",
+               "--id", d.config["ceph.user_name"],
+               "--cluster", d.config["ceph.cluster_name"],
+               sourceVolumeName,
+               file,
+       }
+
+       rbdSendCmd := exec.Command("rbd", args...)
+       err := rbdSendCmd.Run()
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+func (d *ceph) rbdCreateVolumeBackup(vol Volume, targetPath string) error {
+       sourceIsSnapshot := vol.IsSnapshot()
+       sourceContainerName := vol.name
+       sourceContainerOnlyName := vol.name
+       sourceSnapshotOnlyName := ""
+
+       // Prepare for rsync
+       rsync := func(oldPath string, newPath string, bwlimit string) error {
+               output, err := rsync.LocalCopy(oldPath, newPath, bwlimit, true)
+               if err != nil {
+                       return fmt.Errorf("Failed to rsync: %s: %s", string(output), err)
+               }
+
+               return nil
+       }
+
+       bwlimit := vol.poolConfig["rsync.bwlimit"]
+       // Create a temporary snapshot
+       snapshotName := fmt.Sprintf("zombie_snapshot_%s", uuid.NewRandom().String())
+       if sourceIsSnapshot {
+               sourceContainerOnlyName, sourceSnapshotOnlyName, _ = shared.InstanceGetParentAndSnapshotName(sourceContainerName)
+               snapshotName = fmt.Sprintf("snapshot_%s", sourceSnapshotOnlyName)
+       } else {
+               // This is costly but we need to ensure that all cached data has
+               // been committed to disk. If we don't then the rbd snapshot of
+               // the underlying filesystem can be inconsistent or - worst case
+               // - empty.
+               unix.Sync()
+
+               // create snapshot
+               err := d.rbdCreateVolumeSnapshot(sourceContainerOnlyName, string(vol.volType), snapshotName)
+               if err != nil {
+                       return err
+               }
+               defer d.rbdDeleteVolumeSnapshot(sourceContainerOnlyName, string(vol.volType), snapshotName)
+       }
+
+       // Protect volume so we can create clones of it
+       err := d.rbdProtectVolumeSnapshot(sourceContainerOnlyName, string(vol.volType), snapshotName)
+       if err != nil {
+               return err
+       }
+       defer d.rbdUnprotectVolumeSnapshot(sourceContainerOnlyName, string(vol.volType), snapshotName)
+
+       // Create a new volume from the snapshot
+       cloneName := uuid.NewRandom().String()
+       err = d.rbdCreateClone(sourceContainerOnlyName, string(vol.volType), snapshotName, cloneName, "backup")
+       if err != nil {
+               return err
+       }
+       defer d.rbdDeleteVolume(cloneName, "backup")
+
+       // Map the new volume
+       RBDDevPath, err := d.rbdMapVolume(cloneName, "backup")
+       if err != nil {
+               return err
+       }
+       defer d.rbdUnmapVolume(cloneName, "backup", true)
+
+       // Generate a new UUID if needed
+       RBDFilesystem := d.getRBDFilesystem(vol)
+
+       err = regenerateFilesystemUUID(RBDFilesystem, RBDDevPath)
+       if err != nil {
+               return err
+       }
+
+       // Create a temporary mountpoint
+       tmpContainerMntPoint, err := ioutil.TempDir("", "lxd_backup_")
+       if err != nil {
+               return err
+       }
+       defer os.RemoveAll(tmpContainerMntPoint)
+
+       err = os.Chmod(tmpContainerMntPoint, 0100)
+       if err != nil {
+               return err
+       }
+
+       // Mount the volume
+       mountFlags, mountOptions := resolveMountOptions(d.getRBDMountOptions(vol))
+       err = TryMount(RBDDevPath, tmpContainerMntPoint, RBDFilesystem, mountFlags, mountOptions)
+       if err != nil {
+               return err
+       }
+
+       defer TryUnmount(tmpContainerMntPoint, unix.MNT_DETACH)
+
+       // Figure out the target name
+       targetName := sourceContainerName
+       if sourceIsSnapshot {
+               _, targetName, _ = shared.InstanceGetParentAndSnapshotName(sourceContainerName)
+       }
+
+       // Create the path for the backup.
+       targetBackupMntPoint := fmt.Sprintf("%s/container", targetPath)
+       if sourceIsSnapshot {
+               targetBackupMntPoint = fmt.Sprintf("%s/snapshots/%s", targetPath, targetName)
+       }
+
+       err = os.MkdirAll(targetBackupMntPoint, 0711)
+       if err != nil {
+               return err
+       }
+
+       err = rsync(tmpContainerMntPoint, targetBackupMntPoint, bwlimit)
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+func (d *ceph) createVolume(vol Volume, filler *VolumeFiller, op *operations.Operation) error {
+       revert := true
+
+       // get size
+       RBDSize, err := d.getRBDSize(vol)
+       if err != nil {
+               return err
+       }
+
+       // create volume
+       err = d.rbdCreateVolume(vol.name, string(vol.volType), RBDSize)
+       if err != nil {
+               return err
+       }
+
+       defer func() {
+               if !revert {
+                       return
+               }
+
+               // Cleanup on revert is best-effort; the error is deliberately ignored.
+               d.rbdDeleteVolume(vol.name, string(vol.volType))
+       }()
+
+       RBDDevPath, err := d.rbdMapVolume(vol.name, string(vol.volType))
+       if err != nil {
+               return err
+       }
+
+       defer func() {
+               if !revert {
+                       return
+               }
+
+               d.rbdUnmapVolume(vol.name, string(vol.volType), true)
+       }()
+
+       // get filesystem
+       RBDFilesystem := d.getRBDFilesystem(vol)
+       _, err = makeFSType(RBDDevPath, RBDFilesystem, nil)
+       if err != nil {
+               return err
+       }
+
+       _, err = d.MountVolume(vol, op)
+       if err != nil {
+               return err
+       }
+       defer d.UnmountVolume(vol, op)
+
+       // Run the volume filler function if supplied.
+       if filler != nil && filler.Fill != nil {
+               err = filler.Fill(vol.MountPath(), "")
+               if err != nil {
+                       return err
+               }
+       }
+
+       revert = false
+       return nil
+}
+
+// generateUUID regenerates the XFS/btrfs UUID as needed
+func (d *ceph) generateUUID(vol Volume) error {
+       fsType := d.getRBDFilesystem(vol)
+
+       if !renegerateFilesystemUUIDNeeded(fsType) {
+               return nil
+       }
+
+       // Map the RBD volume
+       RBDDevPath, err := d.rbdMapVolume(vol.name, string(vol.volType))
+       if err != nil {
+               return err
+       }
+       defer d.rbdUnmapVolume(vol.name, string(vol.volType), true)
+
+       // Update the UUID
+       err = regenerateFilesystemUUID(fsType, RBDDevPath)
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
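+// getRBDVolumeName returns the RBD name used for an LXD volume. For
+// illustration only (hypothetical names), a call such as
+//   getRBDVolumeName("c1/snap0", "", "containers", false, true)
+// would return "<ceph.osd.pool_name>/container_c1@snapshot_snap0".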
+func (d *ceph) getRBDVolumeName(volName string, snapName string, volType string, zombie bool, withPoolName bool) string {
+       out := ""
+       parentName, snapshotName, isSnapshot := shared.InstanceGetParentAndSnapshotName(volName)
+
+       // We use this map as values VolumeType* and StoragePoolVolumeType* differ,
+       // e.g. containers vs. container. This needs to be handled correctly.
+       volumeTypeMap := map[string]string{
+               string(VolumeTypeContainer): db.StoragePoolVolumeTypeNameContainer,
+               string(VolumeTypeCustom):    db.StoragePoolVolumeTypeNameCustom,
+               string(VolumeTypeImage):     db.StoragePoolVolumeTypeNameImage,
+               string(VolumeTypeVM):        db.StoragePoolVolumeTypeNameVM,
+       }
+
+       volumeType, ok := volumeTypeMap[volType]
+       if !ok {
+               volumeType = volType
+       }
+
+       if snapName != "" {
+               // Always use the provided snapshot name if specified.
+               out = fmt.Sprintf("%s_%s@%s", volumeType, parentName, snapName)
+       } else {
+               if isSnapshot {
+                       // If volumeName is a snapshot (<vol>/<snap>) and snapName is not set,
+                       // assume that it's a normal snapshot (not a zombie) and prefix it with
+                       // "snapshot_".
+                       out = fmt.Sprintf("%s_%s@snapshot_%s", volumeType, parentName, snapshotName)
+               } else {
+                       out = fmt.Sprintf("%s_%s", volumeType, parentName)
+               }
+       }
+
+       // If the volume is to be in zombie state (i.e. not tracked by the LXD database),
+       // prefix the output with "zombie_".
+       if zombie {
+               out = fmt.Sprintf("zombie_%s", out)
+       }
+
+       // If needed, the output will be prefixed with the pool name, e.g.
+       // <pool>/<type>_<volname>@<snapname>.
+       if withPoolName {
+               out = fmt.Sprintf("%s/%s", d.config["ceph.osd.pool_name"], out)
+       }
+
+       return out
+}
+
+func (d *ceph) createVolumeFromImage(srcVol Volume, vol Volume, op *operations.Operation) error {
+       revert := true
+
+       err := d.rbdCreateClone(srcVol.name, string(srcVol.volType), "readonly", vol.name, string(vol.volType))
+       if err != nil {
+               return err
+       }
+
+       defer func() {
+               if revert {
+                       d.deleteVolume(vol.name, string(vol.volType))
+               }
+       }()
+
+       err = d.generateUUID(vol)
+       if err != nil {
+               return err
+       }
+
+       err = d.SetVolumeQuota(vol, srcVol.config["size"], op)
+       if err != nil {
+               return err
+       }
+
+       ourMount, err := d.MountVolume(vol, op)
+       if err != nil {
+               return err
+       }
+
+       if ourMount {
+               defer d.UnmountVolume(vol, op)
+       }
+
+       revert = false
+       return nil
+}
+
+func (d *ceph) createImage(vol Volume, filler *VolumeFiller, op *operations.Operation) error {
+       revert := true
+       prefixedType := fmt.Sprintf("zombie_image_%s", vol.config["block.filesystem"])
+
+       // Check if we have a zombie image. If so, restore it otherwise
+       // create a new image volume.
+       exists := d.rbdVolumeExists(vol.name, prefixedType)
+
+       if !exists {
+               err := d.createVolume(vol, filler, op)
+               if err != nil {
+                       return err
+               }
+               defer func() {
+                       if revert {
+                               d.deleteVolume(vol.name, string(vol.volType))
+                       }
+               }()
+
+               err = d.rbdUnmapVolume(vol.name, string(vol.volType), true)
+               if err != nil {
+                       return err
+               }
+
+               err = d.rbdCreateVolumeSnapshot(vol.name, string(vol.volType), "readonly")
+               if err != nil {
+                       return err
+               }
+               defer func() {
+                       if revert {
+                               d.deleteVolumeSnapshot(vol.name, string(vol.volType), "readonly")
+                       }
+               }()
+
+               err = d.rbdProtectVolumeSnapshot(vol.name, string(vol.volType), "readonly")
+               if err != nil {
+                       return err
+               }
+       } else {
+               // unmark deleted
+               err := d.rbdUnmarkVolumeDeleted(vol.name, string(vol.volType), vol.config["block.filesystem"], "")
+               if err != nil {
+                       return fmt.Errorf(`Failed to unmark RBD storage volume for image "%s" on storage pool "%s" as zombie: %s`, vol.name, d.config["ceph.osd.pool_name"], err)
+               }
+               defer func() {
+                       if !revert {
+                               return
+                       }
+
+                       d.rbdMarkVolumeDeleted(vol.name, vol.name, string(vol.volType), vol.config["block.filesystem"])
+               }()
+       }
+
+       revert = false
+       return nil
+}
+
+// Let's say we want to send a container "a" including snapshots "snap0" and
+// "snap1" on storage pool "pool1" from LXD "l1" to LXD "l2" on storage pool
+// "pool2":
+//
+// The pool layout on "l1" would be:
+//     pool1/container_a
+//     pool1/container_a@snapshot_snap0
+//     pool1/container_a@snapshot_snap1
+//
+// Then we need to send:
+//     rbd export-diff pool1/container_a@snapshot_snap0 - | rbd import-diff - pool2/container_a
+// (Note that pool2/container_a must have been created by the receiving LXD
+// instance before.)
+//     rbd export-diff pool1/container_a@snapshot_snap1 --from-snap snapshot_snap0 - | rbd import-diff - pool2/container_a
+//     rbd export-diff pool1/container_a --from-snap snapshot_snap1 - | rbd import-diff - pool2/container_a
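+//
+// As a rough sketch (names are illustrative), the sending side therefore calls:
+//     d.sendVolume(conn, "pool1/container_a@snapshot_snap0", "", tracker)
+//     d.sendVolume(conn, "pool1/container_a@snapshot_snap1", "snapshot_snap0", tracker)
+//     d.sendVolume(conn, "pool1/container_a", "snapshot_snap1", tracker)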
+func (d *ceph) sendVolume(conn io.ReadWriteCloser, volumeName string, volumeParentName string, tracker *ioprogress.ProgressTracker) error {
+       args := []string{
+               "export-diff",
+               "--cluster", d.config["ceph.cluster_name"],
+               volumeName,
+       }
+
+       if volumeParentName != "" {
+               args = append(args, "--from-snap", volumeParentName)
+       }
+
+       // redirect output to stdout
+       args = append(args, "-")
+
+       cmd := exec.Command("rbd", args...)
+
+       stdout, err := cmd.StdoutPipe()
+       if err != nil {
+               return err
+       }
+
+       stderr, err := cmd.StderrPipe()
+       if err != nil {
+               return err
+       }
+
+       // Setup progress tracker.
+       stdoutPipe := stdout
+       if tracker != nil {
+               stdoutPipe = &ioprogress.ProgressReader{
+                       ReadCloser: stdout,
+                       Tracker:    tracker,
+               }
+       }
+
+       // Forward any output on stdout.
+       chStdoutPipe := make(chan error, 1)
+       go func() {
+               _, err := io.Copy(conn, stdoutPipe)
+               chStdoutPipe <- err
+               conn.Close()
+       }()
+
+       err = cmd.Start()
+       if err != nil {
+               return err
+       }
+
+       output, _ := ioutil.ReadAll(stderr)
+
+       // Handle errors.
+       errs := []error{}
+       chStdoutPipeErr := <-chStdoutPipe
+
+       err = cmd.Wait()
+       if err != nil {
+               errs = append(errs, err)
+
+               if chStdoutPipeErr != nil {
+                       errs = append(errs, chStdoutPipeErr)
+               }
+       }
+
+       if len(errs) > 0 {
+               return fmt.Errorf("ceph export-diff failed: %v (%s)", errs, string(output))
+       }
+
+       return nil
+}
+
+func (d *ceph) receiveVolume(volumeName string, conn io.ReadWriteCloser, writeWrapper func(io.WriteCloser) io.WriteCloser) error {
+       args := []string{
+               "import-diff",
+               "--cluster", d.config["ceph.cluster_name"],
+               "-",
+               volumeName,
+       }
+
+       cmd := exec.Command("rbd", args...)
+
+       stdin, err := cmd.StdinPipe()
+       if err != nil {
+               return err
+       }
+
+       stderr, err := cmd.StderrPipe()
+       if err != nil {
+               return err
+       }
+
+       // Forward input through stdin.
+       chCopyConn := make(chan error, 1)
+       go func() {
+               _, err = io.Copy(stdin, conn)
+               stdin.Close()
+               chCopyConn <- err
+       }()
+
+       // Run the command.
+       err = cmd.Start()
+       if err != nil {
+               return err
+       }
+
+       // Read any error.
+       output, _ := ioutil.ReadAll(stderr)
+
+       // Handle errors.
+       errs := []error{}
+       chCopyConnErr := <-chCopyConn
+
+       err = cmd.Wait()
+       if err != nil {
+               errs = append(errs, err)
+
+               if chCopyConnErr != nil {
+                       errs = append(errs, chCopyConnErr)
+               }
+       }
+
+       if len(errs) > 0 {
+               return fmt.Errorf("Problem with ceph import-diff: (%v) %s", errs, string(output))
+       }
+
+       return nil
+}
+
+func (d *ceph) deleteImage(vol Volume, op *operations.Operation) error {
+       // Try to umount but don't fail
+       d.UnmountVolume(vol, op)
+
+       // Check if the image has dependent snapshots (clones)
+       _, err := d.rbdListSnapshotClones(vol.name, string(vol.volType), "readonly")
+       if err != nil {
+               if err != db.ErrNoSuchObject {
+                       return err
+               }
+
+               // Unprotect snapshot
+               err = d.rbdUnprotectVolumeSnapshot(vol.name, string(vol.volType), "readonly")
+               if err != nil {
+                       return err
+               }
+
+               // Delete snapshots
+               err = d.rbdPurgeVolumeSnapshots(vol.name, string(vol.volType))
+               if err != nil {
+                       return err
+               }
+
+               // Unmap image
+               err = d.rbdUnmapVolume(vol.name, string(vol.volType), true)
+               if err != nil {
+                       return err
+               }
+
+               // Delete image
+               err = d.rbdDeleteVolume(vol.name, string(vol.volType))
+               if err != nil {
+                       return err
+               }
+       } else {
+               err = d.rbdUnmapVolume(vol.name, string(vol.volType), true)
+               if err != nil {
+                       return err
+               }
+
+               err = d.rbdMarkVolumeDeleted(string(vol.volType), vol.name, vol.name, vol.config["block.filesystem"])
+               if err != nil {
+                       return err
+               }
+       }
+
+       return nil
+}
diff --git a/lxd/storage/drivers/driver_ceph_volumes.go b/lxd/storage/drivers/driver_ceph_volumes.go
new file mode 100644
index 0000000000..aa0bf4e5e2
--- /dev/null
+++ b/lxd/storage/drivers/driver_ceph_volumes.go
@@ -0,0 +1,857 @@
+package drivers
+
+import (
+       "fmt"
+       "io"
+       "os"
+       "strings"
+
+       "github.com/lxc/lxd/lxd/db"
+       "github.com/lxc/lxd/lxd/migration"
+       "github.com/lxc/lxd/lxd/operations"
+       "github.com/lxc/lxd/shared"
+       "github.com/lxc/lxd/shared/ioprogress"
+       "github.com/lxc/lxd/shared/logger"
+       "github.com/lxc/lxd/shared/units"
+       "github.com/pborman/uuid"
+       "golang.org/x/sys/unix"
+)
+
+func (d *ceph) HasVolume(vol Volume) bool {
+       return d.rbdVolumeExists(vol.name, string(vol.volType))
+}
+
+func (d *ceph) ValidateVolume(vol Volume, removeUnknownKeys bool) error {
+       rules := map[string]func(value string) error{
+               "block.filesystem":    shared.IsAny,
+               "block.mount_options": shared.IsAny,
+       }
+
+       return d.validateVolume(vol, rules, removeUnknownKeys)
+}
+
+func (d *ceph) CreateVolume(vol Volume, filler *VolumeFiller, op *operations.Operation) error {
+       if vol.volType == VolumeTypeImage {
+               err := d.createImage(vol, filler, op)
+               if err != nil {
+                       return err
+               }
+
+               return nil
+       }
+
+       err := d.createVolume(vol, filler, op)
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+func (d *ceph) CreateVolumeFromCopy(vol Volume, srcVol Volume, copySnapshots bool, op *operations.Operation) error {
+       var err error
+       snapshots := []string{}
+
+       if !srcVol.IsSnapshot() && copySnapshots {
+               snapshots, err = d.VolumeSnapshots(srcVol, op)
+               if err != nil {
+                       return err
+               }
+       }
+
+       revert := true
+
+       if srcVol.volType == VolumeTypeImage {
+               err := d.createVolumeFromImage(srcVol, vol, op)
+               if err != nil {
+                       return err
+               }
+
+               return nil
+       }
+
+       // Copy without snapshots
+       if !copySnapshots || len(snapshots) == 0 {
+               if d.config["ceph.rbd.clone_copy"] != "" &&
+                       !shared.IsTrue(d.config["ceph.rbd.clone_copy"]) {
+                       err = d.copyWithoutSnapshotsFull(srcVol, vol)
+               } else {
+                       err = d.copyWithoutSnapshotsSparse(srcVol, vol)
+               }
+               if err != nil {
+                       return err
+               }
+
+               ourMount, err := d.MountVolume(vol, op)
+               if err != nil {
+                       return err
+               }
+
+               if ourMount {
+                       defer d.UnmountVolume(vol, op)
+               }
+
+               revert = false
+               return nil
+       }
+
+       // Copy with snapshots
+       // create empty dummy volume
+       err = d.rbdCreateVolume(vol.name, string(vol.volType), "0")
+       if err != nil {
+               logger.Errorf("Failed to create RBD volume %q on OSD pool %q: %s", vol.name, d.config["ceph.osd.pool_name"], err)
+               return err
+       }
+
+       defer func() {
+               if !revert {
+                       return
+               }
+
+               err := d.rbdDeleteVolume(vol.name, string(vol.volType))
+               if err != nil {
+                       logger.Warnf("Failed to delete RBD volume %q on OSD pool %q: %s", vol.name, d.config["ceph.osd.pool_name"], err)
+               }
+       }()
+
+       // receive over the dummy volume we created above
+       targetVolumeName := d.getRBDVolumeName(vol.name, "", string(vol.volType), false, true)
+
+       lastSnap := ""
+
+       if len(snapshots) > 0 {
+               err := createParentSnapshotDirIfMissing(d.name, vol.volType, vol.name)
+               if err != nil {
+                       return err
+               }
+       }
+
+       for i, snap := range snapshots {
+               prev := ""
+               if i > 0 {
+                       prev = fmt.Sprintf("snapshot_%s", snapshots[i-1])
+               }
+
+               lastSnap = fmt.Sprintf("snapshot_%s", snap)
+               sourceVolumeName := d.getRBDVolumeName(srcVol.name, lastSnap, string(srcVol.volType), false, true)
+
+               err = d.copyWithSnapshots(
+                       sourceVolumeName,
+                       targetVolumeName,
+                       prev)
+               if err != nil {
+                       return fmt.Errorf("Failed to copy RBD volume %q to %q", sourceVolumeName, targetVolumeName)
+               }
+               logger.Debugf(`Copied RBD container storage %s to %s`,
+                       sourceVolumeName, targetVolumeName)
+
+               defer func(snap string) {
+                       if !revert {
+                               return
+                       }
+
+                       err := d.rbdDeleteVolumeSnapshot(vol.name, string(vol.volType), snap)
+                       if err != nil {
+                               logger.Warnf("Failed to delete RBD volume snapshot %s/%s", vol.name, snap)
+                       }
+               }(snap)
+
+               snapVol, err := vol.NewSnapshot(snap)
+               if err != nil {
+                       return err
+               }
+
+               err = snapVol.EnsureMountPath()
+               if err != nil {
+                       return err
+               }
+       }
+
+       // copy snapshot
+       sourceVolumeName := d.getRBDVolumeName(srcVol.name, "", string(srcVol.volType), false, true)
+
+       err = d.copyWithSnapshots(
+               sourceVolumeName,
+               targetVolumeName,
+               lastSnap)
+       if err != nil {
+               return err
+       }
+
+       // Re-generate the UUID
+       err = d.generateUUID(vol)
+       if err != nil {
+               return err
+       }
+
+       ourMount, err := d.MountVolume(vol, op)
+       if err != nil {
+               return err
+       }
+
+       if ourMount {
+               defer d.UnmountVolume(vol, op)
+       }
+
+       revert = false
+
+       return nil
+}
+
+func (d *ceph) RefreshVolume(vol Volume, srcVol Volume, srcSnapshots []Volume, op *operations.Operation) error {
+       return genericCopyVolume(d, nil, vol, srcVol, srcSnapshots, true, op)
+}
+
+func (d *ceph) DeleteVolume(vol Volume, op *operations.Operation) error {
+       if vol.volType == VolumeTypeImage {
+               err := d.deleteImage(vol, op)
+               if err != nil {
+                       return err
+               }
+       } else {
+               _, err := d.UnmountVolume(vol, op)
+               if err != nil {
+                       return err
+               }
+
+               if !d.rbdVolumeExists(vol.name, string(vol.volType)) {
+                       return nil
+               }
+
+               ret := d.deleteVolume(vol.name, string(vol.volType))
+               if ret < 0 {
+                       return fmt.Errorf("Failed to delete volume")
+               }
+       }
+
+       err := wipeDirectory(vol.MountPath())
+       if err != nil {
+               return err
+       }
+
+       err = os.Remove(vol.MountPath())
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+func (d *ceph) RenameVolume(vol Volume, newName string, op *operations.Operation) error {
+       revert := true
+
+       _, err := d.UnmountVolume(vol, op)
+       if err != nil {
+               return err
+       }
+
+       err = d.rbdUnmapVolume(vol.name, string(vol.volType), true)
+       if err != nil {
+               return err
+       }
+
+       defer func() {
+               if !revert {
+                       return
+               }
+
+               _, err := d.rbdMapVolume(vol.name, string(vol.volType))
+               if err != nil {
+                       logger.Warnf("Failed to map RBD volume %q", vol.name)
+               }
+       }()
+
+       err = d.rbdRenameVolume(vol.name, newName, string(vol.volType))
+       if err != nil {
+               return err
+       }
+
+       defer func() {
+               if !revert {
+                       return
+               }
+
+               d.rbdRenameVolume(newName, vol.name, string(vol.volType))
+       }()
+
+       _, err = d.rbdMapVolume(newName, string(vol.volType))
+       if err != nil {
+               return err
+       }
+
+       err = genericVFSRenameVolume(d, vol, newName, op)
+       if err != nil {
+               return err
+       }
+
+       revert = false
+       return nil
+}
+
+func (d *ceph) UpdateVolume(vol Volume, changedConfig map[string]string) error {
+       if vol.volType != VolumeTypeCustom {
+               return ErrNotSupported
+       }
+
+       val, ok := changedConfig["size"]
+       if ok {
+               err := d.SetVolumeQuota(vol, val, nil)
+               if err != nil {
+                       return err
+               }
+       }
+
+       return nil
+}
+
+func (d *ceph) GetVolumeUsage(vol Volume) (int64, error) {
+       return -1, fmt.Errorf("RBD quotas are currently not supported")
+}
+
+func (d *ceph) SetVolumeQuota(vol Volume, size string, op *operations.Operation) error {
+       fsType := d.getRBDFilesystem(vol)
+
+       RBDDevPath, ret := d.getRBDMappedDevPath(vol.pool, vol.name, string(vol.volType), true)
+       if ret < 0 {
+               return fmt.Errorf("Failed to get mapped RBD path")
+       }
+
+       oldSize, err := units.ParseByteSizeString(vol.config["size"])
+       if err != nil {
+               return err
+       }
+
+       newSize, err := units.ParseByteSizeString(size)
+       if err != nil {
+               return err
+       }
+
+       // A new size of 0 means that someone unset the size property in the
+       // container's config; we obviously cannot resize to 0.
+       if oldSize == newSize || newSize == 0 {
+               return nil
+       }
+
+       if newSize < oldSize {
+               err = d.rbdShrink(RBDDevPath, newSize, fsType, vol)
+       } else {
+               err = d.rbdGrow(RBDDevPath, newSize, fsType, vol)
+       }
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+func (d *ceph) GetVolumeDiskPath(vol Volume) (string, error) {
+       return "", ErrNotImplemented
+}
+
+func (d *ceph) MountVolume(vol Volume, op *operations.Operation) (bool, error) {
+       RBDFilesystem := d.getRBDFilesystem(vol)
+       mountPath := vol.MountPath()
+
+       if shared.IsMountPoint(mountPath) {
+               return false, nil
+       }
+
+       err := vol.EnsureMountPath()
+       if err != nil {
+               return false, err
+       }
+
+       RBDDevPath, ret := d.getRBDMappedDevPath(vol.pool, vol.name, string(vol.volType), true)
+       if ret < 0 {
+               return false, fmt.Errorf("Failed to get mapped RBD path")
+       }
+
+       mountFlags, mountOptions := resolveMountOptions(d.getRBDMountOptions(vol))
+
+       err = TryMount(RBDDevPath, mountPath, RBDFilesystem, mountFlags, mountOptions)
+       if err != nil {
+               return false, err
+       }
+
+       return true, nil
+}
+
+func (d *ceph) MountVolumeSnapshot(snapVol Volume, op *operations.Operation) (bool, error) {
+       revert := true
+       parentName, snapshotOnlyName, _ := shared.InstanceGetParentAndSnapshotName(snapVol.name)
+       prefixedSnapOnlyName := fmt.Sprintf("snapshot_%s", snapshotOnlyName)
+
+       // Protect snapshot to prevent data loss.
+       err := d.rbdProtectVolumeSnapshot(parentName, string(snapVol.volType), prefixedSnapOnlyName)
+       if err != nil {
+               return false, err
+       }
+
+       defer func() {
+               if !revert {
+                       return
+               }
+
+               d.rbdUnprotectVolumeSnapshot(parentName, string(snapVol.volType), prefixedSnapOnlyName)
+       }()
+
+       // Clone snapshot.
+       cloneName := fmt.Sprintf("%s_%s_start_clone", parentName, snapshotOnlyName)
+
+       err = d.rbdCreateClone(parentName, string(snapVol.volType), prefixedSnapOnlyName, cloneName, "snapshots")
+       if err != nil {
+               return false, err
+       }
+
+       defer func() {
+               if !revert {
+                       return
+               }
+
+               d.rbdDeleteVolume(cloneName, "snapshots")
+       }()
+
+       // Map volume
+       rbdDevPath, err := d.rbdMapVolume(cloneName, "snapshots")
+       if err != nil {
+               return false, err
+       }
+
+       defer func() {
+               if !revert {
+                       return
+               }
+
+               d.rbdUnmapVolume(cloneName, "snapshots", true)
+       }()
+
+       mountPath := snapVol.MountPath()
+
+       if shared.IsMountPoint(mountPath) {
+               return false, nil
+       }
+
+       err = snapVol.EnsureMountPath()
+       if err != nil {
+               return false, err
+       }
+
+       RBDFilesystem := d.getRBDFilesystem(snapVol)
+       mountFlags, mountOptions := resolveMountOptions(d.getRBDMountOptions(snapVol))
+       if RBDFilesystem == "xfs" {
+               idx := strings.Index(mountOptions, "nouuid")
+               if idx < 0 {
+                       mountOptions += ",nouuid"
+               }
+       }
+
+       err = TryMount(rbdDevPath, mountPath, RBDFilesystem, mountFlags, mountOptions)
+       if err != nil {
+               return false, err
+       }
+
+       revert = false
+
+       return true, nil
+}
+
+func (d *ceph) UnmountVolume(vol Volume, op *operations.Operation) (bool, error) {
+       mountPath := vol.MountPath()
+
+       if !shared.IsMountPoint(mountPath) {
+               return false, nil
+       }
+
+       err := TryUnmount(mountPath, unix.MNT_DETACH)
+       if err != nil {
+               return false, err
+       }
+
+       // Attempt to unmap
+       if vol.volType == VolumeTypeCustom {
+               err = d.rbdUnmapVolume(vol.name, string(vol.volType), true)
+               if err != nil {
+                       return true, err
+               }
+       }
+
+       return true, nil
+}
+
+func (d *ceph) UnmountVolumeSnapshot(snapVol Volume, op *operations.Operation) (bool, error) {
+       mountPath := snapVol.MountPath()
+
+       if !shared.IsMountPoint(mountPath) {
+               return false, nil
+       }
+
+       err := TryUnmount(mountPath, unix.MNT_DETACH)
+       if err != nil {
+               return false, err
+       }
+
+       parentName, snapshotOnlyName, _ := shared.InstanceGetParentAndSnapshotName(snapVol.name)
+       cloneName := fmt.Sprintf("%s_%s_start_clone", parentName, snapshotOnlyName)
+
+       err = d.rbdUnmapVolume(cloneName, "snapshots", true)
+       if err != nil {
+               return false, err
+       }
+
+       if !d.rbdVolumeExists(cloneName, "snapshots") {
+               return true, nil
+       }
+
+       // Delete the temporary RBD volume
+       err = d.rbdDeleteVolume(cloneName, "snapshots")
+       if err != nil {
+               return false, err
+       }
+
+       return true, nil
+}
+
+func (d *ceph) CreateVolumeSnapshot(snapVol Volume, op *operations.Operation) error {
+       parentName, snapshotOnlyName, _ := shared.InstanceGetParentAndSnapshotName(snapVol.name)
+       sourcePath := GetVolumeMountPath(d.name, snapVol.volType, parentName)
+       snapshotName := fmt.Sprintf("snapshot_%s", snapshotOnlyName)
+
+       if shared.IsMountPoint(sourcePath) {
+               // This is costly but we need to ensure that all cached data has
+               // been committed to disk. If we don't then the rbd snapshot of
+               // the underlying filesystem can be inconsistent or - worst case
+               // - empty.
+               unix.Sync()
+
+               _, err := shared.TryRunCommand("fsfreeze", "--freeze", sourcePath)
+               if err == nil {
+                       defer shared.TryRunCommand("fsfreeze", "--unfreeze", sourcePath)
+               }
+       }
+
+       // Create the parent directory.
+       err := createParentSnapshotDirIfMissing(d.name, snapVol.volType, parentName)
+       if err != nil {
+               return err
+       }
+
+       err = snapVol.EnsureMountPath()
+       if err != nil {
+               return err
+       }
+
+       err = d.rbdCreateVolumeSnapshot(parentName, string(snapVol.volType), snapshotName)
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+func (d *ceph) DeleteVolumeSnapshot(snapVol Volume, op *operations.Operation) error {
+       parentName, snapshotOnlyName, _ := shared.InstanceGetParentAndSnapshotName(snapVol.name)
+       snapshotName := fmt.Sprintf("snapshot_%s", snapshotOnlyName)
+
+       if !d.rbdVolumeSnapshotExists(parentName, string(snapVol.volType), snapshotName) {
+               return nil
+       }
+
+       ret := d.deleteVolumeSnapshot(parentName, string(snapVol.volType), snapshotName)
+       if ret < 0 {
+               return fmt.Errorf("Failed to delete volume snapshot")
+       }
+
+       err := wipeDirectory(snapVol.MountPath())
+       if err != nil {
+               return err
+       }
+
+       err = os.Remove(snapVol.MountPath())
+       if err != nil {
+               return err
+       }
+
+       // Remove the parent snapshot directory if this is the last snapshot being removed.
+       err = deleteParentSnapshotDirIfEmpty(d.name, snapVol.volType, parentName)
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+func (d *ceph) RenameVolumeSnapshot(snapVol Volume, newSnapshotName string, op *operations.Operation) error {
+       parentName, snapshotOnlyName, _ := shared.InstanceGetParentAndSnapshotName(snapVol.name)
+       oldSnapOnlyName := fmt.Sprintf("snapshot_%s", snapshotOnlyName)
+       newSnapOnlyName := fmt.Sprintf("snapshot_%s", newSnapshotName)
+
+       err := d.rbdRenameVolumeSnapshot(parentName, string(snapVol.volType), oldSnapOnlyName, newSnapOnlyName)
+       if err != nil {
+               return err
+       }
+
+       return genericVFSRenameVolumeSnapshot(d, snapVol, newSnapshotName, op)
+}
+
+func (d *ceph) VolumeSnapshots(vol Volume, op *operations.Operation) ([]string, error) {
+       snapshots, err := d.rbdListVolumeSnapshots(vol.name, string(vol.volType))
+       if err != nil {
+               if err == db.ErrNoSuchObject {
+                       return nil, nil
+               }
+
+               return nil, err
+       }
+
+       var ret []string
+
+       for _, snap := range snapshots {
+               // Ignore zombie snapshots as these are only used internally and
+               // not relevant for users.
+               if strings.HasPrefix(snap, "zombie_") || strings.HasPrefix(snap, "migration-send-") {
+                       continue
+               }
+
+               ret = append(ret, strings.TrimPrefix(snap, "snapshot_"))
+       }
+
+       return ret, nil
+}
+
+func (d *ceph) RestoreVolume(vol Volume, snapshotName string, op *operations.Operation) error {
+       ourUmount, err := d.UnmountVolume(vol, op)
+       if err != nil {
+               return err
+       }
+
+       if ourUmount {
+               defer d.MountVolume(vol, op)
+       }
+
+       prefixedSnapshotName := fmt.Sprintf("snapshot_%s", snapshotName)
+
+       err = d.rbdRestoreVolume(vol.name, string(vol.volType), prefixedSnapshotName)
+       if err != nil {
+               return err
+       }
+
+       snapVol, err := vol.NewSnapshot(snapshotName)
+       if err != nil {
+               return err
+       }
+
+       err = d.generateUUID(snapVol)
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+func (d *ceph) MigrateVolume(vol Volume, conn io.ReadWriteCloser, volSrcArgs *migration.VolumeSourceArgs, op *operations.Operation) error {
+       if vol.contentType != ContentTypeFS {
+               return ErrNotSupported
+       }
+
+       // Handle simple rsync through generic.
+       if volSrcArgs.MigrationType.FSType == migration.MigrationFSType_RSYNC {
+               return genericVFSMigrateVolume(d, d.state, vol, conn, volSrcArgs, op)
+       } else if volSrcArgs.MigrationType.FSType != migration.MigrationFSType_RBD {
+               return ErrNotSupported
+       }
+
+       if shared.IsSnapshot(vol.name) {
+               parentName, snapOnlyName, _ := shared.InstanceGetParentAndSnapshotName(vol.name)
+               sendName := fmt.Sprintf("%s/snapshots_%s_%s_start_clone", d.name, parentName, snapOnlyName)
+
+               cloneVol := NewVolume(d, d.name, vol.volType, vol.contentType, vol.name, nil, nil)
+
+               // Mounting the volume snapshot will create the clone "snapshots_<parent>_<snap>_start_clone".
+               _, err := d.MountVolumeSnapshot(cloneVol, op)
+               if err != nil {
+                       return err
+               }
+               defer d.UnmountVolumeSnapshot(cloneVol, op)
+
+               // Setup progress tracking.
+               var wrapper *ioprogress.ProgressTracker
+               if volSrcArgs.TrackProgress {
+                       wrapper = migration.ProgressTracker(op, "fs_progress", vol.name)
+               }
+
+               err = d.sendVolume(conn, sendName, "", wrapper)
+               if err != nil {
+                       return err
+               }
+
+               return nil
+       }
+
+       lastSnap := ""
+
+       if !volSrcArgs.FinalSync {
+               for i, snapName := range volSrcArgs.Snapshots {
+                       snapshot, _ := vol.NewSnapshot(snapName)
+
+                       prev := ""
+
+                       if i > 0 {
+                               prev = fmt.Sprintf("snapshot_%s", volSrcArgs.Snapshots[i-1])
+                       }
+
+                       lastSnap = fmt.Sprintf("snapshot_%s", snapName)
+                       sendSnapName := d.getRBDVolumeName(vol.name, lastSnap, string(vol.volType), false, true)
+
+                       // Setup progress tracking.
+                       var wrapper *ioprogress.ProgressTracker
+
+                       if volSrcArgs.TrackProgress {
+                               wrapper = migration.ProgressTracker(op, "fs_progress", snapshot.name)
+                       }
+
+                       err := d.sendVolume(conn, sendSnapName, prev, wrapper)
+                       if err != nil {
+                               return err
+                       }
+               }
+       }
+
+       // Setup progress tracking.
+       var wrapper *ioprogress.ProgressTracker
+       if volSrcArgs.TrackProgress {
+               wrapper = migration.ProgressTracker(op, "fs_progress", vol.name)
+       }
+
+       runningSnapName := fmt.Sprintf("migration-send-%s", uuid.NewRandom().String())
+
+       err := d.rbdCreateVolumeSnapshot(vol.name, string(vol.volType), runningSnapName)
+       if err != nil {
+               return err
+       }
+       defer d.rbdDeleteVolumeSnapshot(vol.name, string(vol.volType), runningSnapName)
+
+       cur := d.getRBDVolumeName(vol.name, runningSnapName, string(vol.volType), false, true)
+
+       err = d.sendVolume(conn, cur, lastSnap, wrapper)
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+func (d *ceph) CreateVolumeFromMigration(vol Volume, conn io.ReadWriteCloser, volTargetArgs migration.VolumeTargetArgs, preFiller *VolumeFiller, op *operations.Operation) error {
+       if vol.contentType != ContentTypeFS {
+               return ErrNotSupported
+       }
+
+       // Handle simple rsync through generic.
+       if volTargetArgs.MigrationType.FSType == migration.MigrationFSType_RSYNC {
+               return genericCreateVolumeFromMigration(d, nil, vol, conn, volTargetArgs, preFiller, op)
+       } else if volTargetArgs.MigrationType.FSType != migration.MigrationFSType_RBD {
+               return ErrNotSupported
+       }
+
+       recvName := d.getRBDVolumeName(vol.name, "", string(vol.volType), false, true)
+
+       if !d.rbdVolumeExists(vol.name, string(vol.volType)) {
+               err := d.rbdCreateVolume(vol.name, string(vol.volType), "0")
+               if err != nil {
+                       return err
+               }
+       }
+
+       err := vol.EnsureMountPath()
+       if err != nil {
+               return err
+       }
+
+       // Transfer the snapshots first when handling an rbd import-diff migration.
+       if len(volTargetArgs.Snapshots) > 0 {
+               // Create the parent directory.
+               err := createParentSnapshotDirIfMissing(d.name, vol.volType, vol.name)
+               if err != nil {
+                       return err
+               }
+
+               // Transfer the snapshots.
+               for _, snapName := range volTargetArgs.Snapshots {
+                       fullSnapshotName := d.getRBDVolumeName(vol.name, snapName, string(vol.volType), false, true)
+                       wrapper := migration.ProgressWriter(op, "fs_progress", fullSnapshotName)
+
+                       err = d.receiveVolume(recvName, conn, wrapper)
+                       if err != nil {
+                               return err
+                       }
+
+                       snapVol, err := vol.NewSnapshot(snapName)
+                       if err != nil {
+                               return err
+                       }
+
+                       err = snapVol.EnsureMountPath()
+                       if err != nil {
+                               return err
+                       }
+               }
+       }
+
+       defer func() {
+               // Delete all migration-send-* snapshots
+               snaps, err := d.rbdListVolumeSnapshots(vol.name, string(vol.volType))
+               if err != nil {
+                       return
+               }
+
+               for _, snap := range snaps {
+                       if !strings.HasPrefix(snap, "migration-send") {
+                               continue
+                       }
+
+                       d.rbdDeleteVolumeSnapshot(vol.name, string(vol.volType), snap)
+               }
+       }()
+
+       wrapper := migration.ProgressWriter(op, "fs_progress", vol.name)
+
+       err = d.receiveVolume(recvName, conn, wrapper)
+       if err != nil {
+               return err
+       }
+
+       if volTargetArgs.Live {
+               err = d.receiveVolume(recvName, conn, wrapper)
+               if err != nil {
+                       return err
+               }
+       }
+
+       err = d.generateUUID(vol)
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+func (d *ceph) BackupVolume(vol Volume, targetPath string, optimized bool, snapshots bool, op *operations.Operation) error {
+       if snapshots {
+               snaps, err := vol.Snapshots(op)
+               if err != nil {
+                       return err
+               }
+
+               for _, snap := range snaps {
+                       err := d.rbdCreateVolumeBackup(snap, targetPath)
+                       if err != nil {
+                               return err
+                       }
+               }
+       }
+
+       return d.rbdCreateVolumeBackup(vol, targetPath)
+}
+
+func (d *ceph) CreateVolumeFromBackup(vol Volume, snapshots []string, srcData io.ReadSeeker, optimizedStorage bool, op *operations.Operation) (func(vol Volume) error, func(), error) {
+       return nil, nil, ErrNotImplemented
+}
diff --git a/lxd/storage/drivers/load.go b/lxd/storage/drivers/load.go
index 2f7a114558..55f141038c 100644
--- a/lxd/storage/drivers/load.go
+++ b/lxd/storage/drivers/load.go
@@ -11,6 +11,7 @@ var drivers = map[string]func() driver{
        "dir":    func() driver { return &dir{} },
        "lvm":    func() driver { return &lvm{} },
        "zfs":    func() driver { return &zfs{} },
+       "ceph":   func() driver { return &ceph{} },
 }
 
 // Validators contains functions used for validating a drivers's config.

From ff0e24ecdd290ff043a48d0fd6bf4611a39f5cc3 Mon Sep 17 00:00:00 2001
From: Thomas Hipp <thomas.h...@canonical.com>
Date: Mon, 20 Jan 2020 15:55:29 +0100
Subject: [PATCH 2/2] DEBUG: disable tests

This commit will be removed again.

Signed-off-by: Thomas Hipp <thomas.h...@canonical.com>
---
 test/main.sh | 210 +++++++++++++++++++++++++--------------------------
 1 file changed, 105 insertions(+), 105 deletions(-)

diff --git a/test/main.sh b/test/main.sh
index ab098a5f48..b6931539e5 100755
--- a/test/main.sh
+++ b/test/main.sh
@@ -154,111 +154,111 @@ if [ "$#" -gt 0 ]; then
   exit
 fi
 
-run_test test_check_deps "checking dependencies"
-run_test test_static_analysis "static analysis"
-run_test test_database_update "database schema updates"
-run_test test_database_restore "database restore"
-run_test test_database_no_disk_space "database out of disk space"
-run_test test_sql "lxd sql"
-run_test test_basic_usage "basic usage"
-run_test test_remote_url "remote url handling"
-run_test test_remote_admin "remote administration"
-run_test test_remote_usage "remote usage"
-run_test test_clustering_enable "clustering enable"
-run_test test_clustering_membership "clustering membership"
-run_test test_clustering_containers "clustering containers"
-run_test test_clustering_storage "clustering storage"
-run_test test_clustering_network "clustering network"
-run_test test_clustering_publish "clustering publish"
-run_test test_clustering_profiles "clustering profiles"
-run_test test_clustering_join_api "clustering join api"
-run_test test_clustering_shutdown_nodes "clustering shutdown"
-run_test test_clustering_projects "clustering projects"
-run_test test_clustering_address "clustering address"
-run_test test_clustering_image_replication "clustering image replication"
-run_test test_clustering_dns "clustering DNS"
-run_test test_clustering_recover "clustering recovery"
-run_test test_clustering_handover "clustering handover"
-run_test test_clustering_rebalance "clustering rebalance"
-run_test test_clustering_upgrade "clustering upgrade"
-run_test test_projects_default "default project"
-run_test test_projects_crud "projects CRUD operations"
-run_test test_projects_containers "containers inside projects"
-run_test test_projects_snapshots "snapshots inside projects"
-run_test test_projects_backups "backups inside projects"
-run_test test_projects_profiles "profiles inside projects"
-run_test test_projects_profiles_default "profiles from the global default 
project"
-run_test test_projects_images "images inside projects"
-run_test test_projects_images_default "images from the global default project"
-run_test test_projects_storage "projects and storage pools"
-run_test test_projects_network "projects and networks"
-run_test test_container_devices_disk "container devices - disk"
-run_test test_container_devices_nic_p2p "container devices - nic - p2p"
-run_test test_container_devices_nic_bridged "container devices - nic - bridged"
-run_test test_container_devices_nic_bridged_filtering "container devices - nic 
- bridged - filtering"
-run_test test_container_devices_nic_physical "container devices - nic - 
physical"
-run_test test_container_devices_nic_macvlan "container devices - nic - macvlan"
-run_test test_container_devices_nic_ipvlan "container devices - nic - ipvlan"
-run_test test_container_devices_nic_sriov "container devices - nic - sriov"
-run_test test_container_devices_nic_routed "container devices - nic - routed"
-run_test test_container_devices_infiniband_physical "container devices - infiniband - physical"
-run_test test_container_devices_infiniband_sriov "container devices - infiniband - sriov"
-run_test test_container_devices_proxy "container devices - proxy"
-run_test test_container_devices_gpu "container devices - gpu"
-run_test test_container_devices_unix_char "container devices - unix-char"
-run_test test_container_devices_unix_block "container devices - unix-block"
-run_test test_security "security features"
-run_test test_security_protection "container protection"
-run_test test_image_expiry "image expiry"
-run_test test_image_list_all_aliases "image list all aliases"
-run_test test_image_auto_update "image auto-update"
-run_test test_image_prefer_cached "image prefer cached"
-run_test test_image_import_dir "import image from directory"
-run_test test_concurrent_exec "concurrent exec"
-run_test test_concurrent "concurrent startup"
-run_test test_snapshots "container snapshots"
-run_test test_snap_restore "snapshot restores"
-run_test test_snap_expiry "snapshot expiry"
-run_test test_config_profiles "profiles and configuration"
-run_test test_config_edit "container configuration edit"
-run_test test_config_edit_container_snapshot_pool_config "container and snapshot volume configuration edit"
-run_test test_container_metadata "manage container metadata and templates"
-run_test test_container_snapshot_config "container snapshot configuration"
-run_test test_server_config "server configuration"
-run_test test_filemanip "file manipulations"
-run_test test_network "network management"
-run_test test_idmap "id mapping"
-run_test test_template "file templating"
-run_test test_pki "PKI mode"
-run_test test_devlxd "/dev/lxd"
-run_test test_fuidshift "fuidshift"
-run_test test_migration "migration"
-run_test test_fdleak "fd leak"
-run_test test_storage "storage"
-run_test test_storage_volume_snapshots "storage volume snapshots"
-run_test test_init_auto "lxd init auto"
-run_test test_init_interactive "lxd init interactive"
-run_test test_init_preseed "lxd init preseed"
-run_test test_storage_profiles "storage profiles"
-run_test test_container_import "container import"
-run_test test_storage_volume_attach "attaching storage volumes"
-run_test test_storage_driver_ceph "ceph storage driver"
-run_test test_storage_driver_cephfs "cephfs storage driver"
-run_test test_resources "resources"
-run_test test_kernel_limits "kernel limits"
-run_test test_macaroon_auth "macaroon authentication"
-run_test test_console "console"
-run_test test_query "query"
-run_test test_storage_local_volume_handling "storage local volume handling"
-run_test test_backup_import "backup import"
-run_test test_backup_export "backup export"
-run_test test_backup_rename "backup rename"
-run_test test_container_local_cross_pool_handling "container local cross pool handling"
-run_test test_incremental_copy "incremental container copy"
-run_test test_profiles_project_default "profiles in default project"
-run_test test_profiles_project_images_profiles "profiles in project with images and profiles enabled"
-run_test test_profiles_project_images "profiles in project with images enabled and profiles disabled"
-run_test test_profiles_project_profiles "profiles in project with images disabled and profiles enabled"
+# run_test test_check_deps "checking dependencies"
+# run_test test_static_analysis "static analysis"
+# run_test test_database_update "database schema updates"
+# run_test test_database_restore "database restore"
+# run_test test_database_no_disk_space "database out of disk space"
+# run_test test_sql "lxd sql"
+# run_test test_basic_usage "basic usage"
+# run_test test_remote_url "remote url handling"
+# run_test test_remote_admin "remote administration"
+# run_test test_remote_usage "remote usage"
+# run_test test_clustering_enable "clustering enable"
+# run_test test_clustering_membership "clustering membership"
+# run_test test_clustering_containers "clustering containers"
+# run_test test_clustering_storage "clustering storage"
+# run_test test_clustering_network "clustering network"
+# run_test test_clustering_publish "clustering publish"
+# run_test test_clustering_profiles "clustering profiles"
+# run_test test_clustering_join_api "clustering join api"
+# run_test test_clustering_shutdown_nodes "clustering shutdown"
+# run_test test_clustering_projects "clustering projects"
+# run_test test_clustering_address "clustering address"
+# run_test test_clustering_image_replication "clustering image replication"
+# run_test test_clustering_dns "clustering DNS"
+# run_test test_clustering_recover "clustering recovery"
+# run_test test_clustering_handover "clustering handover"
+# run_test test_clustering_rebalance "clustering rebalance"
+# run_test test_clustering_upgrade "clustering upgrade"
+# run_test test_projects_default "default project"
+# run_test test_projects_crud "projects CRUD operations"
+# run_test test_projects_containers "containers inside projects"
+# run_test test_projects_snapshots "snapshots inside projects"
+# run_test test_projects_backups "backups inside projects"
+# run_test test_projects_profiles "profiles inside projects"
+# run_test test_projects_profiles_default "profiles from the global default project"
+# run_test test_projects_images "images inside projects"
+# run_test test_projects_images_default "images from the global default project"
+# run_test test_projects_storage "projects and storage pools"
+# run_test test_projects_network "projects and networks"
+# run_test test_container_devices_disk "container devices - disk"
+# run_test test_container_devices_nic_p2p "container devices - nic - p2p"
+# run_test test_container_devices_nic_bridged "container devices - nic - bridged"
+# run_test test_container_devices_nic_bridged_filtering "container devices - nic - bridged - filtering"
+# run_test test_container_devices_nic_physical "container devices - nic - physical"
+# run_test test_container_devices_nic_macvlan "container devices - nic - macvlan"
+# run_test test_container_devices_nic_ipvlan "container devices - nic - ipvlan"
+# run_test test_container_devices_nic_sriov "container devices - nic - sriov"
+# run_test test_container_devices_nic_routed "container devices - nic - routed"
+# run_test test_container_devices_infiniband_physical "container devices - infiniband - physical"
+# run_test test_container_devices_infiniband_sriov "container devices - infiniband - sriov"
+# run_test test_container_devices_proxy "container devices - proxy"
+# run_test test_container_devices_gpu "container devices - gpu"
+# run_test test_container_devices_unix_char "container devices - unix-char"
+# run_test test_container_devices_unix_block "container devices - unix-block"
+# run_test test_security "security features"
+# run_test test_security_protection "container protection"
+# run_test test_image_expiry "image expiry"
+# run_test test_image_list_all_aliases "image list all aliases"
+# run_test test_image_auto_update "image auto-update"
+# run_test test_image_prefer_cached "image prefer cached"
+# run_test test_image_import_dir "import image from directory"
+# run_test test_concurrent_exec "concurrent exec"
+# run_test test_concurrent "concurrent startup"
+# run_test test_snapshots "container snapshots"
+# run_test test_snap_restore "snapshot restores"
+# run_test test_snap_expiry "snapshot expiry"
+# run_test test_config_profiles "profiles and configuration"
+# run_test test_config_edit "container configuration edit"
+# run_test test_config_edit_container_snapshot_pool_config "container and snapshot volume configuration edit"
+# run_test test_container_metadata "manage container metadata and templates"
+# run_test test_container_snapshot_config "container snapshot configuration"
+# run_test test_server_config "server configuration"
+# run_test test_filemanip "file manipulations"
+# run_test test_network "network management"
+# run_test test_idmap "id mapping"
+# run_test test_template "file templating"
+# run_test test_pki "PKI mode"
+# run_test test_devlxd "/dev/lxd"
+# run_test test_fuidshift "fuidshift"
+# run_test test_migration "migration"
+# run_test test_fdleak "fd leak"
+# run_test test_storage "storage"
+# run_test test_storage_volume_snapshots "storage volume snapshots"
+# run_test test_init_auto "lxd init auto"
+# run_test test_init_interactive "lxd init interactive"
+# run_test test_init_preseed "lxd init preseed"
+# run_test test_storage_profiles "storage profiles"
+# run_test test_container_import "container import"
+# run_test test_storage_volume_attach "attaching storage volumes"
+# run_test test_storage_driver_ceph "ceph storage driver"
+# run_test test_storage_driver_cephfs "cephfs storage driver"
+# run_test test_resources "resources"
+# run_test test_kernel_limits "kernel limits"
+# run_test test_macaroon_auth "macaroon authentication"
+# run_test test_console "console"
+# run_test test_query "query"
+# run_test test_storage_local_volume_handling "storage local volume handling"
+# run_test test_backup_import "backup import"
+# run_test test_backup_export "backup export"
+# run_test test_backup_rename "backup rename"
+# run_test test_container_local_cross_pool_handling "container local cross pool handling"
+# run_test test_incremental_copy "incremental container copy"
+# run_test test_profiles_project_default "profiles in default project"
+# run_test test_profiles_project_images_profiles "profiles in project with images and profiles enabled"
+# run_test test_profiles_project_images "profiles in project with images enabled and profiles disabled"
+# run_test test_profiles_project_profiles "profiles in project with images disabled and profiles enabled"
 
 # shellcheck disable=SC2034
 TEST_RESULT=success