Module harvester_e2e_tests.integrations.test_4_vm_snapshot

Functions

def create_vm(name, api_client, ssh_keypair, image, timeout_secs)
Expand source code
def create_vm(name, api_client, ssh_keypair, image, timeout_secs):
    cpu = 1
    mem = 2
    pubkey, _ = ssh_keypair

    vm_spec = api_client.vms.Spec(cpu, mem)
    vm_spec.add_image("disk-0", image["id"])

    userdata = yaml.safe_load(vm_spec.user_data)
    userdata["ssh_authorized_keys"] = [pubkey]
    vm_spec.user_data = yaml.dump(userdata)

    _, data = api_client.vms.create(name, vm_spec)
    deadline = datetime.now() + timedelta(seconds=timeout_secs)
    while deadline > datetime.now():
        _, data = api_client.vms.get(name)
        if "Running" == data.get("status", {}).get("printableStatus"):
            break
        sleep(1)
    else:
        raise AssertionError(f"timed out waiting for {name} to transition to Running")

    return name, image["user"]
def delete_vm(name, api_client, timeout_secs)
Expand source code
def delete_vm(name, api_client, timeout_secs):
    code, data = api_client.vms.get(name)
    if code == 404:
        return

    vm_spec = api_client.vms.Spec.from_dict(data)

    api_client.vms.delete(name)
    deadline = datetime.now() + timedelta(seconds=timeout_secs)
    while deadline > datetime.now():
        code, data = api_client.vms.get_status(name)
        if 404 == code:
            break
        sleep(3)

    for vol in vm_spec.volumes:
        vol_name = vol["volume"]["persistentVolumeClaim"]["claimName"]
        api_client.volumes.delete(vol_name)
def image(api_client, image_opensuse, wait_timeout)
Expand source code
@pytest.fixture(scope="module")
def image(api_client, image_opensuse, wait_timeout):
    unique_image_id = unique_name("image")
    display_name = f"{unique_image_id}-{image_opensuse.name}"
    code, data = api_client.images.create_by_url(unique_image_id,
                                                 image_opensuse.url,
                                                 display_name=display_name)

    assert 201 == code, (code, data)

    deadline = datetime.now() + timedelta(seconds=wait_timeout)
    while deadline > datetime.now():
        code, data = api_client.images.get(unique_image_id)
        if 100 == data.get('status', {}).get('progress', 0):
            break
        sleep(3)
    else:
        raise AssertionError(
            "Failed to create Image with error:\n"
            f"Status({code}): {data}"
        )

    yield dict(id=f"{data['metadata']['namespace']}/{unique_image_id}",
               user=image_opensuse.ssh_user)

    code, data = api_client.images.delete(unique_image_id)
def restored_from_snapshot_name()
Expand source code
@pytest.fixture(scope="class")
def restored_from_snapshot_name():
    return unique_name("vm-from-snapshot")
def restored_from_snapshot_name_2()
Expand source code
@pytest.fixture(scope="class")
def restored_from_snapshot_name_2():
    return unique_name("vm-from-snapshot-2")
def restored_from_snapshot_vm(api_client,
restored_from_snapshot_name,
vm_snapshot_name,
source_vm,
host_shell,
vm_shell,
ssh_keypair,
wait_timeout)
Expand source code
@pytest.fixture(scope="class")
def restored_from_snapshot_vm(api_client, restored_from_snapshot_name,
                              vm_snapshot_name, source_vm, host_shell,
                              vm_shell, ssh_keypair, wait_timeout):
    name, ssh_user = source_vm
    start_vm(name, api_client, wait_timeout)

    def modify(sh):
        _, _ = sh.exec_command("echo 5678 > test.txt && sync")

    vm_shell_do(name, api_client,
                host_shell, vm_shell,
                ssh_user, ssh_keypair,
                modify, wait_timeout)

    # Just to wait for `sync`
    sleep(2)

    stop_vm(name, api_client, wait_timeout)

    spec = api_client.vm_snapshots.RestoreSpec.for_new(restored_from_snapshot_name)
    code, data = api_client.vm_snapshots.restore(vm_snapshot_name, spec)
    assert 201 == code

    deadline = datetime.now() + timedelta(seconds=wait_timeout)
    while deadline > datetime.now():
        code, data = api_client.vms.get(restored_from_snapshot_name)
        if 200 == code and "Running" == data.get("status", {}).get("printableStatus"):
            break
        print("waiting for restored vm to be running")
        sleep(3)
    else:
        raise AssertionError(f"timed out waiting to restore into new VM"
                             f"{restored_from_snapshot_name}")

    yield restored_from_snapshot_name, ssh_user

    delete_vm(restored_from_snapshot_name, api_client, wait_timeout)
def restored_vm_2(api_client,
restored_from_snapshot_name_2,
vm_snapshot_name,
source_vm,
host_shell,
vm_shell,
ssh_keypair,
wait_timeout)
Expand source code
@pytest.fixture(scope="class")
def restored_vm_2(api_client, restored_from_snapshot_name_2,
                  vm_snapshot_name, source_vm,
                  host_shell, vm_shell,
                  ssh_keypair, wait_timeout):
    name, ssh_user = source_vm

    start_vm(name, api_client, wait_timeout)

    def modify(sh):
        _, _ = sh.exec_command("echo 99999999 > test.txt && sync")

    vm_shell_do(name, api_client,
                host_shell, vm_shell,
                ssh_user, ssh_keypair,
                modify, wait_timeout)

    # Just to wait for `sync`
    sleep(2)

    stop_vm(name, api_client, wait_timeout)

    spec = api_client.vm_snapshots.RestoreSpec.for_new(restored_from_snapshot_name_2)
    code, data = api_client.vm_snapshots.restore(vm_snapshot_name, spec)
    assert 201 == code

    deadline = datetime.now() + timedelta(seconds=wait_timeout)
    while deadline > datetime.now():
        code, data = api_client.vms.get(restored_from_snapshot_name_2)
        if 200 == code and "Running" == data.get("status", {}).get("printableStatus"):
            break
        print("waiting for restored vm to be running")
        sleep(3)
    else:
        raise AssertionError(f"timed out waiting to restore into new VM"
                             f"{restored_from_snapshot_name_2}")

    yield restored_from_snapshot_name_2, ssh_user

    delete_vm(restored_from_snapshot_name_2, api_client, wait_timeout)
def source_vm(sourcevm_name, api_client, ssh_keypair, image, wait_timeout)
Expand source code
@pytest.fixture(scope="class")
def source_vm(sourcevm_name, api_client, ssh_keypair, image, wait_timeout):
    yield create_vm(sourcevm_name, api_client, ssh_keypair, image, wait_timeout)
    delete_vm(sourcevm_name, api_client, wait_timeout)
def sourcevm_name()
Expand source code
@pytest.fixture(scope="class")
def sourcevm_name():
    return unique_name("source-vm")
def start_vm(name, api_client, timeout_secs)
Expand source code
def start_vm(name, api_client, timeout_secs):
    _, data = api_client.vms.start(name)
    deadline = datetime.now() + timedelta(seconds=timeout_secs)
    while deadline > datetime.now():
        _, data = api_client.vms.get(name)
        status = data.get("status", {}).get("printableStatus")
        if "Running" == status:
            return
        sleep(1)

    raise AssertionError(f"timed out trying to start {name}")
def stop_vm(name, api_client, timeout_secs)
Expand source code
def stop_vm(name, api_client, timeout_secs):
    _, data = api_client.vms.stop(name)
    deadline = datetime.now() + timedelta(seconds=timeout_secs)
    while deadline > datetime.now():
        _, data = api_client.vms.get(name)
        status = data.get("status", {}).get("printableStatus")
        if "Stopped" == status:
            return
        sleep(1)

    raise AssertionError(f"timed out trying to stop {name}")
def unique_name(name)
Expand source code
def unique_name(name):
    return f"{datetime.now().strftime('%m%S%f')}-{name}"
def vm_shell_do(name, api_client, host_shell, vm_shell, user, ssh_keypair, action, wait_timeout)
Expand source code
def vm_shell_do(name, api_client, host_shell, vm_shell, user, ssh_keypair, action, wait_timeout):
    _, privatekey = ssh_keypair

    deadline = datetime.now() + timedelta(seconds=wait_timeout)
    while deadline > datetime.now():
        code, data = api_client.vms.get_status(name)
        if 200 == code:
            phase = data.get("status", {}).get("phase")
            conds = data.get("status", {}).get("conditions", [{}])
            if ("Running" == phase
                    and "AgentConnected" == conds[-1].get("type")
                    and data["status"].get("interfaces")):
                break
            sleep(3)

        vm_ip = next(iface["ipAddress"] for iface in data["status"]["interfaces"]
                     if iface["name"] == "default")

        code, data = api_client.hosts.get(data["status"]["nodeName"])
        host_ip = next(addr["address"] for addr in data["status"]["addresses"]
                       if addr["type"] == "InternalIP")

        with host_shell.login(host_ip, jumphost=True) as h:
            vm_sh = vm_shell(user, pkey=privatekey)

            deadline = datetime.now() + timedelta(seconds=wait_timeout)
            while deadline > datetime.now():
                try:
                    vm_sh.connect(vm_ip, jumphost=h.client)
                except ChannelException as e:
                    print(e)
                    sleep(3)
                else:
                    break
            else:
                raise AssertionError(f"Unable to login to {name}")

            with vm_sh as sh:
                action(sh)
def vm_snapshot_2_name()
Expand source code
@pytest.fixture(scope="class")
def vm_snapshot_2_name():
    return "vm-snapshot-2"
def vm_snapshot_name()
Expand source code
@pytest.fixture(scope="class")
def vm_snapshot_name():
    return "vm-snapshot"

Classes

class TestVMSnapshot
Expand source code
@pytest.mark.p0
@pytest.mark.virtualmachines
class TestVMSnapshot:
    @pytest.mark.dependency(name="source_vm_snapshot")
    def test_vm_snapshot_create(self, api_client,
                                source_vm, vm_snapshot_name,
                                host_shell, vm_shell,
                                ssh_keypair, wait_timeout):
        """
        Test that the VM snapshot can be created.

        Prerequisite:
        A virtual machine has been created and is running.
        """
        name, ssh_user = source_vm

        def action(sh):
            _, _ = sh.exec_command("echo 123 > test.txt")
            _, _ = sh.exec_command("sync")

        vm_shell_do(name, api_client,
                    host_shell, vm_shell,
                    ssh_user, ssh_keypair,
                    action, wait_timeout)

        # Since `sync` isn't actually synchronous, wait a couple of
        # seconds to let the I/O flush to disk.
        sleep(2)

        code, _ = api_client.vm_snapshots.create(name, vm_snapshot_name)
        assert 201 == code

        deadline = datetime.now() + timedelta(seconds=wait_timeout)
        while deadline > datetime.now():
            code, data = api_client.vm_snapshots.get(vm_snapshot_name)
            if data.get("status", {}).get("readyToUse"):
                break
            print(f"waiting for {vm_snapshot_name} to be ready")
            sleep(3)
        else:
            raise AssertionError(f"timed out waiting for {vm_snapshot_name} to be ready")

        assert 200 == code
        assert data.get("status", {}).get("readyToUse") is True

    @pytest.mark.dependency(depends=["source_vm_snapshot"])
    def test_volume_snapshot_not_exist(self, api_client):
        ''' ref: https://github.com/harvester/tests/issues/524 '''
        code, data = api_client.vol_snapshots.get()

        assert 200 == code, (code, data)
        assert not data['data'], (code, data)

    @pytest.mark.dependency(depends=["source_vm_snapshot"])
    def test_restore_into_new_vm_from_vm_snapshot(self, api_client,
                                                  restored_from_snapshot_vm,
                                                  ssh_keypair, host_shell,
                                                  vm_shell, wait_timeout):
        """
        Test that restoring the `vm-snapshot` into a new virtual
        machine results in a virtual machine with the expected
        well-known file (`test.txt`) with the expected contents
        (`123`).

        Prerequisites:
        1. The source VM from the first test case and its
           snapshot (`vm-snapshot`).
        """

        name, ssh_user = restored_from_snapshot_vm

        def actassert(sh):
            out, _ = sh.exec_command("cat test.txt")
            assert "123" in out

        vm_shell_do(name, api_client,
                    host_shell, vm_shell,
                    ssh_user, ssh_keypair,
                    actassert, wait_timeout)

    @pytest.mark.dependency(depends=["source_vm_snapshot"])
    def test_replace_is_rejected_when_deletepolicy_is_retain(self, api_client,
                                                             source_vm, vm_snapshot_name,
                                                             wait_timeout):
        name, _ = source_vm

        stop_vm(name, api_client, wait_timeout)

        """
        Test that the Harvester API rejects a `replace`
        VirtualMachineRestore where the deletePolicy is
        not `retain`.

        Prequisites:
        1. The original VM (`source-vm`) and snapshot (`vm-snapshot`)
        from the first test case.
        """
        spec = api_client.backups.RestoreSpec.for_existing(delete_volumes=True)
        code, data = api_client.vm_snapshots.restore(vm_snapshot_name, spec)

        reason = data.get("message")

        wantmsg = "Delete policy with backup type snapshot"
        " for replacing VM is not supported"

        assert wantmsg in reason
        assert 422 == code

    @pytest.mark.dependency(name="replaced_source_vm", depends=["source_vm_snapshot"])
    def test_replace_vm_with_vm_snapshot(self, api_client,
                                         source_vm, vm_snapshot_name,
                                         ssh_keypair, host_shell,
                                         vm_shell, wait_timeout):
        """
        Test that the original virtual machine can be replaced
        from its original snapshot (`vm-snapshot`) and that
        the snapshot's data contains the well-known file (`test.txt`)
        and its expected contents (`123`).

        Prerequisites:
        `vm-snapshot` VM snapshot exists.
        """
        name, ssh_user = source_vm
        start_vm(name, api_client, wait_timeout)

        def modify(sh):
            _, _ = sh.exec_command("rm -f test.txt && sync")

        vm_shell_do(name, api_client,
                    host_shell, vm_shell,
                    ssh_user, ssh_keypair,
                    modify, wait_timeout)

        # Just to wait for `sync`
        sleep(2)

        stop_vm(name, api_client, wait_timeout)

        spec = api_client.backups.RestoreSpec.for_existing(delete_volumes=False)
        code, data = api_client.vm_snapshots.restore(vm_snapshot_name, spec)

        deadline = datetime.now() + timedelta(seconds=wait_timeout)
        while deadline > datetime.now():
            code, data = api_client.vms.get(name)
            if 200 == code and "Running" == data.get("status", {}).get("printableStatus"):
                break
            print("waiting for restored vm to be running")
            sleep(3)
        assert "Running" == data.get("status", {}).get("printableStatus")

        def actassert(sh):
            out, _ = sh.exec_command("cat test.txt")
            assert "123" in out

        vm_shell_do(name, api_client,
                    host_shell, vm_shell,
                    ssh_user, ssh_keypair,
                    actassert, wait_timeout)

    @pytest.mark.dependency(name="detached_source_vm_pvc", depends=["replaced_source_vm"])
    def test_restore_from_vm_snapshot_while_pvc_detached_from_source(self,
                                                                     api_client,
                                                                     restored_vm_2,
                                                                     host_shell,
                                                                     vm_shell,
                                                                     ssh_keypair,
                                                                     wait_timeout):
        """
        Test that a new virtual machine can be created from a
        VM snapshot created from a source PersistentVolumeClaim
        that is now detached.

        Prerequisites:
        The original VM (`source-vm`) exists and is stopped (so that
        the PVC is detached.)

        The original snapshot (`vm-snapshot`) exists.
        """

        name, ssh_user = restored_vm_2

        def actassert(sh):
            out, _ = sh.exec_command("cat test.txt")
            assert "123" in out

        vm_shell_do(name, api_client,
                    host_shell, vm_shell,
                    ssh_user, ssh_keypair,
                    actassert, wait_timeout)

    @pytest.mark.dependency(name="snapshot_created_from_detached_source_vm_pvc",
                            depends=["detached_source_vm_pvc"])
    def test_create_vm_snapshot_while_pvc_detached(self, api_client,
                                                   vm_snapshot_2_name, source_vm, wait_timeout):
        """
        Test that a VM snapshot can be created when the source
        PVC is detached.

        Prerequisites:
        The original VM (`source-vm`) exists and is stopped (so that
        the PVC is detached.)
        """
        name, _ = source_vm

        stop_vm(name, api_client, wait_timeout)

        code, _ = api_client.vm_snapshots.create(name, vm_snapshot_2_name)
        assert 201 == code

        deadline = datetime.now() + timedelta(seconds=wait_timeout)
        while deadline > datetime.now():
            code, data = api_client.vm_snapshots.get(vm_snapshot_2_name)
            if data.get("status", {}).get("readyToUse"):
                break
            print(f"waiting for {vm_snapshot_2_name} to be ready")
            sleep(3)
        else:
            raise AssertionError(f"timed out waiting for {vm_snapshot_2_name} to be ready")

        code, data = api_client.vm_snapshots.get(vm_snapshot_2_name)

        assert 200 == code
        assert data.get("status", {}).get("readyToUse") is True

    @pytest.mark.dependency(name="cleaned_up_after_vm_delete")
    def test_vm_snapshots_are_cleaned_up_after_source_vm_deleted(self, api_client,
                                                                 source_vm, vm_snapshot_name,
                                                                 wait_timeout):
        """
        Test that VM snapshots are removed when the VM they correspond
        to have been deleted.

        Prerequisites:
        The original VM (`source-vm`) exists and so does its first
        snapshot (`vm-snapshot`).

        Assert that the snapshot exists, then delete the VM
        and assert that the snapshot has been removed.
        """

        code, _ = api_client.vm_snapshots.get(vm_snapshot_name)
        assert 200 == code

        name, _ = source_vm

        code, _ = api_client.vms.delete(name)
        assert 200 == code

        def wait_for_snapshot_to_disappear(snapshot):
            deadline = datetime.now() + timedelta(seconds=wait_timeout)
            while deadline > datetime.now():
                code, _ = api_client.vm_snapshots.get(snapshot)
                if code == 404:
                    return
                sleep(1)
            else:
                AssertionError(f"timeout while waiting for {snapshot}"
                               f" to be deleted after its VM was deleted")

        wait_for_snapshot_to_disappear(vm_snapshot_name)

    @pytest.mark.dependency(depends=["snapshot_created_from_detached_source_vm_pvc",
                                     "cleaned_up_after_vm_delete"])
    def test_volume_snapshots_are_cleaned_up_after_source_volume_deleted(self, api_client,
                                                                         source_vm, wait_timeout):
        """
        Test that any volume snapshots that result from taking
        a VM snapshot while the PVC is detached are cleaned up
        after the volume is deleted.

        Prerequisites:
        The volume from the original VM (`source-vm`) exists
        and is not attached because the original VM was replaced
        and the deletePolicy was `retain`.
        """

        # First, assert that the expected volume exists.
        name, _ = source_vm
        volumename = f"{name}-disk-0"

        code, _ = api_client.volumes.get(volumename)
        assert 200 == code

        # And assert that it has a volume snapshot associated with it.
        volumesnapshotname = f"vm-snapshot-volume-{volumename}"

        code, data = api_client.vol_snapshots.get(volumesnapshotname)
        assert 200 == code

        ownerpvc = data.get("spec", {}).get("source", {}).get("persistentVolumeClaimName")
        assert volumename == ownerpvc

        # Then delete the volume and wait for it to disappear.
        code, _ = api_client.volumes.delete(volumename)
        deadline = datetime.now() + timedelta(seconds=wait_timeout)
        while deadline > datetime.now():
            code, _ = api_client.volumes.get(volumename)
            if code == 404:
                break
            sleep(1)
        else:
            raise AssertionError(f"timed out waiting for {volumename} to be deleted")

        # Finally, wait for the volume snapshot to be cleaned up
        # automatically.
        code, _ = api_client.vol_snapshots.get(volumesnapshotname)
        while deadline > datetime.now():
            code, _ = api_client.vol_snapshots.get(volumesnapshotname)
            if code == 404:
                break
            sleep(1)
        else:
            raise AssertionError(f"timed out waiting for {volumesnapshotname} to be deleted")

Class variables

var pytestmark

Methods

def test_create_vm_snapshot_while_pvc_detached(self, api_client, vm_snapshot_2_name, source_vm, wait_timeout)
Expand source code
@pytest.mark.dependency(name="snapshot_created_from_detached_source_vm_pvc",
                        depends=["detached_source_vm_pvc"])
def test_create_vm_snapshot_while_pvc_detached(self, api_client,
                                               vm_snapshot_2_name, source_vm, wait_timeout):
    """
    Test that a VM snapshot can be created when the source
    PVC is detached.

    Prerequisites:
    The original VM (`source-vm`) exists and is stopped (so that
    the PVC is detached.)
    """
    name, _ = source_vm

    stop_vm(name, api_client, wait_timeout)

    code, _ = api_client.vm_snapshots.create(name, vm_snapshot_2_name)
    assert 201 == code

    deadline = datetime.now() + timedelta(seconds=wait_timeout)
    while deadline > datetime.now():
        code, data = api_client.vm_snapshots.get(vm_snapshot_2_name)
        if data.get("status", {}).get("readyToUse"):
            break
        print(f"waiting for {vm_snapshot_2_name} to be ready")
        sleep(3)
    else:
        raise AssertionError(f"timed out waiting for {vm_snapshot_2_name} to be ready")

    code, data = api_client.vm_snapshots.get(vm_snapshot_2_name)

    assert 200 == code
    assert data.get("status", {}).get("readyToUse") is True

Test that a VM snapshot can be created when the source PVC is detached.

Prerequisites: The original VM (source-vm) exists and is stopped (so that the PVC is detached.)

def test_replace_is_rejected_when_deletepolicy_is_retain(self, api_client, source_vm, vm_snapshot_name, wait_timeout)
Expand source code
@pytest.mark.dependency(depends=["source_vm_snapshot"])
def test_replace_is_rejected_when_deletepolicy_is_retain(self, api_client,
                                                         source_vm, vm_snapshot_name,
                                                         wait_timeout):
    name, _ = source_vm

    stop_vm(name, api_client, wait_timeout)

    """
    Test that the Harvester API rejects a `replace`
    VirtualMachineRestore where the deletePolicy is
    not `retain`.

    Prequisites:
    1. The original VM (`source-vm`) and snapshot (`vm-snapshot`)
    from the first test case.
    """
    spec = api_client.backups.RestoreSpec.for_existing(delete_volumes=True)
    code, data = api_client.vm_snapshots.restore(vm_snapshot_name, spec)

    reason = data.get("message")

    wantmsg = "Delete policy with backup type snapshot"
    " for replacing VM is not supported"

    assert wantmsg in reason
    assert 422 == code
def test_replace_vm_with_vm_snapshot(self,
api_client,
source_vm,
vm_snapshot_name,
ssh_keypair,
host_shell,
vm_shell,
wait_timeout)
Expand source code
@pytest.mark.dependency(name="replaced_source_vm", depends=["source_vm_snapshot"])
def test_replace_vm_with_vm_snapshot(self, api_client,
                                     source_vm, vm_snapshot_name,
                                     ssh_keypair, host_shell,
                                     vm_shell, wait_timeout):
    """
    Test that the original virtual machine can be replaced
    from its original snapshot (`vm-snapshot`) and that
    the snapshot's data contains the well-known file (`test.txt`)
    and its expected contents (`123`).

    Prerequisites:
    `vm-snapshot` VM snapshot exists.
    """
    name, ssh_user = source_vm
    start_vm(name, api_client, wait_timeout)

    def modify(sh):
        _, _ = sh.exec_command("rm -f test.txt && sync")

    vm_shell_do(name, api_client,
                host_shell, vm_shell,
                ssh_user, ssh_keypair,
                modify, wait_timeout)

    # Just to wait for `sync`
    sleep(2)

    stop_vm(name, api_client, wait_timeout)

    spec = api_client.backups.RestoreSpec.for_existing(delete_volumes=False)
    code, data = api_client.vm_snapshots.restore(vm_snapshot_name, spec)

    deadline = datetime.now() + timedelta(seconds=wait_timeout)
    while deadline > datetime.now():
        code, data = api_client.vms.get(name)
        if 200 == code and "Running" == data.get("status", {}).get("printableStatus"):
            break
        print("waiting for restored vm to be running")
        sleep(3)
    assert "Running" == data.get("status", {}).get("printableStatus")

    def actassert(sh):
        out, _ = sh.exec_command("cat test.txt")
        assert "123" in out

    vm_shell_do(name, api_client,
                host_shell, vm_shell,
                ssh_user, ssh_keypair,
                actassert, wait_timeout)

Test that the original virtual machine can be replaced from its original snapshot (vm-snapshot) and that the snapshot's data contains the well-known file (test.txt) and its expected contents (123).

Prerequisites: vm-snapshot VM snapshot exists.

def test_restore_from_vm_snapshot_while_pvc_detached_from_source(self, api_client, restored_vm_2, host_shell, vm_shell, ssh_keypair, wait_timeout)
Expand source code
@pytest.mark.dependency(name="detached_source_vm_pvc", depends=["replaced_source_vm"])
def test_restore_from_vm_snapshot_while_pvc_detached_from_source(self,
                                                                 api_client,
                                                                 restored_vm_2,
                                                                 host_shell,
                                                                 vm_shell,
                                                                 ssh_keypair,
                                                                 wait_timeout):
    """
    Test that a new virtual machine can be created from a
    VM snapshot created from a source PersistentVolumeClaim
    that is now detached.

    Prerequisites:
    The original VM (`source-vm`) exists and is stopped (so that
    the PVC is detached.)

    The original snapshot (`vm-snapshot`) exists.
    """

    name, ssh_user = restored_vm_2

    def actassert(sh):
        out, _ = sh.exec_command("cat test.txt")
        assert "123" in out

    vm_shell_do(name, api_client,
                host_shell, vm_shell,
                ssh_user, ssh_keypair,
                actassert, wait_timeout)

Test that a new virtual machine can be created from a VM snapshot created from a source PersistentVolumeClaim that is now detached.

Prerequisites: The original VM (source-vm) exists and is stopped (so that the PVC is detached.)

The original snapshot (vm-snapshot) exists.

def test_restore_into_new_vm_from_vm_snapshot(self,
api_client,
restored_from_snapshot_vm,
ssh_keypair,
host_shell,
vm_shell,
wait_timeout)
Expand source code
@pytest.mark.dependency(depends=["source_vm_snapshot"])
def test_restore_into_new_vm_from_vm_snapshot(self, api_client,
                                              restored_from_snapshot_vm,
                                              ssh_keypair, host_shell,
                                              vm_shell, wait_timeout):
    """
    Test that restoring the `vm-snapshot` into a new virtual
    machine results in a virtual machine with the expected
    well-known file (`test.txt`) with the expected contents
    (`123`).

    Prerequisites:
    1. The source VM from the first test case and its
       snapshot (`vm-snapshot`).
    """

    name, ssh_user = restored_from_snapshot_vm

    def actassert(sh):
        out, _ = sh.exec_command("cat test.txt")
        assert "123" in out

    vm_shell_do(name, api_client,
                host_shell, vm_shell,
                ssh_user, ssh_keypair,
                actassert, wait_timeout)

Test that restoring the vm-snapshot into a new virtual machine results in a virtual machine with the expected well-known file (test.txt) with the expected contents (123).

Prerequisites: 1. The source VM from the first test case and its snapshot (vm-snapshot).

def test_vm_snapshot_create(self,
api_client,
source_vm,
vm_snapshot_name,
host_shell,
vm_shell,
ssh_keypair,
wait_timeout)
Expand source code
@pytest.mark.dependency(name="source_vm_snapshot")
def test_vm_snapshot_create(self, api_client,
                            source_vm, vm_snapshot_name,
                            host_shell, vm_shell,
                            ssh_keypair, wait_timeout):
    """
    Test that the VM snapshot can be created.

    Prerequisite:
    A virtual machine has been created and is running.
    """
    name, ssh_user = source_vm

    def action(sh):
        _, _ = sh.exec_command("echo 123 > test.txt")
        _, _ = sh.exec_command("sync")

    vm_shell_do(name, api_client,
                host_shell, vm_shell,
                ssh_user, ssh_keypair,
                action, wait_timeout)

    # Since `sync` isn't actually synchronous, wait a couple of
    # seconds to let the I/O flush to disk.
    sleep(2)

    code, _ = api_client.vm_snapshots.create(name, vm_snapshot_name)
    assert 201 == code

    deadline = datetime.now() + timedelta(seconds=wait_timeout)
    while deadline > datetime.now():
        code, data = api_client.vm_snapshots.get(vm_snapshot_name)
        if data.get("status", {}).get("readyToUse"):
            break
        print(f"waiting for {vm_snapshot_name} to be ready")
        sleep(3)
    else:
        raise AssertionError(f"timed out waiting for {vm_snapshot_name} to be ready")

    assert 200 == code
    assert data.get("status", {}).get("readyToUse") is True

Test that the VM snapshot can be created.

Prerequisite: A virtual machine has been created and is running.

def test_vm_snapshots_are_cleaned_up_after_source_vm_deleted(self, api_client, source_vm, vm_snapshot_name, wait_timeout)
Expand source code
@pytest.mark.dependency(name="cleaned_up_after_vm_delete")
def test_vm_snapshots_are_cleaned_up_after_source_vm_deleted(self, api_client,
                                                             source_vm, vm_snapshot_name,
                                                             wait_timeout):
    """
    Test that VM snapshots are removed when the VM they correspond
    to have been deleted.

    Prerequisites:
    The original VM (`source-vm`) exists and so does its first
    snapshot (`vm-snapshot`).

    Assert that the snapshot exists, then delete the VM
    and assert that the snapshot has been removed.
    """

    code, _ = api_client.vm_snapshots.get(vm_snapshot_name)
    assert 200 == code

    name, _ = source_vm

    code, _ = api_client.vms.delete(name)
    assert 200 == code

    def wait_for_snapshot_to_disappear(snapshot):
        deadline = datetime.now() + timedelta(seconds=wait_timeout)
        while deadline > datetime.now():
            code, _ = api_client.vm_snapshots.get(snapshot)
            if code == 404:
                return
            sleep(1)
        else:
            AssertionError(f"timeout while waiting for {snapshot}"
                           f" to be deleted after its VM was deleted")

    wait_for_snapshot_to_disappear(vm_snapshot_name)

Test that VM snapshots are removed when the VM they correspond to have been deleted.

Prerequisites: The original VM (source-vm) exists and so does its first snapshot (vm-snapshot).

Assert that the snapshot exists, then delete the VM and assert that the snapshot has been removed.

def test_volume_snapshot_not_exist(self, api_client)
Expand source code
@pytest.mark.dependency(depends=["source_vm_snapshot"])
def test_volume_snapshot_not_exist(self, api_client):
    ''' ref: https://github.com/harvester/tests/issues/524 '''
    code, data = api_client.vol_snapshots.get()

    assert 200 == code, (code, data)
    assert not data['data'], (code, data)
def test_volume_snapshots_are_cleaned_up_after_source_volume_deleted(self, api_client, source_vm, wait_timeout)
Expand source code
@pytest.mark.dependency(depends=["snapshot_created_from_detached_source_vm_pvc",
                                 "cleaned_up_after_vm_delete"])
def test_volume_snapshots_are_cleaned_up_after_source_volume_deleted(self, api_client,
                                                                     source_vm, wait_timeout):
    """
    Test that any volume snapshots that result from taking
    a VM snapshot while the PVC is detached are cleaned up
    after the volume is deleted.

    Prerequisites:
    The volume from the original VM (`source-vm`) exists
    and is not attached because the original VM was replaced
    and the deletePolicy was `retain`.
    """

    # First, assert that the expected volume exists.
    name, _ = source_vm
    volumename = f"{name}-disk-0"

    code, _ = api_client.volumes.get(volumename)
    assert 200 == code

    # And assert that it has a volume snapshot associated with it.
    volumesnapshotname = f"vm-snapshot-volume-{volumename}"

    code, data = api_client.vol_snapshots.get(volumesnapshotname)
    assert 200 == code

    ownerpvc = data.get("spec", {}).get("source", {}).get("persistentVolumeClaimName")
    assert volumename == ownerpvc

    # Then delete the volume and wait for it to disappear.
    code, _ = api_client.volumes.delete(volumename)
    deadline = datetime.now() + timedelta(seconds=wait_timeout)
    while deadline > datetime.now():
        code, _ = api_client.volumes.get(volumename)
        if code == 404:
            break
        sleep(1)
    else:
        raise AssertionError(f"timed out waiting for {volumename} to be deleted")

    # Finally, wait for the volume snapshot to be cleaned up
    # automatically.
    code, _ = api_client.vol_snapshots.get(volumesnapshotname)
    while deadline > datetime.now():
        code, _ = api_client.vol_snapshots.get(volumesnapshotname)
        if code == 404:
            break
        sleep(1)
    else:
        raise AssertionError(f"timed out waiting for {volumesnapshotname} to be deleted")

Test that any volume snapshots that result from taking a VM snapshot while the PVC is detached are cleaned up after the volume is deleted.

Prerequisites: The volume from the original VM (source-vm) exists and is not attached because the original VM was replaced and the deletePolicy was retain.