Module harvester_e2e_tests.integrations.test_4_vm_host_cpu_pinning

Functions

def cpu_managers(api_client, wait_timeout)
Expand source code
@pytest.fixture(scope="module")
def cpu_managers(api_client, wait_timeout):
    """Record every node's CPU manager state and restore it after the module.

    Yields a ``SimpleNamespace`` with:

    - ``nodes``: names of all nodes returned by ``api_client.hosts.get()``
    - ``origin``: mapping of node name to its original state (``bool``)
    - ``modified``: initially empty list for tests to record their changes
    - ``enabled`` / ``disabled``: node names partitioned by original state
    - ``wait_applied`` / ``wait_job_completed``: the polling helpers below

    On teardown, every node whose current ``cpumanager`` label differs from
    the recorded original state is switched back and waited for.
    """
    def wait_job_completed():
        """Poll every 3 seconds until no ``cpu-manager`` job is active.

        Returns silently once ``wait_timeout`` seconds have elapsed.
        """
        url = "/apis/batch/v1/namespaces/harvester-system/jobs"
        endtime = datetime.now() + timedelta(seconds=wait_timeout)
        while endtime > datetime.now():
            waiting = [x for x in api_client._get(url).json()['items']
                       if 'cpu-manager' in x['metadata']['name'] and x['status'].get('active')]
            if not waiting:
                break
            sleep(3)

    def wait_applied(node, status):
        """Poll until the ``cpumanager`` label of *node* equals *status*.

        :param node: node name passed to ``api_client.hosts.get``
        :param status: expected label value as a string, ``'true'`` or ``'false'``
        :raises AssertionError: the label did not match within ``wait_timeout``
        """
        # Keep the last observed label defined even if no request ever
        # succeeded, so the timeout message below cannot fail on its own.
        label = None
        endtime = datetime.now() + timedelta(seconds=wait_timeout)
        while endtime > datetime.now():
            code, data = api_client.hosts.get(node)
            if 200 == code:
                label = data['metadata']['labels'].get('cpumanager')
                if status == label:
                    wait_job_completed()
                    break
            sleep(3)
        else:
            raise AssertionError(
                f"Fail to modify CPU manager on node {node} with {wait_timeout} timed out\n"
                f"The node {node} still labeling cpumanager: {label}, expected to {status}"
            )

    origin, enabled, disabled = dict(), [], []
    code, data = api_client.hosts.get()
    for node in data['data']:
        name = node['metadata']['name']
        # The label is the string 'true'/'false'; json.loads turns it into a bool.
        status = json.loads(node['metadata']['labels'].get("cpumanager", 'false'))
        origin[name] = status
        if status:
            enabled.append(name)
        else:
            disabled.append(name)

    state = SimpleNamespace(
        nodes=list(origin),
        origin=origin,
        modified=list(),
        enabled=enabled,
        disabled=disabled,
        wait_applied=wait_applied,
        wait_job_completed=wait_job_completed
    )
    yield state

    # Revert the change
    code, data = api_client.hosts.get()
    for node in data['data']:
        name = node['metadata']['name']
        if name not in origin:
            # Node was not present when the snapshot was taken; nothing to restore.
            continue
        status = json.loads(node['metadata']['labels'].get("cpumanager", 'false'))
        if status != origin[name]:
            # Actually request the original state, then wait for the label;
            # wait_applied compares against the string form of the label.
            code, data = api_client.hosts.cpu_manager(name, enable=origin[name])
            assert 204 == code, (code, data)
            wait_applied(name, 'true' if origin[name] else 'false')
def ubuntu_image(api_client, unique_name, image_ubuntu, image_checker)
Expand source code
@pytest.fixture(scope="module")
def ubuntu_image(api_client, unique_name, image_ubuntu, image_checker):
    """Create an Ubuntu image for the module and delete it afterwards.

    Yields a ``SimpleNamespace`` carrying the image ``name``, its
    ``<namespace>/<name>`` ``id`` and the ``ssh_user`` of ``image_ubuntu``.
    """
    name = f"{image_ubuntu.name}-{unique_name}"

    # setup: create the image from its URL and wait for the download
    rc, resp = api_client.images.create_by_url(name, image_ubuntu.url)
    assert 201 == rc, (rc, resp)

    downloaded, (rc, resp) = image_checker.wait_downloaded(name)
    assert downloaded, (rc, resp)

    metadata = resp['metadata']
    namespace = metadata['namespace']
    assert name == metadata['name'], resp

    image = SimpleNamespace(
        name=name,
        id=f"{namespace}/{name}",
        ssh_user=image_ubuntu.ssh_user
    )
    yield image

    # teardown: delete the image and wait until it is gone
    rc, resp = api_client.images.delete(name, namespace)
    assert 200 == rc, (rc, resp)
    deleted, (rc, resp) = image_checker.wait_deleted(name)
    assert deleted, (rc, resp)

Classes

class TestCPUManager
Expand source code
@pytest.mark.p0
@pytest.mark.hosts
@pytest.mark.smoke
class TestCPUManager:
    """ Ref: https://github.com/harvester/harvester/issues/2305

    Enables and disables CPU manager on one node and checks that repeating
    either request is rejected.
    """

    @pytest.mark.dependency(name="enable_manager")
    def test_enable(self, api_client, cpu_managers, wait_timeout):
        """Enable CPU manager on the last node that originally had it disabled."""
        originally_disabled = (
            n for n in reversed(cpu_managers.nodes) if not cpu_managers.origin[n]
        )
        node = next(originally_disabled, None)
        if node is None:
            raise AssertionError("CPU manager has been enabled on every nodes")

        rc, resp = api_client.hosts.cpu_manager(node, enable=True)
        assert 204 == rc, (rc, resp)
        # Remember what was changed so the following tests can pick it up.
        cpu_managers.modified.append((node, 'true'))

        cpu_managers.wait_applied(node, 'true')

    @pytest.mark.negative
    @pytest.mark.dependency(depends=["enable_manager"])
    def test_enable_again(self, api_client, cpu_managers):
        """Enabling an already enabled node must be rejected with 400."""
        node, modified = cpu_managers.modified[-1]

        rc, resp = api_client.hosts.get(node)
        assert 200 == rc, (rc, resp)
        labels = resp['metadata']['labels']
        status = labels.get('cpumanager', 'false')
        assert status == modified, (
            f"The node {node} was modified cpumanager to {modified}, "
            f"but it's showing {status} now."
        )

        rc, resp = api_client.hosts.cpu_manager(node, enable=True)
        rejected = 400 == rc and "policy is already the same" in resp
        assert rejected, (rc, resp)
        cpu_managers.wait_job_completed()

    @pytest.mark.dependency(name="disable_manager", depends=["enable_manager"])
    def test_disable(self, api_client, cpu_managers, wait_timeout):
        """Disable CPU manager on the node most recently recorded as modified."""
        node, modified = cpu_managers.modified.pop()

        rc, resp = api_client.hosts.get(node)
        assert 200 == rc, (rc, resp)
        labels = resp['metadata']['labels']
        status = labels.get('cpumanager', 'false')
        assert status == modified, (
            f"The node {node} was modified cpumanager to {modified}, "
            f"but it's showing {status} now."
        )

        rc, resp = api_client.hosts.cpu_manager(node, enable=False)
        assert 204 == rc, (rc, resp)

        cpu_managers.wait_applied(node, 'false')

    @pytest.mark.negative
    def test_disable_again(self, api_client, cpu_managers):
        """Disabling an originally disabled node must be rejected with 400."""
        originally_disabled = (
            n for n in reversed(cpu_managers.nodes) if not cpu_managers.origin[n]
        )
        node = next(originally_disabled, None)
        if node is None:
            raise AssertionError("CPU manager has been enabled on every nodes")

        rc, resp = api_client.hosts.cpu_manager(node, enable=False)
        rejected = 400 == rc and "policy is already the same" in resp
        assert rejected, (rc, resp)
        cpu_managers.wait_job_completed()

Class variables

var pytestmark

Methods

def test_disable(self, api_client, cpu_managers, wait_timeout)
Expand source code
@pytest.mark.dependency(name="disable_manager", depends=["enable_manager"])
def test_disable(self, api_client, cpu_managers, wait_timeout):
    """Disable CPU manager on the node most recently recorded as modified."""
    node, modified = cpu_managers.modified.pop()

    # The node must still carry the label that was recorded for it.
    rc, resp = api_client.hosts.get(node)
    assert 200 == rc, (rc, resp)
    labels = resp['metadata']['labels']
    status = labels.get('cpumanager', 'false')
    assert status == modified, (
        f"The node {node} was modified cpumanager to {modified}, "
        f"but it's showing {status} now."
    )

    rc, resp = api_client.hosts.cpu_manager(node, enable=False)
    assert 204 == rc, (rc, resp)

    cpu_managers.wait_applied(node, 'false')
def test_disable_again(self, api_client, cpu_managers)
Expand source code
@pytest.mark.negative
def test_disable_again(self, api_client, cpu_managers):
    """Disabling an originally disabled node must be rejected with 400."""
    originally_disabled = (
        n for n in reversed(cpu_managers.nodes) if not cpu_managers.origin[n]
    )
    node = next(originally_disabled, None)
    if node is None:
        raise AssertionError("CPU manager has been enabled on every nodes")

    rc, resp = api_client.hosts.cpu_manager(node, enable=False)
    rejected = 400 == rc and "policy is already the same" in resp
    assert rejected, (rc, resp)
    cpu_managers.wait_job_completed()
def test_enable(self, api_client, cpu_managers, wait_timeout)
Expand source code
@pytest.mark.dependency(name="enable_manager")
def test_enable(self, api_client, cpu_managers, wait_timeout):
    """Enable CPU manager on the last node that originally had it disabled."""
    originally_disabled = (
        n for n in reversed(cpu_managers.nodes) if not cpu_managers.origin[n]
    )
    node = next(originally_disabled, None)
    if node is None:
        raise AssertionError("CPU manager has been enabled on every nodes")

    rc, resp = api_client.hosts.cpu_manager(node, enable=True)
    assert 204 == rc, (rc, resp)
    # Remember what was changed so the following tests can pick it up.
    cpu_managers.modified.append((node, 'true'))

    cpu_managers.wait_applied(node, 'true')
def test_enable_again(self, api_client, cpu_managers)
Expand source code
@pytest.mark.negative
@pytest.mark.dependency(depends=["enable_manager"])
def test_enable_again(self, api_client, cpu_managers):
    """Enabling an already enabled node must be rejected with 400."""
    node, modified = cpu_managers.modified[-1]

    # The node must still carry the label that was recorded for it.
    rc, resp = api_client.hosts.get(node)
    assert 200 == rc, (rc, resp)
    labels = resp['metadata']['labels']
    status = labels.get('cpumanager', 'false')
    assert status == modified, (
        f"The node {node} was modified cpumanager to {modified}, "
        f"but it's showing {status} now."
    )

    rc, resp = api_client.hosts.cpu_manager(node, enable=True)
    rejected = 400 == rc and "policy is already the same" in resp
    assert rejected, (rc, resp)
    cpu_managers.wait_job_completed()
class TestPinCPUonVM
Expand source code
@pytest.mark.p0
@pytest.mark.hosts
@pytest.mark.virtualmachines
@pytest.mark.smoke
class TestPinCPUonVM:
    """ Ref: https://github.com/harvester/harvester/issues/2305

    Creates a VM with CPU pinning and checks it after a restart, a migration
    and an attempt to disable CPU manager on the node hosting it.
    """

    @pytest.mark.dependency(name="enable_cpu_managers")
    def test_enable_cpu_mangers(self, api_client, wait_timeout, cpu_managers):
        """Adjust which nodes have CPU manager enabled before the VM tests.

        Every change is recorded in ``cpu_managers.modified`` as
        ``(node, label)`` where ``label`` is the ``'true'``/``'false'`` string
        that was applied, matching the entries TestCPUManager records.
        """
        def switch(enable):
            # Take one node from the pool it currently belongs to, request the
            # change, record it and wait until the node label reflects it.
            if enable:
                source, target = cpu_managers.disabled, cpu_managers.enabled
            else:
                source, target = cpu_managers.enabled, cpu_managers.disabled
            label = 'true' if enable else 'false'

            node = source.pop()
            code, data = api_client.hosts.cpu_manager(node, enable=enable)
            assert 204 == code, (code, data)
            cpu_managers.modified.append((node, label))
            target.append(node)
            cpu_managers.wait_applied(node, label)

        # scenario: 2+ nodes, all enabled, need to disable one of the nodes
        if len(cpu_managers.enabled) == len(cpu_managers.nodes) > 1:
            switch(enable=False)

        # scenario: 3+ nodes, need to enable nodes until 2 are enabled
        if len(cpu_managers.enabled) < 2 < len(cpu_managers.nodes):
            for _ in range(2 - len(cpu_managers.enabled)):
                switch(enable=True)

        # scenario: single node (or every node still disabled)
        if len(cpu_managers.disabled) == len(cpu_managers.nodes):
            switch(enable=True)

    @pytest.mark.dependency(name="pin_cpu_on_vm", depends=["enable_cpu_managers"])
    def test_pin_cpu_on_vm(self, api_client, unique_name, vm_checker, ubuntu_image):
        """Create a VM with ``cpu_pinning`` set and check the started VMI."""
        cpu, mem, unique_vm_name = 1, 2, f"pin-cpu-{unique_name}"
        vm = api_client.vms.Spec(cpu, mem)
        vm.add_image("disk-0", ubuntu_image.id)
        vm.cpu_pinning = True

        code, data = api_client.vms.create(unique_vm_name, vm)
        assert 201 == code, (code, data)
        vm_started, (code, vmi) = vm_checker.wait_started(unique_vm_name)
        assert vm_started, (code, vmi)

        assert vmi['spec']['domain']['cpu'].get('dedicatedCpuPlacement') is True, (
            f"The VM {unique_vm_name} started but CPU not pinned."
        )

    @pytest.mark.dependency(depends=["pin_cpu_on_vm"])
    def test_reboot_vm_with_cpu_pinned(self, api_client, unique_name, vm_checker):
        """Check the VMI is still pinned once ``wait_restarted`` reports success."""
        unique_vm_name = f"pin-cpu-{unique_name}"

        vm_started, (code, vmi) = vm_checker.wait_restarted(unique_vm_name)
        assert vm_started, (code, vmi)
        assert vmi['spec']['domain']['cpu'].get('dedicatedCpuPlacement') is True, (
            f"The VM {unique_vm_name} started but CPU not pinned."
        )

    @pytest.mark.dependency(depends=["pin_cpu_on_vm"])
    def test_migrate_vm_with_cpu_pinned(self, api_client, unique_name, vm_checker, cpu_managers):
        """Migrate the pinned VM to another node that has CPU manager enabled."""
        assert len(cpu_managers.nodes) >= len(cpu_managers.enabled) >= 2, (
            "No enough nodes enabled cpu manager for migration"
        )
        unique_vm_name = f"pin-cpu-{unique_name}"
        code, data = api_client.vms.get_status(unique_vm_name)
        host = data['status']['nodeName']

        # Any enabled node other than the current host is a valid target.
        new_target = next(n for n in cpu_managers.enabled if n != host)
        vm_migrated, (code, data) = vm_checker.wait_migrated(unique_vm_name, new_target)
        assert vm_migrated, (
            f"Failed to Migrate VM({unique_vm_name}) from {host} to {new_target}\n"
            f"API Status({code}): {data}"
        )

    @pytest.mark.negative
    @pytest.mark.dependency(depends=["pin_cpu_on_vm"])
    def test_disable_cpu_manager_when_vm_on_it(self, api_client, unique_name, wait_timeout):
        """Disabling CPU manager on the node hosting the VM must return 400.

        The VM and its volumes are removed afterwards whether or not the
        check passes, so a failure here does not leave them behind.
        """
        unique_vm_name = f"pin-cpu-{unique_name}"

        try:
            code, data = api_client.vms.get_status(unique_vm_name)
            host = data['status']['nodeName']

            code, data = api_client.hosts.cpu_manager(host, enable=False)
            assert 400 == code and "not be any running VM" in data, (code, data)
        finally:
            # side-effect: teardown the VM
            code, data = api_client.vms.get(unique_vm_name)
            vm_spec = api_client.vms.Spec.from_dict(data)
            api_client.vms.delete(unique_vm_name)
            # Wait for the VM to disappear before removing its volumes.
            endtime = datetime.now() + timedelta(seconds=wait_timeout)
            while endtime > datetime.now():
                code, data = api_client.vms.get_status(unique_vm_name)
                if 404 == code:
                    break
                sleep(3)
            for vol in vm_spec.volumes:
                vol_name = vol['volume']['persistentVolumeClaim']['claimName']
                api_client.volumes.delete(vol_name)

Class variables

var pytestmark

Methods

def test_disable_cpu_manager_when_vm_on_it(self, api_client, unique_name, wait_timeout)
Expand source code
@pytest.mark.negative
@pytest.mark.dependency(depends=["pin_cpu_on_vm"])
def test_disable_cpu_manager_when_vm_on_it(self, api_client, unique_name, wait_timeout):
    """Disabling CPU manager on the node hosting the VM must return 400.

    The VM and its volumes are removed afterwards whether or not the check
    passes, so a failure here does not leave them behind.
    """
    unique_vm_name = f"pin-cpu-{unique_name}"

    try:
        code, data = api_client.vms.get_status(unique_vm_name)
        host = data['status']['nodeName']

        code, data = api_client.hosts.cpu_manager(host, enable=False)
        assert 400 == code and "not be any running VM" in data, (code, data)
    finally:
        # side-effect: teardown the VM
        code, data = api_client.vms.get(unique_vm_name)
        vm_spec = api_client.vms.Spec.from_dict(data)
        api_client.vms.delete(unique_vm_name)
        # Wait for the VM to disappear before removing its volumes.
        endtime = datetime.now() + timedelta(seconds=wait_timeout)
        while endtime > datetime.now():
            code, data = api_client.vms.get_status(unique_vm_name)
            if 404 == code:
                break
            sleep(3)
        for vol in vm_spec.volumes:
            vol_name = vol['volume']['persistentVolumeClaim']['claimName']
            api_client.volumes.delete(vol_name)
def test_enable_cpu_mangers(self, api_client, wait_timeout, cpu_managers)
Expand source code
@pytest.mark.dependency(name="enable_cpu_managers")
def test_enable_cpu_mangers(self, api_client, wait_timeout, cpu_managers):
    """Adjust which nodes have CPU manager enabled before the VM tests.

    Every change is recorded in ``cpu_managers.modified`` as
    ``(node, label)`` where ``label`` is the ``'true'``/``'false'`` string
    that was applied, matching the entries TestCPUManager records.
    """
    def switch(enable):
        # Take one node from the pool it currently belongs to, request the
        # change, record it and wait until the node label reflects it.
        if enable:
            source, target = cpu_managers.disabled, cpu_managers.enabled
        else:
            source, target = cpu_managers.enabled, cpu_managers.disabled
        label = 'true' if enable else 'false'

        node = source.pop()
        code, data = api_client.hosts.cpu_manager(node, enable=enable)
        assert 204 == code, (code, data)
        cpu_managers.modified.append((node, label))
        target.append(node)
        cpu_managers.wait_applied(node, label)

    # scenario: 2+ nodes, all enabled, need to disable one of the nodes
    if len(cpu_managers.enabled) == len(cpu_managers.nodes) > 1:
        switch(enable=False)

    # scenario: 3+ nodes, need to enable nodes until 2 are enabled
    if len(cpu_managers.enabled) < 2 < len(cpu_managers.nodes):
        for _ in range(2 - len(cpu_managers.enabled)):
            switch(enable=True)

    # scenario: single node (or every node still disabled)
    if len(cpu_managers.disabled) == len(cpu_managers.nodes):
        switch(enable=True)
def test_migrate_vm_with_cpu_pinned(self, api_client, unique_name, vm_checker, cpu_managers)
Expand source code
@pytest.mark.dependency(depends=["pin_cpu_on_vm"])
def test_migrate_vm_with_cpu_pinned(self, api_client, unique_name, vm_checker, cpu_managers):
    """Migrate the pinned VM to another node that has CPU manager enabled."""
    enabled_nodes = cpu_managers.enabled
    assert len(cpu_managers.nodes) >= len(enabled_nodes) >= 2, (
        "No enough nodes enabled cpu manager for migration"
    )

    unique_vm_name = f"pin-cpu-{unique_name}"
    _, vm_status = api_client.vms.get_status(unique_vm_name)
    host = vm_status['status']['nodeName']

    # Any enabled node other than the current host is a valid target.
    new_target = next(filter(lambda n: n != host, enabled_nodes))
    vm_migrated, (code, data) = vm_checker.wait_migrated(unique_vm_name, new_target)
    assert vm_migrated, (
        f"Failed to Migrate VM({unique_vm_name}) from {host} to {new_target}\n"
        f"API Status({code}): {data}"
    )
def test_pin_cpu_on_vm(self, api_client, unique_name, vm_checker, ubuntu_image)
Expand source code
@pytest.mark.dependency(name="pin_cpu_on_vm", depends=["enable_cpu_managers"])
def test_pin_cpu_on_vm(self, api_client, unique_name, vm_checker, ubuntu_image):
    """Create a VM with ``cpu_pinning`` set and check the started VMI."""
    unique_vm_name = f"pin-cpu-{unique_name}"

    # Spec arguments are (cpu, mem), as in the original call.
    spec = api_client.vms.Spec(1, 2)
    spec.add_image("disk-0", ubuntu_image.id)
    spec.cpu_pinning = True

    rc, resp = api_client.vms.create(unique_vm_name, spec)
    assert 201 == rc, (rc, resp)
    started, (rc, vmi) = vm_checker.wait_started(unique_vm_name)
    assert started, (rc, vmi)

    cpu_domain = vmi['spec']['domain']['cpu']
    assert cpu_domain.get('dedicatedCpuPlacement') is True, (
        f"The VM {unique_vm_name} started but CPU not pinned."
    )
def test_reboot_vm_with_cpu_pinned(self, api_client, unique_name, vm_checker)
Expand source code
@pytest.mark.dependency(depends=["pin_cpu_on_vm"])
def test_reboot_vm_with_cpu_pinned(self, api_client, unique_name, vm_checker):
    """Check the VMI is still pinned once ``wait_restarted`` reports success."""
    unique_vm_name = f"pin-cpu-{unique_name}"

    restarted, (rc, vmi) = vm_checker.wait_restarted(unique_vm_name)
    assert restarted, (rc, vmi)

    cpu_domain = vmi['spec']['domain']['cpu']
    assert cpu_domain.get('dedicatedCpuPlacement') is True, (
        f"The VM {unique_vm_name} started but CPU not pinned."
    )