Module harvester_e2e_tests.integrations.test_1_volumes

Functions

def test_create_volume(api_client, unique_name, ubuntu_image, create_as, source_type, polling_for)
  1. Create a volume from image
  2. Create should respond with 201
  3. Wait for volume to create
  4. Failures should be at 0
  5. Get volume metadata
  6. Volume should not be in error or transitioning state
  7. ImageId should match what was used in create
  8. Delete volume
  9. Delete volume should reply 404 after delete Ref.
def test_create_volume_bad_checksum(api_client, unique_name, ubuntu_image_bad_checksum, create_as, source_type, polling_for)
  1. Create a volume from image with a bad checksum
  2. Create should respond with 201
  3. Wait for volume to create
  4. Wait for 4 failures in the volume fail status
  5. Failures should be set at 4
  6. Delete volume
  7. Delete volume should reply 404 after delete. Ref. https://github.com/harvester/tests/issues/1121
def ubuntu_image(api_client, unique_name, image_ubuntu, polling_for)

Generates an Ubuntu image

  1. Creates an image name based on unique_name
  2. Create the image based on URL
  3. Response for creation should be 201
  4. Loop while waiting for image to be created
  5. Yield the image with the namespace and name
  6. Delete the image
  7. The response for getting the image name should be 404 after deletion
def ubuntu_image_bad_checksum(api_client, unique_name, image_ubuntu, polling_for)

Generates an Ubuntu image with a bad sha512 checksum

  1. Creates an image name based on unique_name
  2. Create the image based on URL with a bad statically assigned checksum
  3. Response for creation should be 201
  4. Loop while waiting for image to be created
  5. Yield the image with the namespace and name
  6. Delete the image
  7. The response for getting the image name should be 404 after deletion
def ubuntu_vm(api_client, unique_name, ubuntu_image, polling_for)

Classes

class TestVolumeWithVM
Expand source code
@pytest.mark.p0
@pytest.mark.volumes
class TestVolumeWithVM:
    """Checks how volume deletion behaves while the volume belongs to a VM.

    The non-test methods are small helpers that trigger a VM action through
    ``api_client.vms`` and then wait, via the ``polling_for`` fixture, until
    the expected result is observed.
    """

    def pause_vm(self, api_client, ubuntu_vm, polling_for):
        """Pause the VM and wait until its printable status is ``Paused``.

        The pause request must answer 204, otherwise the assertion fails with
        the returned code and payload.
        """
        vm_name = ubuntu_vm['metadata']['name']
        code, data = api_client.vms.pause(vm_name)
        assert 204 == code, f"Fail to pause VM\n{code}, {data}"
        polling_for("VM do paused",
                    lambda c, d: d.get('status', {}).get('printableStatus') == "Paused",
                    api_client.vms.get, vm_name)

    def stop_vm(self, api_client, ubuntu_vm, polling_for):
        """Stop the VM and wait until ``vms.get_status`` answers 404.

        The stop request must answer 204, otherwise the assertion fails with
        the returned code and payload.
        """
        vm_name = ubuntu_vm['metadata']['name']
        code, data = api_client.vms.stop(vm_name)
        assert 204 == code, f"Fail to stop VM\n{code}, {data}"
        # NOTE(review): a 404 from get_status is treated as "stopped" here,
        # presumably because the running instance no longer exists - confirm.
        polling_for("VM do stopped",
                    lambda c, d: 404 == c,
                    api_client.vms.get_status, vm_name)

    def delete_vm(self, api_client, ubuntu_vm, polling_for):
        """Delete the VM and wait until ``vms.get`` answers 404.

        The delete request must answer 200, otherwise the assertion fails
        with the returned code and payload.
        """
        vm_name = ubuntu_vm['metadata']['name']
        code, data = api_client.vms.delete(vm_name)
        assert 200 == code, f"Fail to delete VM\n{code}, {data}"
        polling_for("VM do deleted",
                    lambda c, d: 404 == c,
                    api_client.vms.get, vm_name)

    def test_delete_volume_on_existing_vm(self, api_client, ubuntu_image, ubuntu_vm, polling_for):
        """
        1. Create a VM with volume
        2. Delete volume should reply 422
        3. Pause VM
        4. Delete volume should reply 422 too
        5. Stop VM
        6. Delete volume should reply 422 too
        7. Volume should still exist, be bound and reference the source image
        Ref. https://github.com/harvester/tests/issues/905
        """
        vol_name = (ubuntu_vm["spec"]["template"]["spec"]["volumes"][0]
                             ['persistentVolumeClaim']['claimName'])

        # VM as provided by the fixture: deleting its volume must be refused.
        code, data = api_client.volumes.delete(vol_name)
        assert 422 == code, f"Should fail to delete volume\n{code}, {data}"

        # Paused VM: still refused.
        self.pause_vm(api_client, ubuntu_vm, polling_for)
        code, data = api_client.volumes.delete(vol_name)
        assert 422 == code, f"Should fail to delete volume\n{code}, {data}"

        # Stopped VM: still refused.
        self.stop_vm(api_client, ubuntu_vm, polling_for)
        code, data = api_client.volumes.delete(vol_name)
        assert 422 == code, f"Should fail to delete volume\n{code}, {data}"

        # Check Volume
        code, data = api_client.volumes.get(vol_name)
        # Verify the status code first so that an error reply is reported
        # with its code and payload instead of a KeyError from the lookups.
        assert 200 == code, (code, data)
        mdata, annotations = data['metadata'], data['metadata']['annotations']
        assert mdata['name'] == vol_name, (code, data)
        # status
        assert not mdata['state']['error'], (code, data)
        assert not mdata['state']['transitioning'], (code, data)
        assert data['status']['phase'] == "Bound", (code, data)
        # source
        assert ubuntu_image["id"] == annotations['harvesterhci.io/imageId'], (code, data)

    def test_delete_volume_on_deleted_vm(self, api_client, ubuntu_image, ubuntu_vm, polling_for):
        """
        1. Create a VM with volume
        2. Delete VM but not volume
        3. Delete volume concurrently with VM
        4. VM should be deleted
        5. Volume should be deleted
        Ref. https://github.com/harvester/tests/issues/652
        """
        vm_name = ubuntu_vm['metadata']['name']
        vol_name = (ubuntu_vm["spec"]["template"]["spec"]["volumes"][0]
                             ['persistentVolumeClaim']['claimName'])

        # Fail right away if the delete request is rejected (same expectation
        # as ``delete_vm``), but do not wait for the VM to disappear: the
        # volume delete below has to overlap with the VM deletion.
        code, data = api_client.vms.delete(vm_name)
        assert 200 == code, f"Fail to delete VM\n{code}, {data}"

        # Retry since VM is deleting
        polling_for("Delete volume",
                    lambda c, d: 200 == c,
                    api_client.volumes.delete, vol_name)

        polling_for("VM do deleted",
                    lambda c, d: 404 == c,
                    api_client.vms.get, vm_name)
        polling_for("Volume do deleted",
                    lambda c, d: 404 == c,
                    api_client.volumes.get, vol_name)

Class variables

var pytestmark

Methods

def delete_vm(self, api_client, ubuntu_vm, polling_for)
def pause_vm(self, api_client, ubuntu_vm, polling_for)
def stop_vm(self, api_client, ubuntu_vm, polling_for)
def test_delete_volume_on_deleted_vm(self, api_client, ubuntu_image, ubuntu_vm, polling_for)
  1. Create a VM with volume
  2. Delete VM but not volume
  3. Delete volume concurrently with VM
  4. VM should be deleted
  5. Volume should be deleted. Ref. https://github.com/harvester/tests/issues/652
def test_delete_volume_on_existing_vm(self, api_client, ubuntu_image, ubuntu_vm, polling_for)
  1. Create a VM with volume
  2. Delete volume should reply 422
  3. Pause VM
  4. Delete volume should reply 422 too
  5. Stop VM
  6. Delete volume should reply 422 too. Ref. https://github.com/harvester/tests/issues/905