The following pull request was submitted through GitHub. It can be accessed and reviewed at: https://github.com/lxc/lxd/pull/6374
This e-mail was sent by the LXC bot; direct replies will not reach the author unless they happen to be subscribed to this list.

=== Description (from pull-request) ===
From 07686d47246ee073bcc529473d3ecd9ee0d645b9 Mon Sep 17 00:00:00 2001 From: Thomas Hipp <thomas.h...@canonical.com> Date: Thu, 31 Oct 2019 09:53:45 +0100 Subject: [PATCH 1/2] lxd/storage: Add locking for mount operations Signed-off-by: Thomas Hipp <thomas.h...@canonical.com> --- lxd/storage/backend_lxd.go | 24 +++++++++++++ lxd/storage/lock.go | 69 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 93 insertions(+) create mode 100644 lxd/storage/lock.go diff --git a/lxd/storage/backend_lxd.go b/lxd/storage/backend_lxd.go index 9f91ab84d9..d7e5f560be 100644 --- a/lxd/storage/backend_lxd.go +++ b/lxd/storage/backend_lxd.go @@ -125,11 +125,23 @@ func (b *lxdBackend) Delete(op *operations.Operation) error { // Mount mounts the storage pool. func (b *lxdBackend) Mount() (bool, error) { + unlock := lock(getPoolMountLockID(b.name)) + if unlock == nil { + return false, nil + } + defer unlock() + return b.driver.Mount() } // Unmount unmounts the storage pool. func (b *lxdBackend) Unmount() (bool, error) { + unlock := lock(getPoolUmountLockID(b.name)) + if unlock == nil { + return false, nil + } + defer unlock() + return b.driver.Unmount() } @@ -596,11 +608,23 @@ func (b *lxdBackend) SetCustomVolumeQuota(vol api.StorageVolume, quota uint64) e // MountCustomVolume mounts a custom volume. func (b *lxdBackend) MountCustomVolume(volName string, op *operations.Operation) (bool, error) { + unlock := lock(getVolumeMountLockID(volName)) + if unlock == nil { + return false, nil + } + defer unlock() + return b.driver.MountVolume(drivers.VolumeTypeCustom, volName, op) } // UnmountCustomVolume unmounts a custom volume. 
func (b *lxdBackend) UnmountCustomVolume(volName string, op *operations.Operation) (bool, error) { + unlock := lock(getVolumeUmountLockID(volName)) + if unlock == nil { + return false, nil + } + defer unlock() + return b.driver.UnmountVolume(drivers.VolumeTypeCustom, volName, op) } diff --git a/lxd/storage/lock.go b/lxd/storage/lock.go new file mode 100644 index 0000000000..e16a794adf --- /dev/null +++ b/lxd/storage/lock.go @@ -0,0 +1,69 @@ +package storage + +import ( + "fmt" + "sync" + + "github.com/lxc/lxd/shared/logger" +) + +// lxdStorageLockMap is a hashmap that allows functions to check whether the +// operation they are about to perform is already in progress. If it is the +// channel can be used to wait for the operation to finish. If it is not, the +// function that wants to perform the operation should store its code in the +// hashmap. +// Note that any access to this map must be done while holding a lock. +var lxdStorageOngoingOperationMap = map[string]chan bool{} + +// lxdStorageMapLock is used to access lxdStorageOngoingOperationMap. +var lxdStorageMapLock sync.Mutex + +// The following functions are used to construct simple operation codes that are +// unique. 
+func getPoolMountLockID(poolName string) string { + return fmt.Sprintf("mount/pool/%s", poolName) +} + +func getPoolUmountLockID(poolName string) string { + return fmt.Sprintf("umount/pool/%s", poolName) +} + +func getVolumeMountLockID(volumeName string) string { + return fmt.Sprintf("mount/volume/%s", volumeName) +} + +func getVolumeUmountLockID(volumeName string) string { + return fmt.Sprintf("umount/volume/%s", volumeName) +} + +func lock(lockID string) func() { + lxdStorageMapLock.Lock() + + if waitChannel, ok := lxdStorageOngoingOperationMap[lockID]; ok { + lxdStorageMapLock.Unlock() + + _, ok := <-waitChannel + if ok { + logger.Warnf("Received value over semaphore, this should ot have happened") + } + + // Give the benefit of the doubt and assume that the other + // thread actually succeeded in mounting the storage pool. + return nil + } + + lxdStorageOngoingOperationMap[lockID] = make(chan bool) + lxdStorageMapLock.Unlock() + + return func() { + lxdStorageMapLock.Lock() + + waitChannel, ok := lxdStorageOngoingOperationMap[lockID] + if ok { + close(waitChannel) + delete(lxdStorageOngoingOperationMap, lockID) + } + + lxdStorageMapLock.Unlock() + } +} From 0b07c363a61600d1fa76d6b7568396b5ba67160d Mon Sep 17 00:00:00 2001 From: Thomas Hipp <thomas.h...@canonical.com> Date: Wed, 30 Oct 2019 19:24:31 +0100 Subject: [PATCH 2/2] lxd/storage/drivers: Add cephfs This adds the cephfs storage driver. 
Signed-off-by: Thomas Hipp <thomas.h...@canonical.com> --- lxd/storage/drivers/driver_cephfs.go | 1013 ++++++++++++++++++++++++++ lxd/storage/drivers/load.go | 3 +- 2 files changed, 1015 insertions(+), 1 deletion(-) create mode 100644 lxd/storage/drivers/driver_cephfs.go diff --git a/lxd/storage/drivers/driver_cephfs.go b/lxd/storage/drivers/driver_cephfs.go new file mode 100644 index 0000000000..26d04d91bf --- /dev/null +++ b/lxd/storage/drivers/driver_cephfs.go @@ -0,0 +1,1013 @@ +package drivers + +import ( + "bufio" + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "strings" + + "github.com/lxc/lxd/lxd/migration" + "github.com/lxc/lxd/lxd/operations" + "github.com/lxc/lxd/lxd/rsync" + "github.com/lxc/lxd/lxd/storage/quota" + "github.com/lxc/lxd/shared" + "github.com/lxc/lxd/shared/api" + "github.com/lxc/lxd/shared/ioprogress" + "github.com/lxc/lxd/shared/units" +) + +type cephfs struct { + common + + fsName string + version string +} + +func (d *cephfs) Info() Info { + if d.version == "" { + msg, err := shared.RunCommand("rbd", "--version") + if err != nil { + d.version = "unknown" + } else { + d.version = strings.TrimSpace(msg) + } + } + + return Info{ + Name: "cephfs", + Version: d.version, + OptimizedImages: true, + PreservesInodes: false, + Usable: true, + Remote: false, + VolumeTypes: []VolumeType{VolumeTypeCustom}, + } +} + +func (d *cephfs) HasVolume(volType VolumeType, volName string) bool { + if shared.PathExists(GetVolumeMountPath(d.name, volType, volName)) { + return true + } + + return false +} + +func (d *cephfs) Create() error { + if d.config["source"] == "" { + return fmt.Errorf("Missing required source name/path") + } + + if d.config["cephfs.path"] != "" && d.config["cephfs.path"] != d.config["source"] { + return fmt.Errorf("cephfs.path must match the source") + } + + if d.config["cephfs.cluster_name"] == "" { + d.config["cephfs.cluster_name"] = "ceph" + } + + if d.config["cephfs.user.name"] != "" { + d.config["cephfs.user.name"] = "admin" 
+ } + + d.fsName = d.config["source"] + + // Parse the namespace / path + fields := strings.SplitN(d.fsName, "/", 2) + fsName := fields[0] + fsPath := "/" + if len(fields) > 1 { + fsPath = fields[1] + } + + // Check that the filesystem exists + if !d.fsExists(d.config["cephfs.cluster_name"], d.config["cephfs.user.name"], fsName) { + return fmt.Errorf("The requested '%v' CEPHFS doesn't exist", fsName) + } + + // Create a temporary mountpoint + mountPath, err := ioutil.TempDir("", "lxd_cephfs_") + if err != nil { + return err + } + defer os.RemoveAll(mountPath) + + err = os.Chmod(mountPath, 0700) + if err != nil { + return err + } + + mountPoint := filepath.Join(mountPath, "mount") + + err = os.Mkdir(mountPoint, 0700) + if err != nil { + return err + } + + // Get the credentials and host + monAddresses, userSecret, err := d.getConfig(d.config["cephfs.cluster_name"], d.config["cephfs.user_name"]) + if err != nil { + return err + } + + connected := false + for _, monAddress := range monAddresses { + uri := fmt.Sprintf("%s:6789:/", monAddress) + err = tryMount(uri, mountPoint, "ceph", 0, fmt.Sprintf("name=%v,secret=%v,mds_namespace=%v", d.config["cephfs.user_name"], userSecret, fsName)) + if err != nil { + continue + } + + connected = true + defer forceUnmount(mountPoint) + break + } + + if !connected { + return err + } + + // Create the path if missing + err = os.MkdirAll(filepath.Join(mountPoint, fsPath), 0755) + if err != nil { + return err + } + + // Check that the existing path is empty + ok, _ := shared.PathIsEmpty(filepath.Join(mountPoint, fsPath)) + if !ok { + return fmt.Errorf("Only empty CEPHFS paths can be used as a LXD storage pool") + } + + return nil +} + +func (d *cephfs) Delete(op *operations.Operation) error { + // Parse the namespace / path + fields := strings.SplitN(d.fsName, "/", 2) + fsName := fields[0] + fsPath := "/" + if len(fields) > 1 { + fsPath = fields[1] + } + + // Create a temporary mountpoint + mountPath, err := ioutil.TempDir("", 
"lxd_cephfs_") + if err != nil { + return err + } + defer os.RemoveAll(mountPath) + + err = os.Chmod(mountPath, 0700) + if err != nil { + return err + } + + mountPoint := filepath.Join(mountPath, "mount") + err = os.Mkdir(mountPoint, 0700) + if err != nil { + return err + } + + // Get the credentials and host + monAddresses, userSecret, err := d.getConfig(d.config["cephfs.cluster_name"], d.config["cephfs.user_name"]) + if err != nil { + return err + } + + connected := false + for _, monAddress := range monAddresses { + uri := fmt.Sprintf("%s:6789:/", monAddress) + err = tryMount(uri, mountPoint, "ceph", 0, fmt.Sprintf("name=%v,secret=%v,mds_namespace=%v", d.config["cephfs.user_name"], userSecret, fsName)) + if err != nil { + continue + } + + connected = true + defer forceUnmount(mountPoint) + break + } + + if !connected { + return err + } + + if shared.PathExists(filepath.Join(mountPoint, fsPath)) { + // Delete the usual directories + for _, dir := range []string{"custom", "custom-snapshots"} { + if shared.PathExists(filepath.Join(mountPoint, fsPath, dir)) { + err = os.Remove(filepath.Join(mountPoint, fsPath, dir)) + if err != nil { + return err + } + } + } + + // Confirm that the path is now empty + ok, _ := shared.PathIsEmpty(filepath.Join(mountPoint, fsPath)) + if !ok { + return fmt.Errorf("Only empty CEPHFS paths can be used as a LXD storage pool") + } + + // Delete the path itself + if fsPath != "" && fsPath != "/" { + err = os.Remove(filepath.Join(mountPoint, fsPath)) + if err != nil { + return err + } + } + } + + // On delete, wipe everything in the directory. 
+ err = wipeDirectory(GetPoolMountPath(d.name)) + if err != nil { + return err + } + + // Make sure the existing pool is unmounted + _, err = d.Unmount() + if err != nil { + return err + } + + return nil +} + +func (d *cephfs) Mount() (bool, error) { + // Parse the namespace / path + fields := strings.SplitN(d.fsName, "/", 2) + fsName := fields[0] + fsPath := "/" + if len(fields) > 1 { + fsPath = fields[1] + } + + // Get the credentials and host + monAddresses, secret, err := d.getConfig(d.config["cephfs.cluster_name"], d.config["cephfs.user_name"]) + if err != nil { + return false, err + } + + // Do the actual mount + connected := false + for _, monAddress := range monAddresses { + uri := fmt.Sprintf("%s:6789:/%s", monAddress, fsPath) + err = tryMount(uri, GetPoolMountPath(d.name), "ceph", 0, fmt.Sprintf("name=%v,secret=%v,mds_namespace=%v", d.config["cephfs.user_name"], secret, fsName)) + if err != nil { + continue + } + + connected = true + break + } + + if !connected { + return false, err + } + + return true, nil +} + +func (d *cephfs) Unmount() (bool, error) { + return forceUnmount(GetPoolMountPath(d.name)) +} + +func (d *cephfs) GetResources() (*api.ResourcesStoragePool, error) { + // Use the generic VFS resources. + return vfsResources(GetPoolMountPath(d.name)) +} + +func (d *cephfs) ValidateVolume(volConfig map[string]string, removeUnknownKeys bool) error { + return d.validateVolume(volConfig, nil, removeUnknownKeys) +} + +func (d *cephfs) CreateVolume(vol Volume, filler func(path string) error, op *operations.Operation) error { + if vol.volType != VolumeTypeCustom { + return fmt.Errorf("Volume type not supported") + } + + if vol.contentType != ContentTypeFS { + return fmt.Errorf("Content type not supported") + } + + volPath := vol.MountPath() + + // Get the volume ID for the new volume, which is used to set project quota. 
+ volID, err := d.getVolID(vol.volType, vol.name) + if err != nil { + return err + } + + err = os.MkdirAll(volPath, 0711) + if err != nil { + return err + } + + revertPath := true + defer func() { + if revertPath { + d.deleteQuota(volPath, volID) + os.RemoveAll(volPath) + } + }() + + // Initialise the volume's quota using the volume ID. + err = d.initQuota(volPath, volID) + if err != nil { + return err + } + + // Set the quota if specified in volConfig or pool config. + err = d.setQuota(volPath, volID, vol.config["size"]) + if err != nil { + return err + } + + if filler != nil { + err = filler(volPath) + if err != nil { + return err + } + } + + revertPath = false + return nil +} + +func (d *cephfs) CreateVolumeFromCopy(vol Volume, srcVol Volume, copySnapshots bool, op *operations.Operation) error { + if vol.volType != VolumeTypeCustom || srcVol.volType != VolumeTypeCustom { + return fmt.Errorf("Volume type not supported") + } + + if vol.contentType != ContentTypeFS || srcVol.contentType != ContentTypeFS { + return fmt.Errorf("Content type not supported") + } + + bwlimit := d.config["rsync.bwlimit"] + + // Get the volume ID for the new volumes, which is used to set project quota. + volID, err := d.getVolID(vol.volType, vol.name) + if err != nil { + return err + } + + // Create slice of paths created if revert needed later. + revertPaths := []string{} + defer func() { + // Remove any paths created if we are reverting. 
+ for _, path := range revertPaths { + d.deleteQuota(path, volID) + os.RemoveAll(path) + } + }() + + if copySnapshots && !srcVol.IsSnapshot() { + srcSnapshots, err := srcVol.Snapshots(op) + if err != nil { + return err + } + + for _, srcSnapshot := range srcSnapshots { + _, snapName, _ := shared.ContainerGetParentAndSnapshotName(srcSnapshot.name) + dstSnapshot, err := vol.NewSnapshot(snapName) + if err != nil { + return err + } + + dstSnapPath := dstSnapshot.MountPath() + err = os.MkdirAll(dstSnapPath, 0711) + if err != nil { + return err + } + + revertPaths = append(revertPaths, dstSnapPath) + + // Initialise the snapshot's quota with the parent volume's ID. + err = d.initQuota(dstSnapPath, volID) + if err != nil { + return err + } + + err = srcSnapshot.MountTask(func(srcMountPath string, op *operations.Operation) error { + return dstSnapshot.MountTask(func(dstMountPath string, op *operations.Operation) error { + _, err = rsync.LocalCopy(srcMountPath, dstMountPath, bwlimit, true) + if err != nil { + return err + } + + cephSnapPath := filepath.Join(dstMountPath, ".snap", snapName) + err := os.Mkdir(cephSnapPath, 0711) + if err != nil { + return err + } + + // Make the snapshot path a symlink + targetPath := GetVolumeMountPath(d.name, VolumeTypeCustom, GetSnapshotVolumeName(vol.name, snapName)) + err = os.MkdirAll(filepath.Dir(targetPath), 0711) + if err != nil { + return err + } + + return os.Symlink(cephSnapPath, targetPath) + }, op) + }, op) + } + } + + volPath := vol.MountPath() + err = os.MkdirAll(volPath, 0711) + if err != nil { + return err + } + + revertPaths = append(revertPaths, volPath) + + // Initialise the volume's quota using the volume ID. + err = d.initQuota(volPath, volID) + if err != nil { + return err + } + + // Set the quota if specified in volConfig or pool config. + err = d.setQuota(volPath, volID, vol.config["size"]) + if err != nil { + return err + } + + // Copy source to destination (mounting each volume if needed). 
+ err = srcVol.MountTask(func(srcMountPath string, op *operations.Operation) error { + return vol.MountTask(func(dstMountPath string, op *operations.Operation) error { + _, err := rsync.LocalCopy(srcMountPath, dstMountPath, bwlimit, true) + if err != nil { + return err + } + + if vol.IsSnapshot() { + _, snapName, _ := shared.ContainerGetParentAndSnapshotName(vol.name) + + cephSnapPath := filepath.Join(dstMountPath, ".snap", snapName) + err := os.Mkdir(cephSnapPath, 0711) + if err != nil { + return err + } + + // Make the snapshot path a symlink + targetPath := GetVolumeMountPath(d.name, VolumeTypeCustom, fmt.Sprintf("%s/%s", vol.name, snapName)) + err = os.MkdirAll(filepath.Dir(targetPath), 0711) + if err != nil { + return err + } + + return os.Symlink(cephSnapPath, targetPath) + } + + return nil + }, op) + }, op) + if err != nil { + return err + } + + revertPaths = nil // Don't revert. + return nil +} + +func (d *cephfs) DeleteVolume(volType VolumeType, volName string, op *operations.Operation) error { + if volType != VolumeTypeCustom { + return fmt.Errorf("Volume type not supported") + } + + snapshots, err := d.VolumeSnapshots(volType, volName, op) + if err != nil { + return err + } + + if len(snapshots) > 0 { + return fmt.Errorf("Cannot remove a volume that has snapshots") + } + + volPath := GetVolumeMountPath(d.name, volType, volName) + + // If the volume doesn't exist, then nothing more to do. + if !shared.PathExists(volPath) { + return nil + } + + // Get the volume ID for the volume, which is used to remove project quota. + volID, err := d.getVolID(volType, volName) + if err != nil { + return err + } + + // Remove the project quota. + err = d.deleteQuota(volPath, volID) + if err != nil { + return err + } + + // Remove the volume from the storage device. 
+ err = os.RemoveAll(volPath) + if err != nil { + return err + } + + // Although the volume snapshot directory should already be removed, lets remove it here + // to just in case the top-level directory is left. + snapshotDir, err := GetVolumeSnapshotDir(d.name, volType, volName) + if err != nil { + return err + } + + err = os.RemoveAll(snapshotDir) + if err != nil { + return err + } + + return nil +} + +func (d *cephfs) RenameVolume(volType VolumeType, volName string, newName string, op *operations.Operation) error { + if volType != VolumeTypeCustom { + return fmt.Errorf("Volume type not supported") + } + + vol := NewVolume(d, d.name, volType, ContentTypeFS, volName, nil) + + // Create new snapshots directory. + snapshotDir, err := GetVolumeSnapshotDir(d.name, volType, newName) + if err != nil { + return err + } + + err = os.MkdirAll(snapshotDir, 0711) + if err != nil { + return err + } + + type volRevert struct { + oldPath string + newPath string + isSymlink bool + } + + // Create slice to record paths renamed if revert needed later. + revertPaths := []volRevert{} + defer func() { + // Remove any paths rename if we are reverting. + for _, vol := range revertPaths { + if vol.isSymlink { + os.Symlink(vol.oldPath, vol.newPath) + } else { + os.Rename(vol.newPath, vol.oldPath) + } + } + + // Remove the new snapshot directory if we are reverting. + if len(revertPaths) > 0 { + err = os.RemoveAll(snapshotDir) + } + }() + + // Rename any snapshots of the volume too. 
+ snapshots, err := vol.Snapshots(op) + if err != nil { + return err + } + + for _, snapshot := range snapshots { + srcSnapshotDir, err := GetVolumeSnapshotDir(d.name, volType, volName) + if err != nil { + return err + } + + targetSnapshotDir, err := GetVolumeSnapshotDir(d.name, volType, newName) + if err != nil { + return err + } + + err = os.Rename(srcSnapshotDir, targetSnapshotDir) + if err != nil { + return err + } + + sourcePath := GetVolumeMountPath(d.name, volType, newName) + targetPath := GetVolumeMountPath(d.name, volType, newName) + _, snapName, _ := shared.ContainerGetParentAndSnapshotName(snapshot.name) + + oldCephSnapPath := filepath.Join(sourcePath, ".snap", snapName) + newCephSnapPath := filepath.Join(targetPath, ".snap", snapName) + + oldPath := GetVolumeMountPath(d.name, volType, GetSnapshotVolumeName(volName, snapName)) + newPath := GetVolumeMountPath(d.name, volType, GetSnapshotVolumeName(newName, snapName)) + + err = os.Symlink(newCephSnapPath, newPath) + if err != nil { + return err + } + + revertPaths = append(revertPaths, volRevert{ + oldPath: oldPath, + newPath: oldCephSnapPath, + isSymlink: true, + }) + + revertPaths = append(revertPaths, volRevert{ + oldPath: srcSnapshotDir, + newPath: targetSnapshotDir, + }) + } + + oldPath := GetVolumeMountPath(d.name, volType, volName) + newPath := GetVolumeMountPath(d.name, volType, newName) + err = os.Rename(oldPath, newPath) + if err != nil { + return err + } + + revertPaths = append(revertPaths, volRevert{ + oldPath: oldPath, + newPath: newPath, + }) + + revertPaths = nil + return nil +} + +func (d *cephfs) MountVolume(volType VolumeType, volName string, op *operations.Operation) (bool, error) { + if volType != VolumeTypeCustom { + return false, fmt.Errorf("Volume type not supported") + } + + return false, nil +} + +func (d *cephfs) MountVolumeSnapshot(volType VolumeType, VolName, snapshotName string, op *operations.Operation) (bool, error) { + if volType != VolumeTypeCustom { + return false, 
fmt.Errorf("Volume type not supported") + } + + return false, nil +} + +func (d *cephfs) UnmountVolume(volType VolumeType, volName string, op *operations.Operation) (bool, error) { + if volType != VolumeTypeCustom { + return false, fmt.Errorf("Volume type not supported") + } + + return false, nil +} + +func (d *cephfs) UnmountVolumeSnapshot(volType VolumeType, volName, snapshotName string, op *operations.Operation) (bool, error) { + if volType != VolumeTypeCustom { + return false, fmt.Errorf("Volume type not supported") + } + + return false, nil +} + +func (d *cephfs) CreateVolumeSnapshot(volType VolumeType, volName string, newSnapshotName string, op *operations.Operation) error { + if volType != VolumeTypeCustom { + return fmt.Errorf("Volume type not supported") + } + + // Create the snapshot + sourcePath := GetVolumeMountPath(d.name, volType, volName) + cephSnapPath := filepath.Join(sourcePath, ".snap", newSnapshotName) + + err := os.Mkdir(cephSnapPath, 0711) + if err != nil { + return err + } + + targetPath := GetVolumeMountPath(d.name, volType, GetSnapshotVolumeName(volName, newSnapshotName)) + + err = os.MkdirAll(filepath.Dir(targetPath), 0711) + if err != nil { + return err + } + + err = os.Symlink(cephSnapPath, targetPath) + if err != nil { + return err + } + + return nil +} + +func (d *cephfs) DeleteVolumeSnapshot(volType VolumeType, volName string, snapshotName string, op *operations.Operation) error { + if volType != VolumeTypeCustom { + return fmt.Errorf("Volume type not supported") + } + + sourcePath := GetVolumeMountPath(d.name, volType, volName) + cephSnapPath := filepath.Join(sourcePath, ".snap", snapshotName) + + err := os.RemoveAll(cephSnapPath) + if err != nil { + return err + } + + // Get the volume ID for the parent volume, which is used to remove project quota. 
+ volID, err := d.getVolID(volType, volName) + if err != nil { + return err + } + + // Make the snapshot path a symlink + snapPath := GetVolumeMountPath(d.name, volType, GetSnapshotVolumeName(volName, snapshotName)) + + // Remove the project quota. + err = d.deleteQuota(snapPath, volID) + if err != nil { + return err + } + + err = os.RemoveAll(snapPath) + if err != nil { + return err + } + + return nil +} + +func (d *cephfs) RenameVolumeSnapshot(volType VolumeType, volName string, snapshotName string, newSnapshotName string, op *operations.Operation) error { + if volType != VolumeTypeCustom { + return fmt.Errorf("Volume type not supported") + } + + sourcePath := GetVolumeMountPath(d.name, volType, volName) + oldCephSnapPath := filepath.Join(sourcePath, ".snap", snapshotName) + newCephSnapPath := filepath.Join(sourcePath, ".snap", newSnapshotName) + + err := os.Rename(oldCephSnapPath, newCephSnapPath) + if err != nil { + return err + } + + // Re-generate the snapshot symlink + oldPath := GetVolumeMountPath(d.name, volType, GetSnapshotVolumeName(volName, snapshotName)) + err = os.Remove(oldPath) + if err != nil { + return err + } + + newPath := GetVolumeMountPath(d.name, volType, GetSnapshotVolumeName(volName, newSnapshotName)) + err = os.Symlink(newCephSnapPath, newPath) + if err != nil { + return err + } + + return nil +} + +func (d *cephfs) VolumeSnapshots(volType VolumeType, volName string, op *operations.Operation) ([]string, error) { + if volType != VolumeTypeCustom { + return nil, fmt.Errorf("Volume type not supported") + } + + snapshotDir, err := GetVolumeSnapshotDir(d.name, volType, volName) + if err != nil { + return nil, err + } + + snapshots := []string{} + + ents, err := ioutil.ReadDir(snapshotDir) + if err != nil { + // If the snapshots directory doesn't exist, there are no snapshots. 
+ if os.IsNotExist(err) { + return snapshots, nil + } + + return nil, err + } + + for _, ent := range ents { + fileInfo, err := os.Stat(filepath.Join(snapshotDir, ent.Name())) + if err != nil { + return nil, err + } + + if !fileInfo.IsDir() { + continue + } + + snapshots = append(snapshots, ent.Name()) + } + + return snapshots, nil +} + +func (d *cephfs) MigrateVolume(vol Volume, conn io.ReadWriteCloser, volSrcArgs migration.VolumeSourceArgs, op *operations.Operation) error { + if vol.volType != VolumeTypeCustom { + return fmt.Errorf("Volume type not supported") + } + + if vol.contentType != ContentTypeFS { + return fmt.Errorf("Content type not supported") + } + + if volSrcArgs.MigrationType.FSType != migration.MigrationFSType_RSYNC { + return fmt.Errorf("Migration type not supported") + } + + bwlimit := d.config["rsync.bwlimit"] + + for _, snapName := range volSrcArgs.Snapshots { + snapshot, err := vol.NewSnapshot(snapName) + if err != nil { + return err + } + + // Send snapshot to recipient (ensure local snapshot volume is mounted if needed). + err = snapshot.MountTask(func(mountPath string, op *operations.Operation) error { + var wrapper *ioprogress.ProgressTracker + if volSrcArgs.TrackProgress { + wrapper = migration.ProgressTracker(op, "fs_progress", snapshot.name) + } + + path := shared.AddSlash(mountPath) + return rsync.Send(snapshot.name, path, conn, wrapper, volSrcArgs.MigrationType.Features, bwlimit, d.state.OS.ExecPath) + }, op) + if err != nil { + return err + } + } + + // Send volume to recipient (ensure local volume is mounted if needed). 
+ return vol.MountTask(func(mountPath string, op *operations.Operation) error { + var wrapper *ioprogress.ProgressTracker + if volSrcArgs.TrackProgress { + wrapper = migration.ProgressTracker(op, "fs_progress", vol.name) + } + + path := shared.AddSlash(mountPath) + return rsync.Send(vol.name, path, conn, wrapper, volSrcArgs.MigrationType.Features, bwlimit, d.state.OS.ExecPath) + }, op) +} + +func (d *cephfs) CreateVolumeFromMigration(vol Volume, conn io.ReadWriteCloser, volTargetArgs migration.VolumeTargetArgs, op *operations.Operation) error { + return nil +} + +func (d *cephfs) fsExists(clusterName string, userName string, fsName string) bool { + _, err := shared.RunCommand("ceph", "--name", fmt.Sprintf("client.%s", userName), "--cluster", clusterName, "fs", "get", fsName) + if err != nil { + return false + } + + return true +} + +func (d *cephfs) getConfig(clusterName string, userName string) ([]string, string, error) { + // Parse the CEPH configuration + cephConf, err := os.Open(fmt.Sprintf("/etc/ceph/%s.conf", clusterName)) + if err != nil { + return nil, "", err + } + + cephMon := []string{} + + scan := bufio.NewScanner(cephConf) + for scan.Scan() { + line := scan.Text() + line = strings.TrimSpace(line) + + if line == "" { + continue + } + + if strings.HasPrefix(line, "mon_host") { + fields := strings.SplitN(line, "=", 2) + if len(fields) < 2 { + continue + } + + servers := strings.Split(fields[1], ",") + for _, server := range servers { + cephMon = append(cephMon, strings.TrimSpace(server)) + } + break + } + } + + if len(cephMon) == 0 { + return nil, "", fmt.Errorf("Couldn't find a CPEH mon") + } + + // Parse the CEPH keyring + cephKeyring, err := os.Open(fmt.Sprintf("/etc/ceph/%v.client.%v.keyring", clusterName, userName)) + if err != nil { + return nil, "", err + } + + var cephSecret string + + scan = bufio.NewScanner(cephKeyring) + for scan.Scan() { + line := scan.Text() + line = strings.TrimSpace(line) + + if line == "" { + continue + } + + if 
strings.HasPrefix(line, "key") { + fields := strings.SplitN(line, "=", 2) + if len(fields) < 2 { + continue + } + + cephSecret = strings.TrimSpace(fields[1]) + break + } + } + + if cephSecret == "" { + return nil, "", fmt.Errorf("Couldn't find a keyring entry") + } + + return cephMon, cephSecret, nil +} + +// initQuota initialises the project quota on the path. The volID generates a quota project ID. +func (d *cephfs) initQuota(path string, volID int64) error { + if volID == 0 { + return fmt.Errorf("Missing volume ID") + } + + ok, err := quota.Supported(path) + if err != nil || !ok { + // Skipping quota as underlying filesystem doesn't suppport project quotas. + return nil + } + + err = quota.SetProject(path, d.quotaProjectID(volID)) + if err != nil { + return err + } + + return nil +} + +// setQuota sets the project quota on the path. The volID generates a quota project ID. +func (d *cephfs) setQuota(path string, volID int64, size string) error { + if volID == 0 { + return fmt.Errorf("Missing volume ID") + } + + // If size not specified in volume config, then use pool's default volume.size setting. + if size == "" || size == "0" { + size = d.config["volume.size"] + } + + sizeBytes, err := units.ParseByteSizeString(size) + if err != nil { + return err + } + + ok, err := quota.Supported(path) + if err != nil || !ok { + // Skipping quota as underlying filesystem doesn't suppport project quotas. + return nil + } + + err = quota.SetProjectQuota(path, d.quotaProjectID(volID), sizeBytes) + if err != nil { + return err + } + + return nil +} + +// deleteQuota removes the project quota for a volID from a path. +func (d *cephfs) deleteQuota(path string, volID int64) error { + if volID == 0 { + return fmt.Errorf("Missing volume ID") + } + + ok, err := quota.Supported(path) + if err != nil || !ok { + // Skipping quota as underlying filesystem doesn't suppport project quotas. 
+ return nil + } + + err = quota.SetProject(path, 0) + if err != nil { + return err + } + + err = quota.SetProjectQuota(path, d.quotaProjectID(volID), 0) + if err != nil { + return err + } + + return nil +} + +// quotaProjectID generates a project quota ID from a volume ID. +func (d *cephfs) quotaProjectID(volID int64) uint32 { + return uint32(volID + 10000) +} diff --git a/lxd/storage/drivers/load.go b/lxd/storage/drivers/load.go index a10fdfd01f..cffa6439a6 100644 --- a/lxd/storage/drivers/load.go +++ b/lxd/storage/drivers/load.go @@ -5,7 +5,8 @@ import ( ) var drivers = map[string]func() driver{ - "dir": func() driver { return &dir{} }, + "dir": func() driver { return &dir{} }, + "cephfs": func() driver { return &cephfs{} }, } // Load returns a Driver for an existing low-level storage pool.
_______________________________________________ lxc-devel mailing list lxc-devel@lists.linuxcontainers.org http://lists.linuxcontainers.org/listinfo/lxc-devel