Module harvester_e2e_tests.integrations.test_4_vm_backup_restore

Functions

def NFS_config(request)
def S3_config(request)
def backup_config(request)
def base_vm(api_client, ssh_keypair, unique_name, vm_checker, image, backup_config)
def base_vm_migrated(api_client, vm_checker, backup_config, base_vm)
def base_vm_with_data(api_client, vm_shell_from_host, ssh_keypair, wait_timeout, vm_checker, backup_config, base_vm)
def config_backup_target(api_client, conflict_retries, backup_config, wait_timeout)
def conflict_retries()
def image(api_client, unique_name, wait_timeout, image_opensuse)
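
The test classes below parametrize backup_config indirectly over "S3" and "NFS". A minimal sketch of how such an indirect fixture can dispatch on request.param (the (kind, settings) return shape and the dispatch to S3_config/NFS_config are assumptions inferred from how the tests use the fixture, not this module's actual implementation):

import pytest

@pytest.fixture
def backup_config(request):
    # With indirect=True parametrization, pytest passes "S3" or "NFS" in as request.param.
    # The tests index backup_config[0], so a (kind, settings) tuple is a plausible shape.
    kind = request.param
    settings = request.getfixturevalue(f"{kind}_config")  # resolves S3_config or NFS_config
    return kind, settings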

Classes

class TestBackupRestore
@pytest.mark.p0
@pytest.mark.backup_target
@pytest.mark.parametrize(
    "backup_config", [
        pytest.param("S3", marks=pytest.mark.S3),
        pytest.param("NFS", marks=pytest.mark.NFS)
    ],
    indirect=True)
class TestBackupRestore:

    @pytest.mark.dependency()
    def test_connection(self, api_client, backup_config, config_backup_target):
        code, data = api_client.settings.backup_target_test_connection()
        assert 200 == code, f'Failed to test backup target connection: {data}'

    @pytest.mark.dependency(depends=["TestBackupRestore::test_connection"], param=True)
    def tests_backup_vm(self, api_client, wait_timeout, backup_config, base_vm_with_data):
        unique_vm_name = base_vm_with_data['name']

        # Create a backup named after the VM
        code, data = api_client.vms.backup(unique_vm_name, unique_vm_name)
        assert 204 == code, (code, data)
        # Check backup is ready
        endtime = datetime.now() + timedelta(seconds=wait_timeout)
        while endtime > datetime.now():
            code, backup = api_client.backups.get(unique_vm_name)
            if 200 == code and backup.get('status', {}).get('readyToUse'):
                break
            sleep(3)
        else:
            raise AssertionError(
                f'Timed-out waiting for the backup \'{unique_vm_name}\' to be ready.'
            )

    @pytest.mark.dependency(depends=["TestBackupRestore::tests_backup_vm"], param=True)
    def test_update_backup_by_yaml(
        self, api_client, wait_timeout, backup_config, base_vm_with_data
    ):
        backup_name = base_vm_with_data['name']
        # Get backup as yaml
        req_yaml = dict(Accept='application/yaml')
        resp = api_client.backups.get(backup_name, headers=req_yaml, raw=True)
        assert 200 == resp.status_code, (resp.status_code, resp.text)

        # update annotation
        yaml_header = {'Content-Type': 'application/yaml'}
        customized_annotations = {'test.harvesterhci.io': 'for-test-update'}
        data = yaml.safe_load(resp.text)
        data['metadata'].setdefault('annotations', {}).update(customized_annotations)
        yaml_data = yaml.safe_dump(data)
        code, data = api_client.backups.update(backup_name, yaml_data,
                                               as_json=False, headers=yaml_header)
        assert 200 == code, (code, data)

        # Verify annotation updated
        code, data = api_client.backups.get(backup_name)
        all_updated = all(
            data['metadata']['annotations'].get(key) == val
            for key, val in customized_annotations.items()
        )
        assert all_updated, f"Failed to update annotations: {customized_annotations!r}"

    @pytest.mark.dependency(depends=["TestBackupRestore::tests_backup_vm"], param=True)
    def test_restore_with_new_vm(
        self, api_client, vm_shell_from_host, vm_checker, ssh_keypair, wait_timeout,
        backup_config, base_vm_with_data
    ):
        unique_vm_name, backup_data = base_vm_with_data['name'], base_vm_with_data['data']
        pub_key, pri_key = ssh_keypair

        # mess up the existing data
        with vm_shell_from_host(
            base_vm_with_data['host_ip'], base_vm_with_data['vm_ip'],
            base_vm_with_data['ssh_user'], pkey=pri_key
        ) as sh:
            out, err = sh.exec_command(f"echo {pub_key!r} > {base_vm_with_data['data']['path']}")
            assert not err, (out, err)
            sh.exec_command('sync')

        # Restore the backup into a new VM
        restored_vm_name = f"{backup_config[0].lower()}-restore-{unique_vm_name}"
        spec = api_client.backups.RestoreSpec.for_new(restored_vm_name)
        code, data = api_client.backups.restore(unique_vm_name, spec)
        assert 201 == code, (code, data)
        vm_getable, (code, data) = vm_checker.wait_getable(restored_vm_name)
        assert vm_getable, (code, data)

        # Check VM Started then get IPs (vm and host)
        vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(restored_vm_name, ['default'])
        assert vm_got_ips, (
            f"Failed to Start VM({restored_vm_name}) with errors:\n"
            f"Status: {data.get('status')}\n"
            f"API Status({code}): {data}"
        )
        vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces']
                     if iface['name'] == 'default')
        code, data = api_client.hosts.get(data['status']['nodeName'])
        host_ip = next(addr['address'] for addr in data['status']['addresses']
                       if addr['type'] == 'InternalIP')

        # Log in to the restored VM and check the data still exists
        with vm_shell_from_host(host_ip, vm_ip, base_vm_with_data['ssh_user'], pkey=pri_key) as sh:
            endtime = datetime.now() + timedelta(seconds=wait_timeout)
            while endtime > datetime.now():
                out, err = sh.exec_command('cloud-init status')
                if 'done' in out:
                    break
                sleep(3)
            else:
                raise AssertionError(
                    f"VM {restored_vm_name} Started {wait_timeout} seconds"
                    f", but cloud-init still in {out}"
                )

            out, err = sh.exec_command(f"cat {backup_data['path']}")

        assert backup_data['content'] in out, (
            f"cloud-init writefile failed\n"
            f"Executed stdout: {out}\n"
            f"Executed stderr: {err}"
        )

        # teardown: delete restored vm and volumes
        code, data = api_client.vms.get(restored_vm_name)
        vm_spec = api_client.vms.Spec.from_dict(data)
        api_client.vms.delete(restored_vm_name)
        endtime = datetime.now() + timedelta(seconds=wait_timeout)
        while endtime > datetime.now():
            code, data = api_client.vms.get(restored_vm_name)
            if 404 == code:
                break
            sleep(3)
        else:
            raise AssertionError(
                f"Failed to Delete VM({restored_vm_name}) with errors:\n"
                f"Status({code}): {data}"
            )
        for vol in vm_spec.volumes:
            vol_name = vol['volume']['persistentVolumeClaim']['claimName']
            api_client.volumes.delete(vol_name)

    @pytest.mark.dependency(depends=["TestBackupRestore::tests_backup_vm"], param=True)
    def test_restore_replace_with_delete_vols(
        self, api_client, vm_shell_from_host, ssh_keypair, wait_timeout, vm_checker,
        backup_config, base_vm_with_data
    ):
        unique_vm_name, backup_data = base_vm_with_data['name'], base_vm_with_data['data']
        pub_key, pri_key = ssh_keypair

        # mess up the existing data
        with vm_shell_from_host(
            base_vm_with_data['host_ip'], base_vm_with_data['vm_ip'],
            base_vm_with_data['ssh_user'], pkey=pri_key
        ) as sh:
            out, err = sh.exec_command(f"echo {pub_key!r} > {base_vm_with_data['data']['path']}")
            assert not err, (out, err)
            sh.exec_command('sync')

        # Stop the VM then restore existing
        vm_stopped, (code, data) = vm_checker.wait_stopped(unique_vm_name)
        assert vm_stopped, (
            f"Failed to Stop VM({unique_vm_name}) with errors:\n"
            f"Status({code}): {data}"
        )

        spec = api_client.backups.RestoreSpec.for_existing(delete_volumes=True)
        code, data = api_client.backups.restore(unique_vm_name, spec)
        assert 201 == code, f'Failed to restore backup with current VM replaced, {data}'

        vm_running, (code, data) = vm_checker.wait_status_running(unique_vm_name)
        assert vm_running, (
            f"Failed to restore VM({unique_vm_name}) with errors:\n"
            f"Status({code}): {data}"
        )

        # Check VM Started then get IPs (vm and host)
        vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(unique_vm_name, ['default'])
        assert vm_got_ips, (
            f"Failed to Start VM({unique_vm_name}) with errors:\n"
            f"Status: {data.get('status')}\n"
            f"API Status({code}): {data}"
        )
        vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces']
                     if iface['name'] == 'default')
        code, data = api_client.hosts.get(data['status']['nodeName'])
        host_ip = next(addr['address'] for addr in data['status']['addresses']
                       if addr['type'] == 'InternalIP')
        base_vm_with_data['host_ip'], base_vm_with_data['vm_ip'] = host_ip, vm_ip

        # Log in to the restored VM and check the data still exists
        with vm_shell_from_host(host_ip, vm_ip, base_vm_with_data['ssh_user'], pkey=pri_key) as sh:
            cloud_inited, (out, err) = vm_checker.wait_cloudinit_done(sh)
            assert cloud_inited, (
                f"VM {unique_vm_name} Started {wait_timeout} seconds"
                f", but cloud-init still in {out}"
            )
            out, err = sh.exec_command(f"cat {backup_data['path']}")

        assert backup_data['content'] in out, (
            f"cloud-init writefile failed\n"
            f"Executed stdout: {out}\n"
            f"Executed stderr: {err}"
        )

    @pytest.mark.negative
    @pytest.mark.dependency(depends=["TestBackupRestore::tests_backup_vm"], param=True)
    def test_restore_replace_vm_not_stop(self, api_client, backup_config, base_vm_with_data):
        spec = api_client.backups.RestoreSpec.for_existing(delete_volumes=True)
        code, data = api_client.backups.restore(base_vm_with_data['name'], spec)

        assert 422 == code, (code, data)

    @pytest.mark.negative
    @pytest.mark.skip_version_if('< v1.1.2', '< v1.2.1')
    @pytest.mark.dependency(depends=["TestBackupRestore::tests_backup_vm"], param=True)
    def test_restore_with_invalid_name(self, api_client, backup_config, base_vm_with_data):
        # RFC1123 DNS Subdomain name rules:
        # 1. contain no more than 253 characters
        # 2. contain only lowercase alphanumeric characters, '-' or '.'
        # 3. start with an alphanumeric character
        # 4. end with an alphanumeric character

        unique_vm_name = base_vm_with_data['name']

        # Case 1: longer than 253 chars
        invalid_name = 'a' * 254
        spec = api_client.backups.RestoreSpec.for_new(invalid_name)
        code, data = api_client.backups.restore(unique_vm_name, spec)
        assert 422 == code, (code, data)

        # Case 2: having upper case
        invalid_name = 'the.name.IS.invalid'
        spec = api_client.backups.RestoreSpec.for_new(invalid_name)
        code, data = api_client.backups.restore(unique_vm_name, spec)
        assert 422 == code, (code, data)

        # Case 3: Not start with an alphanumeric character
        invalid_name = '-the.name.is.invalid'
        spec = api_client.backups.RestoreSpec.for_new(invalid_name)
        code, data = api_client.backups.restore(unique_vm_name, spec)
        assert 422 == code, (code, data)

        # Case 4: Not end with an alphanumeric character
        invalid_name = 'the.name.is.invalid.'
        spec = api_client.backups.RestoreSpec.for_new(invalid_name)
        code, data = api_client.backups.restore(unique_vm_name, spec)
        assert 422 == code, (code, data)

    @pytest.mark.skip_version_if('< v1.2.2')
    @pytest.mark.dependency(depends=["TestBackupRestore::tests_backup_vm"], param=True)
    def test_restore_replace_with_vm_shutdown_command(
        self, api_client, vm_shell_from_host, ssh_keypair, wait_timeout, vm_checker,
        backup_config, base_vm_with_data
    ):
        ''' ref: https://github.com/harvester/tests/issues/943
        1. Create VM and write some data
        2. Take a backup of the VM
        3. Mess up the existing data
        4. Shut down the VM by executing the `shutdown` command in the guest OS
        5. Restore the backup to replace the existing VM
        6. The VM should be restored successfully
        7. The data in the VM should match what was backed up
        '''

        unique_vm_name, backup_data = base_vm_with_data['name'], base_vm_with_data['data']
        pub_key, pri_key = ssh_keypair

        # Mess up the existing data, then shut the VM down
        with vm_shell_from_host(
            base_vm_with_data['host_ip'], base_vm_with_data['vm_ip'],
            base_vm_with_data['ssh_user'], pkey=pri_key
        ) as sh:
            out, err = sh.exec_command(f"echo {pub_key!r} > {base_vm_with_data['data']['path']}")
            assert not err, (out, err)
            sh.exec_command('sync')
            sh.exec_command('sudo shutdown now')

        endtime = datetime.now() + timedelta(seconds=wait_timeout)
        while endtime > datetime.now():
            code, data = api_client.vms.get(unique_vm_name)
            if 200 == code and "Stopped" == data.get('status', {}).get('printableStatus'):
                break
            sleep(5)
        else:
            raise AssertionError(
                f"Failed to shut down VM({unique_vm_name}) with errors:\n"
                f"Status({code}): {data}"
            )

        # restore VM to existing
        spec = api_client.backups.RestoreSpec.for_existing(delete_volumes=True)
        code, data = api_client.backups.restore(unique_vm_name, spec)
        assert 201 == code, f'Failed to restore backup with current VM replaced, {data}'
        vm_getable, (code, data) = vm_checker.wait_getable(unique_vm_name)
        assert vm_getable, (code, data)

        # Check VM Started then get IPs (vm and host)
        vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(unique_vm_name, ['default'])
        assert vm_got_ips, (
            f"Failed to Start VM({unique_vm_name}) with errors:\n"
            f"Status: {data.get('status')}\n"
            f"API Status({code}): {data}"
        )
        vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces']
                     if iface['name'] == 'default')
        code, data = api_client.hosts.get(data['status']['nodeName'])
        host_ip = next(addr['address'] for addr in data['status']['addresses']
                       if addr['type'] == 'InternalIP')

        # Log in to the restored VM and check the data still exists
        with vm_shell_from_host(host_ip, vm_ip, base_vm_with_data['ssh_user'], pkey=pri_key) as sh:
            cloud_inited, (out, err) = vm_checker.wait_cloudinit_done(sh)
            assert cloud_inited, (
                f"VM {unique_vm_name} Started {wait_timeout} seconds"
                f", but cloud-init still in {out}"
            )
            out, err = sh.exec_command(f"cat {backup_data['path']}")

        assert backup_data['content'] in out, (
            f"cloud-init writefile failed\n"
            f"Executed stdout: {out}\n"
            f"Executed stderr: {err}"
        )

Class variables

var pytestmark

Methods

def test_connection(self, api_client, backup_config, config_backup_target)
def test_restore_replace_vm_not_stop(self, api_client, backup_config, base_vm_with_data)
def test_restore_replace_with_delete_vols(self, api_client, vm_shell_from_host, ssh_keypair, wait_timeout, vm_checker, backup_config, base_vm_with_data)
def test_restore_replace_with_vm_shutdown_command(self, api_client, vm_shell_from_host, ssh_keypair, wait_timeout, vm_checker, backup_config, base_vm_with_data)

ref: https://github.com/harvester/tests/issues/943
1. Create VM and write some data
2. Take a backup of the VM
3. Mess up the existing data
4. Shut down the VM by executing the `shutdown` command in the guest OS
5. Restore the backup to replace the existing VM
6. The VM should be restored successfully
7. The data in the VM should match what was backed up

def test_restore_with_invalid_name(self, api_client, backup_config, base_vm_with_data)
def test_restore_with_new_vm(self, api_client, vm_shell_from_host, vm_checker, ssh_keypair, wait_timeout, backup_config, base_vm_with_data)
def test_update_backup_by_yaml(self, api_client, wait_timeout, backup_config, base_vm_with_data)
def tests_backup_vm(self, api_client, wait_timeout, backup_config, base_vm_with_data)
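
Nearly every test above repeats the same polling idiom: compute an endtime, poll the API, sleep(3), and rely on the while/else clause to raise an AssertionError on timeout. A hedged helper sketch of that pattern factored out (wait_until is not part of this module; the commented usage mirrors tests_backup_vm and its fixture names):

from datetime import datetime, timedelta
from time import sleep

def wait_until(predicate, wait_timeout, interval=3, message="condition not met"):
    """Poll predicate() until it returns a truthy value or wait_timeout seconds elapse."""
    endtime = datetime.now() + timedelta(seconds=wait_timeout)
    while endtime > datetime.now():
        result = predicate()
        if result:
            return result
        sleep(interval)
    raise AssertionError(f"Timed out after {wait_timeout}s: {message}")

# Mirroring the readiness check in tests_backup_vm (api_client, unique_vm_name and
# wait_timeout come from fixtures):
# wait_until(
#     lambda: api_client.backups.get(unique_vm_name)[1].get('status', {}).get('readyToUse'),
#     wait_timeout, message=f"backup '{unique_vm_name}' not ready",
# )
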
class TestBackupRestoreOnMigration
@pytest.mark.skip("https://github.com/harvester/harvester/issues/1473")
@pytest.mark.p0
@pytest.mark.backup_target
@pytest.mark.parametrize(
    "backup_config", [
        pytest.param("S3", marks=pytest.mark.S3),
        pytest.param("NFS", marks=pytest.mark.NFS)
    ],
    indirect=True
)
class TestBackupRestoreOnMigration:
    @pytest.mark.dependency(param=True)
    def test_backup_migrated_vm(
        self, api_client, wait_timeout, backup_config, config_backup_target,
        base_vm_migrated, base_vm_with_data
    ):
        unique_vm_name = base_vm_with_data['name']

        # Create a backup named after the VM
        code, data = api_client.vms.backup(unique_vm_name, unique_vm_name)
        assert 204 == code, (code, data)
        # Check backup is ready
        endtime = datetime.now() + timedelta(seconds=wait_timeout)
        while endtime > datetime.now():
            code, backup = api_client.backups.get(unique_vm_name)
            if 200 == code and backup.get('status', {}).get('readyToUse'):
                break
            sleep(3)
        else:
            raise AssertionError(
                f'Timed-out waiting for the backup \'{unique_vm_name}\' to be ready.'
            )

    @pytest.mark.dependency(
        depends=["TestBackupRestoreOnMigration::test_backup_migrated_vm"],
        param=True
    )
    def test_restore_replace_migrated_vm(
        self, api_client, wait_timeout, ssh_keypair, vm_shell_from_host, vm_checker, backup_config,
        base_vm_migrated, base_vm_with_data
    ):
        unique_vm_name, backup_data = base_vm_with_data['name'], base_vm_with_data['data']
        pub_key, pri_key = ssh_keypair

        # mess up the existing data
        with vm_shell_from_host(
            base_vm_with_data['host_ip'], base_vm_with_data['vm_ip'],
            base_vm_with_data['ssh_user'], pkey=pri_key
        ) as sh:
            out, err = sh.exec_command(f"echo {pub_key!r} > {base_vm_with_data['data']['path']}")
            assert not err, (out, err)
            sh.exec_command('sync')

        # Stop the VM then restore existing
        vm_stopped, (code, data) = vm_checker.wait_stopped(unique_vm_name)
        assert vm_stopped, (
            f"Failed to Stop VM({unique_vm_name}) with errors:\n"
            f"Status({code}): {data}"
        )

        spec = api_client.backups.RestoreSpec.for_existing()
        code, data = api_client.backups.restore(unique_vm_name, spec)
        assert 201 == code, f'Failed to restore backup with current VM replaced, {data}'
        vm_getable, (code, data) = vm_checker.wait_getable(unique_vm_name)
        assert vm_getable, (code, data)

        # Check VM Started
        vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(unique_vm_name, ['default'])
        assert vm_got_ips, (
            f"Failed to Start VM({unique_vm_name}) with errors:\n"
            f"Status: {data.get('status')}\n"
            f"API Status({code}): {data}"
        )

        # Check the restored VM stays on the node it was migrated to
        host = data['status']['nodeName']
        original_host, migrated_host = base_vm_migrated

        assert host == migrated_host, (
            f"Restored VM is not hosted on {migrated_host} but {host},"
            f" the VM was initialized hosted on {original_host}"
        )

        # Get IP of VM and host
        vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces']
                     if iface['name'] == 'default')
        code, data = api_client.hosts.get(data['status']['nodeName'])
        host_ip = next(addr['address'] for addr in data['status']['addresses']
                       if addr['type'] == 'InternalIP')

        # Log in to the restored VM and check the data still exists
        with vm_shell_from_host(host_ip, vm_ip, base_vm_with_data['ssh_user'], pkey=pri_key) as sh:
            cloud_inited, (out, err) = vm_checker.wait_cloudinit_done(sh)
            assert cloud_inited, (
                f"VM {unique_vm_name} Started {vm_checker.wait_timeout} seconds"
                f", but cloud-init still in {out}"
            )
            out, err = sh.exec_command(f"cat {backup_data['path']}")

        assert backup_data['content'] in out, (
            f"cloud-init writefile failed\n"
            f"Executed stdout: {out}\n"
            f"Executed stderr: {err}"
        )

Class variables

var pytestmark

Methods

def test_backup_migrated_vm(self, api_client, wait_timeout, backup_config, config_backup_target, base_vm_migrated, base_vm_with_data)
def test_restore_replace_migrated_vm(self, api_client, wait_timeout, ssh_keypair, vm_shell_from_host, vm_checker, backup_config, base_vm_migrated, base_vm_with_data)
class TestMultipleBackupRestore
@pytest.mark.p1
@pytest.mark.backup_target
@pytest.mark.parametrize(
    "backup_config", [
        pytest.param("S3", marks=pytest.mark.S3),
        pytest.param("NFS", marks=pytest.mark.NFS)
    ],
    indirect=True)
class TestMultipleBackupRestore:
    @pytest.mark.dependency()
    def test_backup_multiple(
        self, api_client, wait_timeout, host_shell, vm_shell, vm_checker, ssh_keypair,
        backup_config, config_backup_target, base_vm_with_data
    ):
        def write_data(content):
            pub_key, pri_key = ssh_keypair
            # Log into VM to make some data
            with host_shell.login(host_ip, jumphost=True) as h:
                vm_sh = vm_shell(base_vm_with_data['ssh_user'], pkey=pri_key)
                endtime = datetime.now() + timedelta(seconds=wait_timeout)
                while endtime > datetime.now():
                    try:
                        vm_sh.connect(vm_ip, jumphost=h.client)
                    except ChannelException as e:
                        login_ex = e
                        sleep(3)
                    else:
                        break
                else:
                    raise AssertionError(f"Unable to login to VM {unique_vm_name}") from login_ex

                with vm_sh as sh:
                    endtime = datetime.now() + timedelta(seconds=wait_timeout)
                    while endtime > datetime.now():
                        out, err = sh.exec_command('cloud-init status')
                        if 'done' in out:
                            break
                        sleep(3)
                    else:
                        raise AssertionError(
                            f"VM {unique_vm_name} Started {wait_timeout} seconds"
                            f", but cloud-init still in {out}"
                        )
                    out, err = sh.exec_command(f'echo {content!r} >> ~/vmname')
                    assert not err, (out, err)
                    sh.exec_command('sync')

        def create_backup(vm_name, backup_name):
            code, data = api_client.vms.backup(vm_name, backup_name)
            assert 204 == code, (code, data)
            # Check backup is ready
            endtime = datetime.now() + timedelta(seconds=wait_timeout)
            while endtime > datetime.now():
                code, backup = api_client.backups.get(backup_name)
                if 200 == code and backup.get('status', {}).get('readyToUse'):
                    break
                sleep(3)
            else:
                raise AssertionError(
                    f'Timed-out waiting for the backup \'{backup_name}\' to be ready.'
                )

        unique_vm_name = base_vm_with_data['name']
        # Check VM started and get IPs (vm and host)
        vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(unique_vm_name, ['default'])
        assert vm_got_ips, (
            f"Failed to Start VM({unique_vm_name}) with errors:\n"
            f"Status: {data.get('status')}\n"
            f"API Status({code}): {data}"
        )
        vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces']
                     if iface['name'] == 'default')
        code, data = api_client.hosts.get(data['status']['nodeName'])
        host_ip = next(addr['address'] for addr in data['status']['addresses']
                       if addr['type'] == 'InternalIP')

        content = ""
        # Create multiple backups
        for idx in range(0, 5):
            backup_name = f"{idx}-{unique_vm_name}"
            write_data(backup_name)
            create_backup(unique_vm_name, backup_name)
            content += f"{backup_name}\n"
            base_vm_with_data['data'].setdefault('backups', []).append((backup_name, content))

    @pytest.mark.dependency(
        depends=["TestMultipleBackupRestore::test_backup_multiple"], param=True
    )
    def test_delete_first_backup(
        self, api_client, host_shell, vm_shell, vm_checker, ssh_keypair, wait_timeout,
        backup_config, config_backup_target, base_vm_with_data
    ):
        unique_vm_name, backup_data = base_vm_with_data['name'], base_vm_with_data['data']
        pub_key, pri_key = ssh_keypair

        backups = backup_data['backups']
        (first_backup, content), *backup_data['backups'] = backups
        latest_backup = backups[-1][0]

        # Delete first backup
        code, data = api_client.backups.delete(first_backup)
        endtime = datetime.now() + timedelta(seconds=wait_timeout)
        while endtime > datetime.now():
            code, data = api_client.backups.get(first_backup)
            if 404 == code:
                break
            sleep(3)
        else:
            raise AssertionError(
                f"Failed to delete backup {first_backup}\n"
                f"API Status({code}): {data}"
            )

        # Stop the VM
        code, data = api_client.vms.stop(unique_vm_name)
        assert 204 == code, "`Stop` returned an unexpected status code"
        endtime = datetime.now() + timedelta(seconds=wait_timeout)
        while endtime > datetime.now():
            code, data = api_client.vms.get_status(unique_vm_name)
            if 404 == code:
                break
            sleep(3)
        else:
            raise AssertionError(
                f"Failed to Stop VM({unique_vm_name}) with errors:\n"
                f"Status({code}): {data}"
            )

        spec = api_client.backups.RestoreSpec.for_existing(delete_volumes=True)
        code, data = api_client.backups.restore(latest_backup, spec)
        assert 201 == code, f'Failed to restore backup with current VM replaced, {data}'
        vm_getable, (code, data) = vm_checker.wait_getable(unique_vm_name)
        assert vm_getable, (code, data)

        # Check VM Started then get IPs (vm and host)
        vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(unique_vm_name, ['default'])
        assert vm_got_ips, (
            f"Failed to Start VM({unique_vm_name}) with errors:\n"
            f"Status: {data.get('status')}\n"
            f"API Status({code}): {data}"
        )
        vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces']
                     if iface['name'] == 'default')
        code, data = api_client.hosts.get(data['status']['nodeName'])
        host_ip = next(addr['address'] for addr in data['status']['addresses']
                       if addr['type'] == 'InternalIP')

        # Log in to the restored VM and check the data still exists
        with host_shell.login(host_ip, jumphost=True) as h:
            vm_sh = vm_shell(base_vm_with_data['ssh_user'], pkey=pri_key)
            endtime = datetime.now() + timedelta(seconds=wait_timeout)
            while endtime > datetime.now():
                try:
                    vm_sh.connect(vm_ip, jumphost=h.client)
                except ChannelException as e:
                    login_ex = e
                    sleep(3)
                else:
                    break
            else:
                raise AssertionError(f"Unable to login to VM {unique_vm_name}") from login_ex

            with vm_sh as sh:
                endtime = datetime.now() + timedelta(seconds=wait_timeout)
                while endtime > datetime.now():
                    out, err = sh.exec_command('cloud-init status')
                    if 'done' in out:
                        break
                    sleep(3)
                else:
                    raise AssertionError(
                        f"VM {unique_vm_name} Started {wait_timeout} seconds"
                        f", but cloud-init still in {out}"
                    )

                out, err = sh.exec_command(f"cat {backup_data['path']}")
            assert content in out, (
                f"cloud-init writefile failed\n"
                f"Executed stdout: {out}\n"
                f"Executed stderr: {err}"
            )

    @pytest.mark.dependency(
        depends=["TestMultipleBackupRestore::test_backup_multiple"], param=True
    )
    def test_delete_last_backup(
        self, api_client, host_shell, vm_shell, vm_checker, ssh_keypair, wait_timeout,
        backup_config, config_backup_target, base_vm_with_data
    ):
        unique_vm_name, backup_data = base_vm_with_data['name'], base_vm_with_data['data']
        pub_key, pri_key = ssh_keypair

        *backups, (latest_backup, content), (last_backup, _) = backup_data['backups']
        backup_data['backups'] = backup_data['backups'][:-1]

        # Delete the last backup
        code, data = api_client.backups.delete(last_backup)
        endtime = datetime.now() + timedelta(seconds=wait_timeout)
        while endtime > datetime.now():
            code, data = api_client.backups.get(last_backup)
            if 404 == code:
                break
            sleep(3)
        else:
            raise AssertionError(
                f"Failed to delete backup {last_backup}\n"
                f"API Status({code}): {data}"
            )

        # Stop the VM
        code, data = api_client.vms.stop(unique_vm_name)
        assert 204 == code, "`Stop` returned an unexpected status code"
        endtime = datetime.now() + timedelta(seconds=wait_timeout)
        while endtime > datetime.now():
            code, data = api_client.vms.get_status(unique_vm_name)
            if 404 == code:
                break
            sleep(3)
        else:
            raise AssertionError(
                f"Failed to Stop VM({unique_vm_name}) with errors:\n"
                f"Status({code}): {data}"
            )

        spec = api_client.backups.RestoreSpec.for_existing(delete_volumes=True)
        code, data = api_client.backups.restore(latest_backup, spec)
        assert 201 == code, f'Failed to restore backup with current VM replaced, {data}'
        vm_getable, (code, data) = vm_checker.wait_getable(unique_vm_name)
        assert vm_getable, (code, data)

        # Check VM Started then get IPs (vm and host)
        vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(unique_vm_name, ['default'])
        assert vm_got_ips, (
            f"Failed to Start VM({unique_vm_name}) with errors:\n"
            f"Status: {data.get('status')}\n"
            f"API Status({code}): {data}"
        )
        vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces']
                     if iface['name'] == 'default')
        code, data = api_client.hosts.get(data['status']['nodeName'])
        host_ip = next(addr['address'] for addr in data['status']['addresses']
                       if addr['type'] == 'InternalIP')

        # Log in to the restored VM and check the data still exists
        with host_shell.login(host_ip, jumphost=True) as h:
            vm_sh = vm_shell(base_vm_with_data['ssh_user'], pkey=pri_key)
            endtime = datetime.now() + timedelta(seconds=wait_timeout)
            while endtime > datetime.now():
                try:
                    vm_sh.connect(vm_ip, jumphost=h.client)
                except ChannelException as e:
                    login_ex = e
                    sleep(3)
                else:
                    break
            else:
                raise AssertionError(f"Unable to login to VM {unique_vm_name}") from login_ex

            with vm_sh as sh:
                endtime = datetime.now() + timedelta(seconds=wait_timeout)
                while endtime > datetime.now():
                    out, err = sh.exec_command('cloud-init status')
                    if 'done' in out:
                        break
                    sleep(3)
                else:
                    raise AssertionError(
                        f"VM {unique_vm_name} Started {wait_timeout} seconds"
                        f", but cloud-init still in {out}"
                    )

                out, err = sh.exec_command(f"cat {backup_data['path']}")
            assert content in out, (
                f"cloud-init writefile failed\n"
                f"Executed stdout: {out}\n"
                f"Executed stderr: {err}"
            )

    @pytest.mark.dependency(
        depends=["TestMultipleBackupRestore::test_backup_multiple"], param=True
    )
    def test_delete_middle_backup(
        self, api_client, host_shell, vm_shell, vm_checker, ssh_keypair, wait_timeout,
        backup_config, config_backup_target, base_vm_with_data
    ):
        unique_vm_name, backup_data = base_vm_with_data['name'], base_vm_with_data['data']
        pub_key, pri_key = ssh_keypair

        *backups, (middle_backup, _), (latest_backup, content) = backup_data['backups']
        backup_data['backups'] = backups + [(latest_backup, content)]

        # Delete the second-to-last backup
        code, data = api_client.backups.delete(middle_backup)
        endtime = datetime.now() + timedelta(seconds=wait_timeout)
        while endtime > datetime.now():
            code, data = api_client.backups.get(middle_backup)
            if 404 == code:
                break
            sleep(3)
        else:
            raise AssertionError(
                f"Failed to delete backup {middle_backup}\n"
                f"API Status({code}): {data}"
            )

        # Stop the VM
        code, data = api_client.vms.stop(unique_vm_name)
        assert 204 == code, "`Stop` returned an unexpected status code"
        endtime = datetime.now() + timedelta(seconds=wait_timeout)
        while endtime > datetime.now():
            code, data = api_client.vms.get_status(unique_vm_name)
            if 404 == code:
                break
            sleep(3)
        else:
            raise AssertionError(
                f"Failed to Stop VM({unique_vm_name}) with errors:\n"
                f"Status({code}): {data}"
            )

        spec = api_client.backups.RestoreSpec.for_existing(delete_volumes=True)
        code, data = api_client.backups.restore(latest_backup, spec)
        assert 201 == code, f'Failed to restore backup with current VM replaced, {data}'
        vm_getable, (code, data) = vm_checker.wait_getable(unique_vm_name)
        assert vm_getable, (code, data)

        # Check VM Started then get IPs (vm and host)
        vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(unique_vm_name, ['default'])
        assert vm_got_ips, (
            f"Failed to Start VM({unique_vm_name}) with errors:\n"
            f"Status: {data.get('status')}\n"
            f"API Status({code}): {data}"
        )
        vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces']
                     if iface['name'] == 'default')
        code, data = api_client.hosts.get(data['status']['nodeName'])
        host_ip = next(addr['address'] for addr in data['status']['addresses']
                       if addr['type'] == 'InternalIP')

        # Log in to the restored VM and check the data still exists
        with host_shell.login(host_ip, jumphost=True) as h:
            vm_sh = vm_shell(base_vm_with_data['ssh_user'], pkey=pri_key)
            endtime = datetime.now() + timedelta(seconds=wait_timeout)
            while endtime > datetime.now():
                try:
                    vm_sh.connect(vm_ip, jumphost=h.client)
                except ChannelException as e:
                    login_ex = e
                    sleep(3)
                else:
                    break
            else:
                raise AssertionError(f"Unable to login to VM {unique_vm_name}") from login_ex

            with vm_sh as sh:
                endtime = datetime.now() + timedelta(seconds=wait_timeout)
                while endtime > datetime.now():
                    out, err = sh.exec_command('cloud-init status')
                    if 'done' in out:
                        break
                    sleep(3)
                else:
                    raise AssertionError(
                        f"VM {unique_vm_name} Started {wait_timeout} seconds"
                        f", but cloud-init still in {out}"
                    )

                out, err = sh.exec_command(f"cat {backup_data['path']}")
            assert content in out, (
                f"cloud-init writefile failed\n"
                f"Executed stdout: {out}\n"
                f"Executed stderr: {err}"
            )

Class variables

var pytestmark

Methods

def test_backup_multiple(self, api_client, wait_timeout, host_shell, vm_shell, vm_checker, ssh_keypair, backup_config, config_backup_target, base_vm_with_data)
def test_delete_first_backup(self, api_client, host_shell, vm_shell, vm_checker, ssh_keypair, wait_timeout, backup_config, config_backup_target, base_vm_with_data)
def test_delete_last_backup(self, api_client, host_shell, vm_shell, vm_checker, ssh_keypair, wait_timeout, backup_config, config_backup_target, base_vm_with_data)
def test_delete_middle_backup(self, api_client, host_shell, vm_shell, vm_checker, ssh_keypair, wait_timeout, backup_config, config_backup_target, base_vm_with_data)
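
test_delete_first_backup, test_delete_last_backup, and test_delete_middle_backup all rely on starred tuple unpacking of backup_data['backups'], the list of (backup_name, cumulative_content) pairs built by test_backup_multiple. A small self-contained illustration of that bookkeeping, using made-up names (the real lists are keyed by the VM's unique name):

# Illustrative only: mimics the (name, cumulative content) pairs that test_backup_multiple
# appends for five backups of a VM named "demo".
backups = [(f"{i}-demo", "".join(f"{j}-demo\n" for j in range(i + 1))) for i in range(5)]

# test_delete_first_backup: split off the oldest entry, keep the rest.
(first_backup, first_content), *remaining = backups

# test_delete_last_backup: the second-to-last backup becomes the restore target.
*_, (restore_target, expected_content), (last_backup, _) = backups

# test_delete_middle_backup: drop the second-to-last, restore the newest.
*older, (middle_backup, _), (latest_backup, latest_content) = backups
kept = older + [(latest_backup, latest_content)]

print(first_backup, restore_target, middle_backup, latest_backup)
# -> 0-demo 3-demo 3-demo 4-demo

# Contents are cumulative, so data written before an earlier backup is still present
# after restoring any later backup - which is what the delete_* tests assert.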