How to use:

1. Prepare a test image (any distribution with working networking) and
   set the IMAGE variable in the script to its path.
You'll need to setup static ip in the image, for example in Ubuntu: cat /etc/netplan/01-netcfg.yaml network: version: 2 renderer: networkd ethernets: ens1: dhcp4: no addresses: [10.0.1.2/24] gateway4: 10.0.1.1 nameservers: addresses: [8.8.8.8, 8.8.4.4] 2. run script with sudo (to manipulate with TAPs) cd python sudo ./tap-migration-stand.py --qemu-binary path/to/qemu-system-x86_64 \ --mode (new | cpr) mode=new means current series, mode=cpr means "[RFC V2 0/8] Live update: tap and vhost" 3. run in separate terminal something like while true; do gvncviewer :0; gvncviewer :1; sleep 0.5; done to monitor our vms (on live update, vnc port will change every time) 4. run ping processes, for example in host: ping -f 10.0.1.2 -i 0.002 in vm: ping -f 10.0.1.1 -i 0 5. in running script, run command "u" (without quotes), which means live-update. 6. you may also pass a counter, like "u 1000", to update 1000 times, which helps to catch racy bugs. Signed-off-by: Vladimir Sementsov-Ogievskiy <vsement...@yandex-team.ru> --- python/qemu/machine/machine.py | 37 ++-- python/tap-migration-stand.py | 348 +++++++++++++++++++++++++++++++++ 2 files changed, 369 insertions(+), 16 deletions(-) create mode 100755 python/tap-migration-stand.py diff --git a/python/qemu/machine/machine.py b/python/qemu/machine/machine.py index ebb58d5b68..fed54f768d 100644 --- a/python/qemu/machine/machine.py +++ b/python/qemu/machine/machine.py @@ -360,6 +360,7 @@ def _pre_launch(self) -> None: # _post_shutdown()! 
# pylint: disable=consider-using-with self._qemu_log_path = os.path.join(self.log_dir, self._name + ".log") + print(f"Open log file: {self._qemu_log_path}") self._qemu_log_file = open(self._qemu_log_path, 'wb') self._iolog = None @@ -435,17 +436,17 @@ def _post_shutdown(self) -> None: self._user_killed = False self._launched = False - def launch(self) -> None: + def launch(self, do_launch=True, do_post_launch=True) -> None: """ Launch the VM and make sure we cleanup and expose the command line/output in case of exception """ - if self._launched: + if self._launched and do_launch: raise QEMUMachineError('VM already launched') try: - self._launch() + self._launch(do_launch, do_post_launch) except BaseException as exc: # We may have launched the process but it may # have exited before we could connect via QMP. @@ -468,23 +469,26 @@ def launch(self) -> None: # that exception. However, we still want to clean up. raise - def _launch(self) -> None: + def _launch(self, do_launch=True, do_post_launch=True) -> None: """ Launch the VM and establish a QMP connection """ - self._pre_launch() - LOG.debug('VM launch command: %r', ' '.join(self._qemu_full_args)) + if do_launch: + self._pre_launch() + print('VM launch command: %r', ' '.join(self._qemu_full_args)) - # Cleaning up of this subprocess is guaranteed by _do_shutdown. - # pylint: disable=consider-using-with - self._popen = subprocess.Popen(self._qemu_full_args, - stdin=subprocess.DEVNULL, - stdout=self._qemu_log_file, - stderr=subprocess.STDOUT, - shell=False, - close_fds=False) - self._launched = True - self._post_launch() + # Cleaning up of this subprocess is guaranteed by _do_shutdown. 
+ # pylint: disable=consider-using-with + self._popen = subprocess.Popen(self._qemu_full_args, + stdin=subprocess.DEVNULL, + stdout=self._qemu_log_file, + stderr=subprocess.STDOUT, + shell=False, + close_fds=False) + self._launched = True + + if do_post_launch: + self._post_launch() def _close_qmp_connection(self) -> None: """ @@ -732,6 +736,7 @@ def cmd(self, cmd: str, conv_keys = True qmp_args = self._qmp_args(conv_keys, args) + print(cmd, qmp_args) ret = self._qmp.cmd(cmd, **qmp_args) if cmd == 'quit': self._quit_issued = True diff --git a/python/tap-migration-stand.py b/python/tap-migration-stand.py new file mode 100755 index 0000000000..24e0e58e40 --- /dev/null +++ b/python/tap-migration-stand.py @@ -0,0 +1,348 @@ +#!/usr/bin/env python3 +import argparse +import subprocess +import time +from enum import Enum +from typing import Tuple + +from qemu.machine import QEMUMachine + + +IMAGE = "/home/vsementsov/work/vms/newfocal.raw" + + +def run(cmd: str, check: bool = True) -> None: + subprocess.run(cmd, check=check, shell=True) + + +def del_tap(tap: str) -> None: + run(f"sudo ip tuntap del {tap} mode tap multi_queue", check=False) + + +def init_tap(tap: str) -> None: + run(f"sudo ip tuntap add dev {tap} mode tap multi_queue") + run(f"sudo ip link set dev {tap} address e6:1d:44:b5:03:5d") + run(f"sudo ip addr add 10.0.1.1/24 dev {tap}") + run(f"sudo ip link set {tap} up") + + +class MigrationFailed(Exception): + pass + + +class MyVM(QEMUMachine): + class Mode(Enum): + CPR = "cpr" + CPR_NO_TAP = "cpr-no-tap" + NO_TAP = "no-tap" + OPEN_SAME_TAP = "open-same-tap" + OPEN_NEW_TAP = "open-new-tap" + NEW = "new" + + def __init__( + self, + binary: str, + mode: Mode, + incoming: bool = False, + ind: int = 0, + vhost: bool = False, + ): + assert ind in (0, 1) + self.tap_name = f"tap{ind}" if mode == MyVM.Mode.OPEN_NEW_TAP else "tap0" + self.cpr = mode in (MyVM.Mode.CPR, MyVM.Mode.CPR_NO_TAP) + self.no_tap = mode in (MyVM.Mode.NO_TAP, MyVM.Mode.CPR_NO_TAP) + self.mode = mode 
+ self.ind = ind + self.qemu_binary = binary + self.vhost = vhost + self.fds = None + auxshare_str = "-machine aux-ram-share=on" if self.cpr else "" + + additional_args = [] + if incoming: + additional_args = ["-incoming", "defer"] + if self.cpr: + additional_args += [ + "-incoming", + '{"channel-type": "cpr","addr": ' + '{ "transport": "socket","type": "unix", "path": "/tmp/cpr.sock"}}', + ] + + new_traces = "-trace tap_*" if mode == MyVM.Mode.NEW else "" + + super().__init__( + binary=binary, + log_dir="/tmp/logdir/", + name=f"mytest{ind}", + args=f""" + -device pxb-pcie,bus_nr=128,bus=pcie.0,id=pcie.1 + -device pcie-root-port,id=s0,slot=0,bus=pcie.1 + -device pcie-root-port,id=s1,slot=1,bus=pcie.1 + -device pcie-root-port,id=s2,slot=2,bus=pcie.1 + + -hda {IMAGE} + -m 4G -enable-kvm -M q35 -vnc :{ind} -nodefaults -vga std + -qmp stdio + -msg timestamp + -S + -trace migrate_* + -trace migration_cleanup + -trace migration_cancel + -trace handle_qmp_command + -trace monitor_qmp_respond + {new_traces} + -object memory-backend-file,id=ram0,size=4G,mem-path=/dev/shm/ram0,share=on + -machine memory-backend=ram0 {auxshare_str} + """.split() + + additional_args, + ) + + def add_tap_netdev(self, tap, vhost: bool, local_incoming: bool = False): + args = { + "id": "netdev.1", + "vhost": vhost, + "vhostforce": vhost, + "type": "tap", + "ifname": tap, + "script": "no", + "downscript": "no", + "queues": 4, + } + + if self.cpr: + args["cpr"] = True + elif local_incoming: + args["local-incoming"] = True + + self.cmd("netdev_add", args) + + self.cmd( + "device_add", + driver="virtio-net-pci", + romfile="", + id="vnet.1", + netdev="netdev.1", + mq=True, + vectors=18, + bus="s1", + mac="d6:0d:75:f8:0f:b7", + disable_legacy="off", + ) + + def setup_network_first_time(self): + if self.no_tap: + return + + del_tap("tap0") + del_tap("tap1") + assert self.tap_name == "tap0" + init_tap("tap0") + + self.add_tap_netdev(self.tap_name, self.vhost) + + def setup_network_incoming(self, 
fds=None): + if self.no_tap: + return + + if self.mode == MyVM.Mode.OPEN_NEW_TAP: + run(f"sudo ip tuntap add dev {self.tap_name} mode tap multi_queue") + run(f"sudo ip link set {self.tap_name} up") + tap = self.tap_name + else: + tap = "tap0" + + self.add_tap_netdev( + tap, self.vhost, local_incoming=(self.mode == MyVM.Mode.NEW) + ) + + def pre_start_network_switch(self): + assert self.mode == MyVM.Mode.OPEN_NEW_TAP + + a = time.time() + prev_tap = f"tap{1 - self.ind}" + run(f"sudo ip link set {self.tap_name} down") + run(f"sudo ip link set {prev_tap} down") + run(f"sudo ip addr delete 10.0.1.1/24 dev {prev_tap}") + run(f"sudo ip link set dev {self.tap_name} address e6:1d:44:b5:03:5d") + run(f"sudo ip addr add 10.0.1.1/24 dev {self.tap_name}") + run(f"sudo ip link set {self.tap_name} up") + b = time.time() + print("network switch:", b - a) + + def wait_migration_complete(self) -> bool: + while True: + event = self.event_wait("MIGRATION", timeout=1000) + print("source:", event) + assert event + if event["data"]["status"] == "completed": + return True + if event["data"]["status"] == "failed": + print("MIGRATION FAILED!") + print(self.cmd("query-migrate")) + return False + + def mig_cap(self): + if self.cpr: + self.cmd("migrate-set-parameters", {"mode": "cpr-transfer"}) + cap_list = ["events", "x-ignore-shared"] + if self.mode == MyVM.Mode.NEW: + cap_list.append("local-tap") + caps = [{"capability": c, "state": True} for c in cap_list] + self.cmd("migrate-set-capabilities", {"capabilities": caps}) + + def migrate(self): + self.mig_cap() + if self.cpr: + self.cmd( + "migrate", + { + "channels": [ + { + "channel-type": "main", + "addr": { + "transport": "socket", + "type": "unix", + "path": "/tmp/migr.sock", + }, + }, + { + "channel-type": "cpr", + "addr": { + "transport": "socket", + "type": "unix", + "path": "/tmp/cpr.sock", + }, + }, + ] + }, + ) + else: + self.cmd("migrate", uri="unix:/tmp/migr.sock") + + def live_update(self) -> Tuple["MyVM", float]: + ind = 1 - 
self.ind + target = MyVM( + binary=self.qemu_binary, + ind=ind, + mode=self.mode, + incoming=True, + vhost=self.vhost, + ) + + if self.cpr: + print("launch target (cpr)") + target.launch(do_post_launch=False) + time.sleep(1) + + print("call migrate on source, will pass fds") + self.migrate() + + print("vm:", self.cmd("query-status"), self.cmd("query-migrate")) + print("post launch and qmp connect to target..") + target.launch(do_launch=False) + else: + print("launch target (usual)") + target.launch() + + target.setup_network_incoming(self.fds) + + target.mig_cap() + + if self.cpr: + freeze_start = time.time() + target.cmd("migrate-incoming", {"uri": "unix:/tmp/migr.sock"}) + else: + target.cmd("migrate-incoming", {"uri": "unix:/tmp/migr.sock"}) + freeze_start = time.time() + self.migrate() + + print("wait migration on source") + if not self.wait_migration_complete(): + target.shutdown() + raise MigrationFailed + + print("wait source STOP") + stop_event = self.event_wait("STOP", timeout=1000) + assert stop_event + print(stop_event) + + print("wait migration on target") + assert target.wait_migration_complete() + + result = self.qmp("query-status") + assert result["return"]["status"] == "postmigrate" + + result = target.qmp("query-status") + assert result["return"]["status"] == "paused" + + if self.mode == MyVM.Mode.OPEN_NEW_TAP: + target.pre_start_network_switch() + + print("target CONT") + target.qmp("cont") + freeze_end = time.time() + + freeze_time = freeze_end - freeze_start + print("freeze-time: ", freeze_time) + + self.shutdown() + + if self.mode == MyVM.Mode.OPEN_NEW_TAP: + del_tap(self.tap_name) + + print(target.cmd("query-version")) + print(target.cmd("query-status")) + return target, freeze_time + + +def main(): + # cleanup previous test runs + run("rm -rf /tmp/logdir", check=False) + run("mkdir /tmp/logdir", check=False) + run("killall qemu-system-x86_64", check=False) + + p = argparse.ArgumentParser() + p.add_argument("--qemu-binary", required=True) + 
p.add_argument("--vhost", action="store_true") + p.add_argument("--mode", choices=[e.value for e in MyVM.Mode], required=True) + args = p.parse_args() + + print("vhost:", args.vhost) + print("mode:", args.mode) + + vm = MyVM(binary=args.qemu_binary, mode=MyVM.Mode(args.mode), vhost=args.vhost) + vm.launch() + vm.setup_network_first_time() + vm.cmd("cont") + + while True: + cmd = input().strip() + if cmd == "q": + break + + if cmd == "s": + print(vm.cmd("query-status")) + vm.cmd("cont") + continue + + if cmd.startswith("u"): + spl = cmd.split() + assert len(spl) <= 2 + num = int(cmd.split(maxsplit=1)[1]) if len(spl) == 2 else 1 + total_freeze_time = 0 + try: + for i in range(num): + vm, freeze_time = vm.live_update() + print("DONE:", i) + total_freeze_time += freeze_time + except MigrationFailed: + continue + + print(f"avg freeze-time: {total_freeze_time / num}") + continue + + vm.shutdown() + + +if __name__ == "__main__": + main() -- 2.48.1