The following pull request was submitted through GitHub.
It can be accessed and reviewed at: https://github.com/lxc/lxd/pull/6374

This e-mail was sent by the LXC bot; direct replies will not reach the author
unless they happen to be subscribed to this list.

=== Description (from pull-request) ===

From 07686d47246ee073bcc529473d3ecd9ee0d645b9 Mon Sep 17 00:00:00 2001
From: Thomas Hipp <thomas.h...@canonical.com>
Date: Thu, 31 Oct 2019 09:53:45 +0100
Subject: [PATCH 1/2] lxd/storage: Add locking for mount operations

Signed-off-by: Thomas Hipp <thomas.h...@canonical.com>
---
 lxd/storage/backend_lxd.go | 24 +++++++++++++
 lxd/storage/lock.go        | 69 ++++++++++++++++++++++++++++++++++++++
 2 files changed, 93 insertions(+)
 create mode 100644 lxd/storage/lock.go

diff --git a/lxd/storage/backend_lxd.go b/lxd/storage/backend_lxd.go
index 9f91ab84d9..d7e5f560be 100644
--- a/lxd/storage/backend_lxd.go
+++ b/lxd/storage/backend_lxd.go
@@ -125,11 +125,23 @@ func (b *lxdBackend) Delete(op *operations.Operation) error {
 
 // Mount mounts the storage pool.
 func (b *lxdBackend) Mount() (bool, error) {
+       unlock := lock(getPoolMountLockID(b.name))
+       if unlock == nil {
+               return false, nil
+       }
+       defer unlock()
+
        return b.driver.Mount()
 }
 
 // Unmount unmounts the storage pool.
 func (b *lxdBackend) Unmount() (bool, error) {
+       unlock := lock(getPoolUmountLockID(b.name))
+       if unlock == nil {
+               return false, nil
+       }
+       defer unlock()
+
        return b.driver.Unmount()
 }
 
@@ -596,11 +608,23 @@ func (b *lxdBackend) SetCustomVolumeQuota(vol api.StorageVolume, quota uint64) e
 
 // MountCustomVolume mounts a custom volume.
 func (b *lxdBackend) MountCustomVolume(volName string, op *operations.Operation) (bool, error) {
+       unlock := lock(getVolumeMountLockID(volName))
+       if unlock == nil {
+               return false, nil
+       }
+       defer unlock()
+
        return b.driver.MountVolume(drivers.VolumeTypeCustom, volName, op)
 }
 
 // UnmountCustomVolume unmounts a custom volume.
 func (b *lxdBackend) UnmountCustomVolume(volName string, op *operations.Operation) (bool, error) {
+       unlock := lock(getVolumeUmountLockID(volName))
+       if unlock == nil {
+               return false, nil
+       }
+       defer unlock()
+
        return b.driver.UnmountVolume(drivers.VolumeTypeCustom, volName, op)
 }
 
diff --git a/lxd/storage/lock.go b/lxd/storage/lock.go
new file mode 100644
index 0000000000..e16a794adf
--- /dev/null
+++ b/lxd/storage/lock.go
@@ -0,0 +1,69 @@
+package storage
+
+import (
+       "fmt"
+       "sync"
+
+       "github.com/lxc/lxd/shared/logger"
+)
+
+// lxdStorageOngoingOperationMap is a hashmap that allows functions to check
+// whether the operation they are about to perform is already in progress. If
+// it is, the channel can be used to wait for the operation to finish. If it
+// is not, the function that wants to perform the operation should store its
+// code in the hashmap.
+// Note that any access to this map must be done while holding a lock.
+var lxdStorageOngoingOperationMap = map[string]chan bool{}
+
+// lxdStorageMapLock is used to access lxdStorageOngoingOperationMap.
+var lxdStorageMapLock sync.Mutex
+
+// The following functions are used to construct simple operation codes that are
+// unique.
+func getPoolMountLockID(poolName string) string {
+       return fmt.Sprintf("mount/pool/%s", poolName)
+}
+
+func getPoolUmountLockID(poolName string) string {
+       return fmt.Sprintf("umount/pool/%s", poolName)
+}
+
+func getVolumeMountLockID(volumeName string) string {
+       return fmt.Sprintf("mount/volume/%s", volumeName)
+}
+
+func getVolumeUmountLockID(volumeName string) string {
+       return fmt.Sprintf("umount/volume/%s", volumeName)
+}
+
+func lock(lockID string) func() {
+       lxdStorageMapLock.Lock()
+
+       if waitChannel, ok := lxdStorageOngoingOperationMap[lockID]; ok {
+               lxdStorageMapLock.Unlock()
+
+               _, ok := <-waitChannel
+               if ok {
+                       logger.Warnf("Received value over semaphore, this should not have happened")
+               }
+
+               // Give the benefit of the doubt and assume that the other
+               // thread actually succeeded in performing the operation.
+               return nil
+       }
+
+       lxdStorageOngoingOperationMap[lockID] = make(chan bool)
+       lxdStorageMapLock.Unlock()
+
+       return func() {
+               lxdStorageMapLock.Lock()
+
+               waitChannel, ok := lxdStorageOngoingOperationMap[lockID]
+               if ok {
+                       close(waitChannel)
+                       delete(lxdStorageOngoingOperationMap, lockID)
+               }
+
+               lxdStorageMapLock.Unlock()
+       }
+}
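
The lock() helper above is the common wait-channel gate: the first caller
registers a channel under an operation key and performs the work; concurrent
callers block on the channel, then skip the work on the assumption that the
first caller succeeded. For illustration, a minimal self-contained sketch of
the same pattern (names such as begin and ongoing are illustrative, not from
the patch):

    package main

    import (
            "fmt"
            "sync"
    )

    var (
            mu      sync.Mutex
            ongoing = map[string]chan struct{}{}
    )

    // begin returns a release func if the caller should perform the
    // operation, or nil if another goroutine already held the gate (after
    // waiting for that goroutine to finish).
    func begin(key string) func() {
            mu.Lock()
            if ch, ok := ongoing[key]; ok {
                    mu.Unlock()
                    <-ch // Wait for the in-flight operation to complete.
                    return nil
            }

            ongoing[key] = make(chan struct{})
            mu.Unlock()

            return func() {
                    mu.Lock()
                    close(ongoing[key])
                    delete(ongoing, key)
                    mu.Unlock()
            }
    }

    func main() {
            var wg sync.WaitGroup
            for i := 0; i < 3; i++ {
                    wg.Add(1)
                    go func(n int) {
                            defer wg.Done()
                            if release := begin("mount/pool/default"); release != nil {
                                    defer release()
                                    fmt.Println("goroutine", n, "performed the mount")
                            } else {
                                    fmt.Println("goroutine", n, "waited; mount already done")
                            }
                    }(i)
            }
            wg.Wait()
    }

This is also why a caller that loses the race gets nil back from lock(), and
why Mount() then returns (false, nil): the mount is assumed to have been
completed by whoever held the gate.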

From 0b07c363a61600d1fa76d6b7568396b5ba67160d Mon Sep 17 00:00:00 2001
From: Thomas Hipp <thomas.h...@canonical.com>
Date: Wed, 30 Oct 2019 19:24:31 +0100
Subject: [PATCH 2/2] lxd/storage/drivers: Add cephfs

This adds the cephfs storage driver.

Signed-off-by: Thomas Hipp <thomas.h...@canonical.com>
---
 lxd/storage/drivers/driver_cephfs.go | 1013 ++++++++++++++++++++++++++
 lxd/storage/drivers/load.go          |    3 +-
 2 files changed, 1015 insertions(+), 1 deletion(-)
 create mode 100644 lxd/storage/drivers/driver_cephfs.go
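
The driver accepts a source of the form "fsname" or "fsname/path", splitting
on the first "/"; the same strings.SplitN parsing appears in Create, Delete
and Mount below. A standalone illustration (parseSource is a hypothetical
helper, not part of the patch):

    package main

    import (
            "fmt"
            "strings"
    )

    // parseSource mirrors the driver's source parsing: the part before the
    // first "/" is the CephFS filesystem name, the rest is the path within
    // it (defaulting to "/").
    func parseSource(source string) (fsName string, fsPath string) {
            fields := strings.SplitN(source, "/", 2)
            fsName = fields[0]
            fsPath = "/"
            if len(fields) > 1 {
                    fsPath = fields[1]
            }

            return fsName, fsPath
    }

    func main() {
            for _, src := range []string{"myfs", "myfs/lxd/pool1"} {
                    name, path := parseSource(src)
                    fmt.Printf("source=%q -> fsName=%q fsPath=%q\n", src, name, path)
            }
    }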

diff --git a/lxd/storage/drivers/driver_cephfs.go b/lxd/storage/drivers/driver_cephfs.go
new file mode 100644
index 0000000000..26d04d91bf
--- /dev/null
+++ b/lxd/storage/drivers/driver_cephfs.go
@@ -0,0 +1,1013 @@
+package drivers
+
+import (
+       "bufio"
+       "fmt"
+       "io"
+       "io/ioutil"
+       "os"
+       "path/filepath"
+       "strings"
+
+       "github.com/lxc/lxd/lxd/migration"
+       "github.com/lxc/lxd/lxd/operations"
+       "github.com/lxc/lxd/lxd/rsync"
+       "github.com/lxc/lxd/lxd/storage/quota"
+       "github.com/lxc/lxd/shared"
+       "github.com/lxc/lxd/shared/api"
+       "github.com/lxc/lxd/shared/ioprogress"
+       "github.com/lxc/lxd/shared/units"
+)
+
+type cephfs struct {
+       common
+
+       fsName  string
+       version string
+}
+
+func (d *cephfs) Info() Info {
+       if d.version == "" {
+               msg, err := shared.RunCommand("rbd", "--version")
+               if err != nil {
+                       d.version = "unknown"
+               } else {
+                       d.version = strings.TrimSpace(msg)
+               }
+       }
+
+       return Info{
+               Name:            "cephfs",
+               Version:         d.version,
+               OptimizedImages: true,
+               PreservesInodes: false,
+               Usable:          true,
+               Remote:          false,
+               VolumeTypes:     []VolumeType{VolumeTypeCustom},
+       }
+}
+
+func (d *cephfs) HasVolume(volType VolumeType, volName string) bool {
+       if shared.PathExists(GetVolumeMountPath(d.name, volType, volName)) {
+               return true
+       }
+
+       return false
+}
+
+func (d *cephfs) Create() error {
+       if d.config["source"] == "" {
+               return fmt.Errorf("Missing required source name/path")
+       }
+
+       if d.config["cephfs.path"] != "" && d.config["cephfs.path"] != d.config["source"] {
+               return fmt.Errorf("cephfs.path must match the source")
+       }
+
+       if d.config["cephfs.cluster_name"] == "" {
+               d.config["cephfs.cluster_name"] = "ceph"
+       }
+
+       if d.config["cephfs.user.name"] == "" {
+               d.config["cephfs.user.name"] = "admin"
+       }
+
+       d.fsName = d.config["source"]
+
+       // Parse the namespace / path
+       fields := strings.SplitN(d.fsName, "/", 2)
+       fsName := fields[0]
+       fsPath := "/"
+       if len(fields) > 1 {
+               fsPath = fields[1]
+       }
+
+       // Check that the filesystem exists
+       if !d.fsExists(d.config["cephfs.cluster_name"], d.config["cephfs.user.name"], fsName) {
+               return fmt.Errorf("The requested '%v' CEPHFS doesn't exist", fsName)
+       }
+
+       // Create a temporary mountpoint
+       mountPath, err := ioutil.TempDir("", "lxd_cephfs_")
+       if err != nil {
+               return err
+       }
+       defer os.RemoveAll(mountPath)
+
+       err = os.Chmod(mountPath, 0700)
+       if err != nil {
+               return err
+       }
+
+       mountPoint := filepath.Join(mountPath, "mount")
+
+       err = os.Mkdir(mountPoint, 0700)
+       if err != nil {
+               return err
+       }
+
+       // Get the credentials and host
+       monAddresses, userSecret, err := d.getConfig(d.config["cephfs.cluster_name"], d.config["cephfs.user.name"])
+       if err != nil {
+               return err
+       }
+
+       connected := false
+       for _, monAddress := range monAddresses {
+               uri := fmt.Sprintf("%s:6789:/", monAddress)
+               err = tryMount(uri, mountPoint, "ceph", 0, fmt.Sprintf("name=%v,secret=%v,mds_namespace=%v", d.config["cephfs.user.name"], userSecret, fsName))
+               if err != nil {
+                       continue
+               }
+
+               connected = true
+               defer forceUnmount(mountPoint)
+               break
+       }
+
+       if !connected {
+               return err
+       }
+
+       // Create the path if missing
+       err = os.MkdirAll(filepath.Join(mountPoint, fsPath), 0755)
+       if err != nil {
+               return err
+       }
+
+       // Check that the existing path is empty
+       ok, _ := shared.PathIsEmpty(filepath.Join(mountPoint, fsPath))
+       if !ok {
+               return fmt.Errorf("Only empty CEPHFS paths can be used as a LXD storage pool")
+       }
+
+       return nil
+}
+
+func (d *cephfs) Delete(op *operations.Operation) error {
+       // Parse the namespace / path
+       fields := strings.SplitN(d.fsName, "/", 2)
+       fsName := fields[0]
+       fsPath := "/"
+       if len(fields) > 1 {
+               fsPath = fields[1]
+       }
+
+       // Create a temporary mountpoint
+       mountPath, err := ioutil.TempDir("", "lxd_cephfs_")
+       if err != nil {
+               return err
+       }
+       defer os.RemoveAll(mountPath)
+
+       err = os.Chmod(mountPath, 0700)
+       if err != nil {
+               return err
+       }
+
+       mountPoint := filepath.Join(mountPath, "mount")
+       err = os.Mkdir(mountPoint, 0700)
+       if err != nil {
+               return err
+       }
+
+       // Get the credentials and host
+       monAddresses, userSecret, err := d.getConfig(d.config["cephfs.cluster_name"], d.config["cephfs.user.name"])
+       if err != nil {
+               return err
+       }
+
+       connected := false
+       for _, monAddress := range monAddresses {
+               uri := fmt.Sprintf("%s:6789:/", monAddress)
+               err = tryMount(uri, mountPoint, "ceph", 0, fmt.Sprintf("name=%v,secret=%v,mds_namespace=%v", d.config["cephfs.user.name"], userSecret, fsName))
+               if err != nil {
+                       continue
+               }
+
+               connected = true
+               defer forceUnmount(mountPoint)
+               break
+       }
+
+       if !connected {
+               return err
+       }
+
+       if shared.PathExists(filepath.Join(mountPoint, fsPath)) {
+               // Delete the usual directories
+               for _, dir := range []string{"custom", "custom-snapshots"} {
+                       if shared.PathExists(filepath.Join(mountPoint, fsPath, dir)) {
+                               err = os.Remove(filepath.Join(mountPoint, fsPath, dir))
+                               if err != nil {
+                                       return err
+                               }
+                       }
+               }
+
+               // Confirm that the path is now empty
+               ok, _ := shared.PathIsEmpty(filepath.Join(mountPoint, fsPath))
+               if !ok {
+                       return fmt.Errorf("Only empty CEPHFS paths can be used as a LXD storage pool")
+               }
+
+               // Delete the path itself
+               if fsPath != "" && fsPath != "/" {
+                       err = os.Remove(filepath.Join(mountPoint, fsPath))
+                       if err != nil {
+                               return err
+                       }
+               }
+       }
+
+       // On delete, wipe everything in the directory.
+       err = wipeDirectory(GetPoolMountPath(d.name))
+       if err != nil {
+               return err
+       }
+
+       // Make sure the existing pool is unmounted
+       _, err = d.Unmount()
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+func (d *cephfs) Mount() (bool, error) {
+       // Parse the namespace / path
+       fields := strings.SplitN(d.fsName, "/", 2)
+       fsName := fields[0]
+       fsPath := "/"
+       if len(fields) > 1 {
+               fsPath = fields[1]
+       }
+
+       // Get the credentials and host
+       monAddresses, secret, err := d.getConfig(d.config["cephfs.cluster_name"], d.config["cephfs.user.name"])
+       if err != nil {
+               return false, err
+       }
+
+       // Do the actual mount
+       connected := false
+       for _, monAddress := range monAddresses {
+               uri := fmt.Sprintf("%s:6789:/%s", monAddress, fsPath)
+               err = tryMount(uri, GetPoolMountPath(d.name), "ceph", 0, fmt.Sprintf("name=%v,secret=%v,mds_namespace=%v", d.config["cephfs.user.name"], secret, fsName))
+               if err != nil {
+                       continue
+               }
+
+               connected = true
+               break
+       }
+
+       if !connected {
+               return false, err
+       }
+
+       return true, nil
+}
+
+func (d *cephfs) Unmount() (bool, error) {
+       return forceUnmount(GetPoolMountPath(d.name))
+}
+
+func (d *cephfs) GetResources() (*api.ResourcesStoragePool, error) {
+       // Use the generic VFS resources.
+       return vfsResources(GetPoolMountPath(d.name))
+}
+
+func (d *cephfs) ValidateVolume(volConfig map[string]string, removeUnknownKeys bool) error {
+       return d.validateVolume(volConfig, nil, removeUnknownKeys)
+}
+
+func (d *cephfs) CreateVolume(vol Volume, filler func(path string) error, op *operations.Operation) error {
+       if vol.volType != VolumeTypeCustom {
+               return fmt.Errorf("Volume type not supported")
+       }
+
+       if vol.contentType != ContentTypeFS {
+               return fmt.Errorf("Content type not supported")
+       }
+
+       volPath := vol.MountPath()
+
+       // Get the volume ID for the new volume, which is used to set project quota.
+       volID, err := d.getVolID(vol.volType, vol.name)
+       if err != nil {
+               return err
+       }
+
+       err = os.MkdirAll(volPath, 0711)
+       if err != nil {
+               return err
+       }
+
+       revertPath := true
+       defer func() {
+               if revertPath {
+                       d.deleteQuota(volPath, volID)
+                       os.RemoveAll(volPath)
+               }
+       }()
+
+       // Initialise the volume's quota using the volume ID.
+       err = d.initQuota(volPath, volID)
+       if err != nil {
+               return err
+       }
+
+       // Set the quota if specified in volConfig or pool config.
+       err = d.setQuota(volPath, volID, vol.config["size"])
+       if err != nil {
+               return err
+       }
+
+       if filler != nil {
+               err = filler(volPath)
+               if err != nil {
+                       return err
+               }
+       }
+
+       revertPath = false
+       return nil
+}
+
+func (d *cephfs) CreateVolumeFromCopy(vol Volume, srcVol Volume, copySnapshots bool, op *operations.Operation) error {
+       if vol.volType != VolumeTypeCustom || srcVol.volType != VolumeTypeCustom {
+               return fmt.Errorf("Volume type not supported")
+       }
+
+       if vol.contentType != ContentTypeFS || srcVol.contentType != ContentTypeFS {
+               return fmt.Errorf("Content type not supported")
+       }
+
+       bwlimit := d.config["rsync.bwlimit"]
+
+       // Get the volume ID for the new volumes, which is used to set project quota.
+       volID, err := d.getVolID(vol.volType, vol.name)
+       if err != nil {
+               return err
+       }
+
+       // Create slice of paths created if revert needed later.
+       revertPaths := []string{}
+       defer func() {
+               // Remove any paths created if we are reverting.
+               for _, path := range revertPaths {
+                       d.deleteQuota(path, volID)
+                       os.RemoveAll(path)
+               }
+       }()
+
+       if copySnapshots && !srcVol.IsSnapshot() {
+               srcSnapshots, err := srcVol.Snapshots(op)
+               if err != nil {
+                       return err
+               }
+
+               for _, srcSnapshot := range srcSnapshots {
+                       _, snapName, _ := shared.ContainerGetParentAndSnapshotName(srcSnapshot.name)
+                       dstSnapshot, err := vol.NewSnapshot(snapName)
+                       if err != nil {
+                               return err
+                       }
+
+                       dstSnapPath := dstSnapshot.MountPath()
+                       err = os.MkdirAll(dstSnapPath, 0711)
+                       if err != nil {
+                               return err
+                       }
+
+                       revertPaths = append(revertPaths, dstSnapPath)
+
+                       // Initialise the snapshot's quota with the parent volume's ID.
+                       err = d.initQuota(dstSnapPath, volID)
+                       if err != nil {
+                               return err
+                       }
+
+                       err = srcSnapshot.MountTask(func(srcMountPath string, op *operations.Operation) error {
+                               return dstSnapshot.MountTask(func(dstMountPath string, op *operations.Operation) error {
+                                       _, err = rsync.LocalCopy(srcMountPath, dstMountPath, bwlimit, true)
+                                       if err != nil {
+                                               return err
+                                       }
+
+                                       cephSnapPath := filepath.Join(dstMountPath, ".snap", snapName)
+                                       err := os.Mkdir(cephSnapPath, 0711)
+                                       if err != nil {
+                                               return err
+                                       }
+
+                                       // Make the snapshot path a symlink
+                                       targetPath := GetVolumeMountPath(d.name, VolumeTypeCustom, GetSnapshotVolumeName(vol.name, snapName))
+                                       err = os.MkdirAll(filepath.Dir(targetPath), 0711)
+                                       if err != nil {
+                                               return err
+                                       }
+
+                                       return os.Symlink(cephSnapPath, targetPath)
+                               }, op)
+                       }, op)
+                       if err != nil {
+                               return err
+                       }
+               }
+       }
+
+       volPath := vol.MountPath()
+       err = os.MkdirAll(volPath, 0711)
+       if err != nil {
+               return err
+       }
+
+       revertPaths = append(revertPaths, volPath)
+
+       // Initialise the volume's quota using the volume ID.
+       err = d.initQuota(volPath, volID)
+       if err != nil {
+               return err
+       }
+
+       // Set the quota if specified in volConfig or pool config.
+       err = d.setQuota(volPath, volID, vol.config["size"])
+       if err != nil {
+               return err
+       }
+
+       // Copy source to destination (mounting each volume if needed).
+       err = srcVol.MountTask(func(srcMountPath string, op *operations.Operation) error {
+               return vol.MountTask(func(dstMountPath string, op *operations.Operation) error {
+                       _, err := rsync.LocalCopy(srcMountPath, dstMountPath, bwlimit, true)
+                       if err != nil {
+                               return err
+                       }
+
+                       if vol.IsSnapshot() {
+                               _, snapName, _ := shared.ContainerGetParentAndSnapshotName(vol.name)
+
+                               cephSnapPath := filepath.Join(dstMountPath, ".snap", snapName)
+                               err := os.Mkdir(cephSnapPath, 0711)
+                               if err != nil {
+                                       return err
+                               }
+
+                               // Make the snapshot path a symlink
+                               targetPath := GetVolumeMountPath(d.name, VolumeTypeCustom, fmt.Sprintf("%s/%s", vol.name, snapName))
+                               err = os.MkdirAll(filepath.Dir(targetPath), 0711)
+                               if err != nil {
+                                       return err
+                               }
+
+                               return os.Symlink(cephSnapPath, targetPath)
+                       }
+
+                       return nil
+               }, op)
+       }, op)
+       if err != nil {
+               return err
+       }
+
+       revertPaths = nil // Don't revert.
+       return nil
+}
+
+func (d *cephfs) DeleteVolume(volType VolumeType, volName string, op *operations.Operation) error {
+       if volType != VolumeTypeCustom {
+               return fmt.Errorf("Volume type not supported")
+       }
+
+       snapshots, err := d.VolumeSnapshots(volType, volName, op)
+       if err != nil {
+               return err
+       }
+
+       if len(snapshots) > 0 {
+               return fmt.Errorf("Cannot remove a volume that has snapshots")
+       }
+
+       volPath := GetVolumeMountPath(d.name, volType, volName)
+
+       // If the volume doesn't exist, then nothing more to do.
+       if !shared.PathExists(volPath) {
+               return nil
+       }
+
+       // Get the volume ID for the volume, which is used to remove project quota.
+       volID, err := d.getVolID(volType, volName)
+       if err != nil {
+               return err
+       }
+
+       // Remove the project quota.
+       err = d.deleteQuota(volPath, volID)
+       if err != nil {
+               return err
+       }
+
+       // Remove the volume from the storage device.
+       err = os.RemoveAll(volPath)
+       if err != nil {
+               return err
+       }
+
+       // Although the volume snapshot directory should already be removed, let's remove it here
+       // just in case the top-level directory was left behind.
+       snapshotDir, err := GetVolumeSnapshotDir(d.name, volType, volName)
+       if err != nil {
+               return err
+       }
+
+       err = os.RemoveAll(snapshotDir)
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+func (d *cephfs) RenameVolume(volType VolumeType, volName string, newName string, op *operations.Operation) error {
+       if volType != VolumeTypeCustom {
+               return fmt.Errorf("Volume type not supported")
+       }
+
+       vol := NewVolume(d, d.name, volType, ContentTypeFS, volName, nil)
+
+       // Create new snapshots directory.
+       snapshotDir, err := GetVolumeSnapshotDir(d.name, volType, newName)
+       if err != nil {
+               return err
+       }
+
+       err = os.MkdirAll(snapshotDir, 0711)
+       if err != nil {
+               return err
+       }
+
+       type volRevert struct {
+               oldPath   string
+               newPath   string
+               isSymlink bool
+       }
+
+       // Create slice to record paths renamed if revert needed later.
+       revertPaths := []volRevert{}
+       defer func() {
+               // Revert any renamed paths if we are reverting.
+               for _, vol := range revertPaths {
+                       if vol.isSymlink {
+                               os.Symlink(vol.oldPath, vol.newPath)
+                       } else {
+                               os.Rename(vol.newPath, vol.oldPath)
+                       }
+               }
+
+               // Remove the new snapshot directory if we are reverting.
+               if len(revertPaths) > 0 {
+                       err = os.RemoveAll(snapshotDir)
+               }
+       }()
+
+       // Rename any snapshots of the volume too.
+       snapshots, err := vol.Snapshots(op)
+       if err != nil {
+               return err
+       }
+
+       for _, snapshot := range snapshots {
+               srcSnapshotDir, err := GetVolumeSnapshotDir(d.name, volType, volName)
+               if err != nil {
+                       return err
+               }
+
+               targetSnapshotDir, err := GetVolumeSnapshotDir(d.name, volType, newName)
+               if err != nil {
+                       return err
+               }
+
+               err = os.Rename(srcSnapshotDir, targetSnapshotDir)
+               if err != nil {
+                       return err
+               }
+
+               sourcePath := GetVolumeMountPath(d.name, volType, volName)
+               targetPath := GetVolumeMountPath(d.name, volType, newName)
+               _, snapName, _ := shared.ContainerGetParentAndSnapshotName(snapshot.name)
+
+               oldCephSnapPath := filepath.Join(sourcePath, ".snap", snapName)
+               newCephSnapPath := filepath.Join(targetPath, ".snap", snapName)
+
+               oldPath := GetVolumeMountPath(d.name, volType, GetSnapshotVolumeName(volName, snapName))
+               newPath := GetVolumeMountPath(d.name, volType, GetSnapshotVolumeName(newName, snapName))
+
+               err = os.Symlink(newCephSnapPath, newPath)
+               if err != nil {
+                       return err
+               }
+
+               revertPaths = append(revertPaths, volRevert{
+                       oldPath:   oldPath,
+                       newPath:   oldCephSnapPath,
+                       isSymlink: true,
+               })
+
+               revertPaths = append(revertPaths, volRevert{
+                       oldPath: srcSnapshotDir,
+                       newPath: targetSnapshotDir,
+               })
+       }
+
+       oldPath := GetVolumeMountPath(d.name, volType, volName)
+       newPath := GetVolumeMountPath(d.name, volType, newName)
+       err = os.Rename(oldPath, newPath)
+       if err != nil {
+               return err
+       }
+
+       revertPaths = append(revertPaths, volRevert{
+               oldPath: oldPath,
+               newPath: newPath,
+       })
+
+       revertPaths = nil
+       return nil
+}
+
+func (d *cephfs) MountVolume(volType VolumeType, volName string, op *operations.Operation) (bool, error) {
+       if volType != VolumeTypeCustom {
+               return false, fmt.Errorf("Volume type not supported")
+       }
+
+       return false, nil
+}
+
+func (d *cephfs) MountVolumeSnapshot(volType VolumeType, volName, snapshotName string, op *operations.Operation) (bool, error) {
+       if volType != VolumeTypeCustom {
+               return false, fmt.Errorf("Volume type not supported")
+       }
+
+       return false, nil
+}
+
+func (d *cephfs) UnmountVolume(volType VolumeType, volName string, op *operations.Operation) (bool, error) {
+       if volType != VolumeTypeCustom {
+               return false, fmt.Errorf("Volume type not supported")
+       }
+
+       return false, nil
+}
+
+func (d *cephfs) UnmountVolumeSnapshot(volType VolumeType, volName, snapshotName string, op *operations.Operation) (bool, error) {
+       if volType != VolumeTypeCustom {
+               return false, fmt.Errorf("Volume type not supported")
+       }
+
+       return false, nil
+}
+
+func (d *cephfs) CreateVolumeSnapshot(volType VolumeType, volName string, newSnapshotName string, op *operations.Operation) error {
+       if volType != VolumeTypeCustom {
+               return fmt.Errorf("Volume type not supported")
+       }
+
+       // Create the snapshot
+       sourcePath := GetVolumeMountPath(d.name, volType, volName)
+       cephSnapPath := filepath.Join(sourcePath, ".snap", newSnapshotName)
+
+       err := os.Mkdir(cephSnapPath, 0711)
+       if err != nil {
+               return err
+       }
+
+       targetPath := GetVolumeMountPath(d.name, volType, GetSnapshotVolumeName(volName, newSnapshotName))
+
+       err = os.MkdirAll(filepath.Dir(targetPath), 0711)
+       if err != nil {
+               return err
+       }
+
+       err = os.Symlink(cephSnapPath, targetPath)
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+func (d *cephfs) DeleteVolumeSnapshot(volType VolumeType, volName string, snapshotName string, op *operations.Operation) error {
+       if volType != VolumeTypeCustom {
+               return fmt.Errorf("Volume type not supported")
+       }
+
+       sourcePath := GetVolumeMountPath(d.name, volType, volName)
+       cephSnapPath := filepath.Join(sourcePath, ".snap", snapshotName)
+
+       err := os.RemoveAll(cephSnapPath)
+       if err != nil {
+               return err
+       }
+
+       // Get the volume ID for the parent volume, which is used to remove project quota.
+       volID, err := d.getVolID(volType, volName)
+       if err != nil {
+               return err
+       }
+
+       // Determine the snapshot symlink path so it can be removed.
+       snapPath := GetVolumeMountPath(d.name, volType, GetSnapshotVolumeName(volName, snapshotName))
+
+       // Remove the project quota.
+       err = d.deleteQuota(snapPath, volID)
+       if err != nil {
+               return err
+       }
+
+       err = os.RemoveAll(snapPath)
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+func (d *cephfs) RenameVolumeSnapshot(volType VolumeType, volName string, snapshotName string, newSnapshotName string, op *operations.Operation) error {
+       if volType != VolumeTypeCustom {
+               return fmt.Errorf("Volume type not supported")
+       }
+
+       sourcePath := GetVolumeMountPath(d.name, volType, volName)
+       oldCephSnapPath := filepath.Join(sourcePath, ".snap", snapshotName)
+       newCephSnapPath := filepath.Join(sourcePath, ".snap", newSnapshotName)
+
+       err := os.Rename(oldCephSnapPath, newCephSnapPath)
+       if err != nil {
+               return err
+       }
+
+       // Re-generate the snapshot symlink
+       oldPath := GetVolumeMountPath(d.name, volType, GetSnapshotVolumeName(volName, snapshotName))
+       err = os.Remove(oldPath)
+       if err != nil {
+               return err
+       }
+
+       newPath := GetVolumeMountPath(d.name, volType, GetSnapshotVolumeName(volName, newSnapshotName))
+       err = os.Symlink(newCephSnapPath, newPath)
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+func (d *cephfs) VolumeSnapshots(volType VolumeType, volName string, op *operations.Operation) ([]string, error) {
+       if volType != VolumeTypeCustom {
+               return nil, fmt.Errorf("Volume type not supported")
+       }
+
+       snapshotDir, err := GetVolumeSnapshotDir(d.name, volType, volName)
+       if err != nil {
+               return nil, err
+       }
+
+       snapshots := []string{}
+
+       ents, err := ioutil.ReadDir(snapshotDir)
+       if err != nil {
+               // If the snapshots directory doesn't exist, there are no snapshots.
+               if os.IsNotExist(err) {
+                       return snapshots, nil
+               }
+
+               return nil, err
+       }
+
+       for _, ent := range ents {
+               fileInfo, err := os.Stat(filepath.Join(snapshotDir, ent.Name()))
+               if err != nil {
+                       return nil, err
+               }
+
+               if !fileInfo.IsDir() {
+                       continue
+               }
+
+               snapshots = append(snapshots, ent.Name())
+       }
+
+       return snapshots, nil
+}
+
+func (d *cephfs) MigrateVolume(vol Volume, conn io.ReadWriteCloser, volSrcArgs migration.VolumeSourceArgs, op *operations.Operation) error {
+       if vol.volType != VolumeTypeCustom {
+               return fmt.Errorf("Volume type not supported")
+       }
+
+       if vol.contentType != ContentTypeFS {
+               return fmt.Errorf("Content type not supported")
+       }
+
+       if volSrcArgs.MigrationType.FSType != migration.MigrationFSType_RSYNC {
+               return fmt.Errorf("Migration type not supported")
+       }
+
+       bwlimit := d.config["rsync.bwlimit"]
+
+       for _, snapName := range volSrcArgs.Snapshots {
+               snapshot, err := vol.NewSnapshot(snapName)
+               if err != nil {
+                       return err
+               }
+
+               // Send snapshot to recipient (ensure local snapshot volume is mounted if needed).
+               err = snapshot.MountTask(func(mountPath string, op *operations.Operation) error {
+                       var wrapper *ioprogress.ProgressTracker
+                       if volSrcArgs.TrackProgress {
+                               wrapper = migration.ProgressTracker(op, "fs_progress", snapshot.name)
+                       }
+
+                       path := shared.AddSlash(mountPath)
+                       return rsync.Send(snapshot.name, path, conn, wrapper, volSrcArgs.MigrationType.Features, bwlimit, d.state.OS.ExecPath)
+               }, op)
+               if err != nil {
+                       return err
+               }
+       }
+
+       // Send volume to recipient (ensure local volume is mounted if needed).
+       return vol.MountTask(func(mountPath string, op *operations.Operation) error {
+               var wrapper *ioprogress.ProgressTracker
+               if volSrcArgs.TrackProgress {
+                       wrapper = migration.ProgressTracker(op, "fs_progress", vol.name)
+               }
+
+               path := shared.AddSlash(mountPath)
+               return rsync.Send(vol.name, path, conn, wrapper, volSrcArgs.MigrationType.Features, bwlimit, d.state.OS.ExecPath)
+       }, op)
+}
+
+func (d *cephfs) CreateVolumeFromMigration(vol Volume, conn io.ReadWriteCloser, volTargetArgs migration.VolumeTargetArgs, op *operations.Operation) error {
+       return nil
+}
+
+func (d *cephfs) fsExists(clusterName string, userName string, fsName string) bool {
+       _, err := shared.RunCommand("ceph", "--name", fmt.Sprintf("client.%s", userName), "--cluster", clusterName, "fs", "get", fsName)
+       if err != nil {
+               return false
+       }
+
+       return true
+}
+
+func (d *cephfs) getConfig(clusterName string, userName string) ([]string, string, error) {
+       // Parse the CEPH configuration
+       cephConf, err := os.Open(fmt.Sprintf("/etc/ceph/%s.conf", clusterName))
+       if err != nil {
+               return nil, "", err
+       }
+       defer cephConf.Close()
+
+       cephMon := []string{}
+
+       scan := bufio.NewScanner(cephConf)
+       for scan.Scan() {
+               line := scan.Text()
+               line = strings.TrimSpace(line)
+
+               if line == "" {
+                       continue
+               }
+
+               if strings.HasPrefix(line, "mon_host") {
+                       fields := strings.SplitN(line, "=", 2)
+                       if len(fields) < 2 {
+                               continue
+                       }
+
+                       servers := strings.Split(fields[1], ",")
+                       for _, server := range servers {
+                               cephMon = append(cephMon, strings.TrimSpace(server))
+                       }
+                       break
+               }
+       }
+
+       if len(cephMon) == 0 {
+               return nil, "", fmt.Errorf("Couldn't find a CEPH mon")
+       }
+
+       // Parse the CEPH keyring
+       cephKeyring, err := os.Open(fmt.Sprintf("/etc/ceph/%v.client.%v.keyring", clusterName, userName))
+       if err != nil {
+               return nil, "", err
+       }
+       defer cephKeyring.Close()
+
+       var cephSecret string
+
+       scan = bufio.NewScanner(cephKeyring)
+       for scan.Scan() {
+               line := scan.Text()
+               line = strings.TrimSpace(line)
+
+               if line == "" {
+                       continue
+               }
+
+               if strings.HasPrefix(line, "key") {
+                       fields := strings.SplitN(line, "=", 2)
+                       if len(fields) < 2 {
+                               continue
+                       }
+
+                       cephSecret = strings.TrimSpace(fields[1])
+                       break
+               }
+       }
+
+       if cephSecret == "" {
+               return nil, "", fmt.Errorf("Couldn't find a keyring entry")
+       }
+
+       return cephMon, cephSecret, nil
+}
+
+// initQuota initialises the project quota on the path. The volID generates a quota project ID.
+func (d *cephfs) initQuota(path string, volID int64) error {
+       if volID == 0 {
+               return fmt.Errorf("Missing volume ID")
+       }
+
+       ok, err := quota.Supported(path)
+       if err != nil || !ok {
+               // Skipping quota as the underlying filesystem doesn't support project quotas.
+               return nil
+       }
+
+       err = quota.SetProject(path, d.quotaProjectID(volID))
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+// setQuota sets the project quota on the path. The volID generates a quota project ID.
+func (d *cephfs) setQuota(path string, volID int64, size string) error {
+       if volID == 0 {
+               return fmt.Errorf("Missing volume ID")
+       }
+
+       // If size not specified in volume config, then use pool's default volume.size setting.
+       if size == "" || size == "0" {
+               size = d.config["volume.size"]
+       }
+
+       sizeBytes, err := units.ParseByteSizeString(size)
+       if err != nil {
+               return err
+       }
+
+       ok, err := quota.Supported(path)
+       if err != nil || !ok {
+               // Skipping quota as the underlying filesystem doesn't support project quotas.
+               return nil
+       }
+
+       err = quota.SetProjectQuota(path, d.quotaProjectID(volID), sizeBytes)
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+// deleteQuota removes the project quota for a volID from a path.
+func (d *cephfs) deleteQuota(path string, volID int64) error {
+       if volID == 0 {
+               return fmt.Errorf("Missing volume ID")
+       }
+
+       ok, err := quota.Supported(path)
+       if err != nil || !ok {
+               // Skipping quota as the underlying filesystem doesn't support project quotas.
+               return nil
+       }
+
+       err = quota.SetProject(path, 0)
+       if err != nil {
+               return err
+       }
+
+       err = quota.SetProjectQuota(path, d.quotaProjectID(volID), 0)
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+// quotaProjectID generates a project quota ID from a volume ID.
+func (d *cephfs) quotaProjectID(volID int64) uint32 {
+       return uint32(volID + 10000)
+}
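
getConfig above recovers the monitor list and secret by scanning
/etc/ceph/<cluster>.conf for the mon_host line and the keyring file for the
key entry. A runnable sketch of the mon_host scan against an assumed
ceph.conf fragment (the sample contents are illustrative, not from the
patch):

    package main

    import (
            "bufio"
            "fmt"
            "strings"
    )

    // Assumed ceph.conf fragment; the parser only looks for mon_host.
    const sampleConf = `[global]
    mon_host = 10.0.0.1, 10.0.0.2,10.0.0.3
    `

    func main() {
            cephMon := []string{}

            scan := bufio.NewScanner(strings.NewReader(sampleConf))
            for scan.Scan() {
                    line := strings.TrimSpace(scan.Text())
                    if !strings.HasPrefix(line, "mon_host") {
                            continue
                    }

                    fields := strings.SplitN(line, "=", 2)
                    if len(fields) < 2 {
                            continue
                    }

                    for _, server := range strings.Split(fields[1], ",") {
                            cephMon = append(cephMon, strings.TrimSpace(server))
                    }
                    break
            }

            fmt.Println(cephMon) // [10.0.0.1 10.0.0.2 10.0.0.3]
    }

Note that the driver then hardcodes port 6789 when building the mount URI
from each monitor address.
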
diff --git a/lxd/storage/drivers/load.go b/lxd/storage/drivers/load.go
index a10fdfd01f..cffa6439a6 100644
--- a/lxd/storage/drivers/load.go
+++ b/lxd/storage/drivers/load.go
@@ -5,7 +5,8 @@ import (
 )
 
 var drivers = map[string]func() driver{
-       "dir": func() driver { return &dir{} },
+       "dir":    func() driver { return &dir{} },
+       "cephfs": func() driver { return &cephfs{} },
 }
 
 // Load returns a Driver for an existing low-level storage pool.
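
The load.go change registers the new driver in the factory map keyed by
driver name, which is how Load resolves a pool's driver. A minimal sketch of
that registry pattern (heavily simplified; the real driver interface is much
larger):

    package main

    import "fmt"

    // driver is a stand-in for the real storage driver interface.
    type driver interface {
            Name() string
    }

    type dir struct{}

    func (d *dir) Name() string { return "dir" }

    type cephfs struct{}

    func (d *cephfs) Name() string { return "cephfs" }

    // drivers maps a pool driver name to a constructor, as in load.go.
    var drivers = map[string]func() driver{
            "dir":    func() driver { return &dir{} },
            "cephfs": func() driver { return &cephfs{} },
    }

    func main() {
            factory, ok := drivers["cephfs"]
            if !ok {
                    fmt.Println("unknown driver")
                    return
            }

            fmt.Println("loaded driver:", factory().Name())
    }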