Module harvester_e2e_tests.integrations.test_5_vm_networks

Functions

def cluster_network(vlan_nic, api_client, unique_name)
Expand source code
@pytest.fixture(scope='module')
def cluster_network(vlan_nic, api_client, unique_name):
    """Yield the name of a cluster network that uses ``vlan_nic`` on every node.

    An existing cluster network is reused when the nodes matched by its VLAN
    configs (those whose uplink NICs include ``vlan_nic``) are exactly the set
    of all hosts.  Otherwise a new cluster network is created together with one
    VLAN config per host, and both are deleted again on teardown.

    Yields:
        Name of the cluster network.

    Raises:
        AssertionError: when an API call returns an unexpected status code, or
            when ``vlan_nic`` is already configured on some hosts but no single
            cluster network matches the full set of hosts.
    """
    code, data = api_client.clusternetworks.get_config()
    assert 200 == code, (code, data)

    node_key = 'network.harvesterhci.io/matched-nodes'
    cnet_nodes = dict()  # cluster network name -> nodes matched by configs using vlan_nic
    for cfg in data['items']:
        if vlan_nic in cfg['spec']['uplink']['nics']:
            nodes = json.loads(cfg['metadata']['annotations'][node_key])
            cnet_nodes.setdefault(cfg['spec']['clusterNetwork'], []).extend(nodes)

    code, data = api_client.hosts.get()
    assert 200 == code, (code, data)
    all_nodes = set(n['id'] for n in data['data'])

    # vlan_nic already configured on a cluster network covering every node, reuse it
    reusable = next(
        (cnet for cnet, nodes in cnet_nodes.items() if all_nodes == set(nodes)), None
    )
    if reusable is not None:
        yield reusable
        return  # nothing was created here, so there is nothing to tear down

    configured_nodes = [node for nodes in cnet_nodes.values() for node in nodes]
    if any(n in configured_nodes for n in all_nodes):
        raise AssertionError(
            f"Not all nodes' VLAN NIC {vlan_nic} are available.\n"
            f"VLAN NIC configured nodes: {configured_nodes}\n"
            f"All nodes: {all_nodes}\n"
        )

    # Create cluster network
    cnet = f"cnet-{datetime.strptime(unique_name, '%Hh%Mm%Ss%f-%m-%d').strftime('%H%M%S')}"
    created = []
    code, data = api_client.clusternetworks.create(cnet)
    assert 201 == code, (code, data)
    # one VLAN config per node, named after the node
    while all_nodes:
        node = all_nodes.pop()
        code, data = api_client.clusternetworks.create_config(node, cnet, vlan_nic, hostname=node)
        assert 201 == code, (
            f"Failed to create cluster config for {node}\n"
            f"Created: {created}\t Remaining: {all_nodes}\n"
            f"API Status({code}): {data}"
        )
        created.append(node)

    yield cnet

    # Teardown: try to delete every VLAN config first, then report all failures at once
    deleted = {name: api_client.clusternetworks.delete_config(name) for name in created}
    failed = [(name, code, data) for name, (code, data) in deleted.items() if 200 != code]
    if failed:
        fmt = "Unable to delete VLAN Config {} with error ({}): {}"
        raise AssertionError(
            "\n".join(fmt.format(name, code, data) for (name, code, data) in failed)
        )

    code, data = api_client.clusternetworks.delete(cnet)
    assert 200 == code, (code, data)
def gen_ifconfig()
Expand source code
@pytest.fixture(scope="session")
def gen_ifconfig():
    """Return a factory building a cloud-init ``physical`` NIC entry using DHCP.

    The factory takes an interface name and an offset ``idx``; every
    ``eth<N>``/``eno<N>``/``ens<N>`` or ``enp<N>s<M>`` occurrence in the name has
    ``idx`` added to ``<N>``.
    """
    # two alternatives: eth/eno/ens<N>, or enp<N>s<M>
    ifname_pattern = r"(?:(e(?:th|no|ns))(\d+)|(enp)(\d+)(s\d+))"

    def generate_ifconfig(ifname, idx=0):
        def _shift(match):
            prefix, number, pci_prefix, pci_number, slot = match.groups()
            if slot:
                # enp<N>s<M> form: bump <N>, keep the s<M> tail as is
                return f"{pci_prefix}{int(pci_number)+idx}{slot}"
            return f"{prefix}{int(number)+idx}"

        renamed = re.sub(ifname_pattern, _shift, ifname)
        return {
            "type": "physical",
            "name": renamed,
            "subnets": [{"type": "dhcp"}],
        }

    return generate_ifconfig
def image(api_client, image_opensuse, unique_name, wait_timeout)
Expand source code
@pytest.fixture(scope="module")
def image(api_client, image_opensuse, unique_name, wait_timeout):
    """Create an image from ``image_opensuse.url`` and wait until its progress reaches 100.

    Yields a dict with ``id`` ("<namespace>/<image id>") and ``user`` (the
    image's SSH user).  The image is deleted on teardown.

    Raises:
        AssertionError: when creation is rejected or the image is not ready
            within ``wait_timeout`` seconds.
    """
    unique_image_id = f'image-{unique_name}'
    source_url = image_opensuse.url
    display_name = f"{unique_name}-{image_opensuse.name}"
    code, data = api_client.images.create_by_url(
        unique_image_id, source_url, display_name=display_name
    )
    assert 201 == code, (code, data)

    # poll every 3 seconds until the reported progress is 100 or the deadline passes
    deadline = datetime.now() + timedelta(seconds=wait_timeout)
    while datetime.now() < deadline:
        code, data = api_client.images.get(unique_image_id)
        progress = data.get('status', {}).get('progress', 0)
        if progress == 100:
            break
        sleep(3)
    else:
        raise AssertionError(
            "Failed to create Image with error:\n"
            f"Status({code}): {data}"
        )

    yield {
        "id": f"{data['metadata']['namespace']}/{unique_image_id}",
        "user": image_opensuse.ssh_user,
    }

    # teardown: the deletion result is not checked
    _code, _data = api_client.images.delete(unique_image_id)
def minimal_vm(api_client, ssh_keypair, wait_timeout, unique_name, vm_checker, image)
Expand source code
@pytest.fixture(scope="class")
def minimal_vm(api_client, ssh_keypair, wait_timeout, unique_name, vm_checker, image):
    """Create a 1 CPU / 2 memory VM booting from ``image`` with the SSH public key injected.

    Yields ``(vm name, ssh user)``.  On teardown the VM is deleted and then
    every volume listed in its spec is deleted as well.
    """
    timestamp = datetime.now().strftime('%m%S%f')
    unique_vm_name = f"{timestamp}-{unique_name}"
    pub_key, _ = ssh_keypair
    vm_spec = api_client.vms.Spec(1, 2)
    vm_spec.add_image("disk-0", image['id'])

    # inject the public key into the default cloud-init user data
    cloud_config = yaml.safe_load(vm_spec.user_data)
    cloud_config['ssh_authorized_keys'] = [pub_key]
    vm_spec.user_data = yaml.dump(cloud_config)

    code, data = api_client.vms.create(unique_vm_name, vm_spec)
    assert 201 == code, (code, data)
    # wait for the interfaces; the outcome is not checked here
    _started, (_code, _data) = vm_checker.wait_interfaces(unique_vm_name)

    yield unique_vm_name, image['user']

    # read the current spec before deletion so its volumes can be removed afterwards
    code, data = api_client.vms.get(unique_vm_name)
    current_spec = api_client.vms.Spec.from_dict(data)
    _deleted, (code, data) = vm_checker.wait_deleted(unique_vm_name)
    for volume in current_spec.volumes:
        claim_name = volume['volume']['persistentVolumeClaim']['claimName']
        api_client.volumes.delete(claim_name)
def two_mirror_vms(api_client, ssh_keypair, unique_name, vm_checker, image, vm_network)
Expand source code
@pytest.fixture(scope="class")
def two_mirror_vms(api_client, ssh_keypair, unique_name, vm_checker, image, vm_network):
    """Create two VMs from one spec whose only NIC ('nic-1') is attached to ``vm_network``.

    Yields ``(list of the two VM names, ssh user)``.  Both VMs are deleted on
    teardown with ``removedDisks="disk-0"``.
    """
    pub_key, _ = ssh_keypair
    vm_spec = api_client.vms.Spec(1, 2)
    vm_spec.add_image("disk-0", image['id'])
    # no management network: the VLAN network below is the only one attached
    vm_spec.mgmt_network = False
    vlan_network = f"{vm_network['namespace']}/{vm_network['name']}"
    vm_spec.add_network('nic-1', vlan_network)

    cloud_config = yaml.safe_load(vm_spec.user_data)
    cloud_config['ssh_authorized_keys'] = [pub_key]
    vm_spec.user_data = yaml.dump(cloud_config)
    vm_names = [f"vm{number}-{unique_name}" for number in (1, 2)]

    for name in vm_names:
        code, data = api_client.vms.create(name, vm_spec)
        assert 201 == code, (code, data)

    yield vm_names, image['user']

    delete_params = {"removedDisks": "disk-0", "propagationPolicy": "Foreground"}
    for name in vm_names:
        vm_checker.wait_deleted(name, params=delete_params)
def vm_network(api_client, unique_name, wait_timeout, cluster_network, vlan_id)
Expand source code
@pytest.fixture(scope="module")
def vm_network(api_client, unique_name, wait_timeout, cluster_network, vlan_id):
    """Create a VM network on ``cluster_network`` and wait for its route information.

    Yields a dict with the network's ``name``, ``cidr`` (taken from the
    ``network.harvesterhci.io/route`` annotation) and ``namespace``.  On
    teardown the network is deleted and polled until the API answers 404.

    Raises:
        AssertionError: when creation is rejected, when no route CIDR shows up
            within ``wait_timeout`` seconds, or when the network still exists
            ``wait_timeout`` seconds after deletion was requested.
    """
    route_key = 'network.harvesterhci.io/route'
    code, data = api_client.networks.create(
        unique_name, vlan_id, cluster_network=cluster_network
    )
    assert 201 == code, (code, data)
    endtime = datetime.now() + timedelta(seconds=wait_timeout)
    while endtime > datetime.now():
        code, data = api_client.networks.get(unique_name)
        # only look into the payload of a successful response; an error body
        # is not guaranteed to carry 'metadata'
        if 200 == code:
            annotations = data['metadata'].get('annotations') or {}
            if annotations.get(route_key):
                route = json.loads(annotations[route_key])
                if route.get('cidr'):
                    break
        sleep(3)
    else:
        raise AssertionError(
            "VM network created but route info not available\n"
            f"API Status({code}): {data}"
        )

    yield dict(name=unique_name, cidr=route['cidr'], namespace=data['metadata']['namespace'])

    code, data = api_client.networks.delete(unique_name)
    endtime = datetime.now() + timedelta(seconds=wait_timeout)
    while endtime > datetime.now():
        code, data = api_client.networks.get(unique_name)
        if 404 == code:
            break
        sleep(3)
    else:
        raise AssertionError(
            f"Failed to remove VM network {unique_name} after {wait_timeout}s\n"
            f"API Status({code}): {data}"
        )

Classes

class TestVMNetwork
Expand source code
@pytest.mark.p0
@pytest.mark.networks
@pytest.mark.virtualmachines
class TestVMNetwork:
    """Integration tests for VMs attached to a VLAN-backed VM network."""

    @pytest.mark.dependency(name="add_vlan")
    def test_add_vlan(
        self, api_client, ssh_keypair, vm_mgmt_static, vm_checker, vm_shell_from_host, vm_network,
        minimal_vm, gen_ifconfig
    ):
        """Add a VLAN NIC to a running VM and check it obtains an IP inside the VLAN CIDR.

        Steps: clean cloud-init inside the guest, add NIC 'nic-1' on ``vm_network``
        plus network data (``vm_mgmt_static`` subnet for the first NIC, DHCP for
        the second), wait for the restart, then compare the VLAN IP reported by
        the API with the CIDR and with the addresses seen inside the guest.
        """
        # clean cloud-init for rerun, and get the correct ifname
        (unique_vm_name, ssh_user), (_, pri_key) = minimal_vm, ssh_keypair
        vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(unique_vm_name, ['default'])
        assert vm_got_ips, (
            f"Failed to Start VM({unique_vm_name}) with errors:\n"
            f"Status: {data.get('status')}\n"
            f"API Status({code}): {data}"
        )
        vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces']
                     if iface['name'] == 'default')
        code, data = api_client.hosts.get(data['status']['nodeName'])
        assert 200 == code, (code, data)
        host_ip = next(addr['address'] for addr in data['status']['addresses']
                       if addr['type'] == 'InternalIP')
        with vm_shell_from_host(host_ip, vm_ip, ssh_user, pkey=pri_key) as sh:
            cloud_inited, (out, err) = vm_checker.wait_cloudinit_done(sh)
            assert cloud_inited, (out, err)
            out, err = sh.exec_command("sudo cloud-init clean")
            out, err = sh.exec_command("sudo cloud-init status")
            assert "not run" in out, (out, err)
            out, err = sh.exec_command("ip --json a s")
            assert not err, (out, err)
        ifname = next(i['ifname'] for i in json.loads(out) if i['link_type'] != 'loopback')
        # https://cloudinit.readthedocs.io/en/22.4.2/topics/network-config-format-v1.html#subnet-ip
        # and https://harvesterhci.io/kb/multiple-nics-vm-connectivity/#cloud-init-config
        nic_config = [gen_ifconfig(ifname, idx=i) for i in range(2)]
        nic_config[0]['subnets'] = [vm_mgmt_static]

        # add vlan NIC and network data then restart VM
        code, data = api_client.vms.get(unique_vm_name)
        assert 200 == code, (code, data)
        vm_spec = api_client.vms.Spec.from_dict(data)
        vm_spec.add_network('nic-1', f"{vm_network['namespace']}/{vm_network['name']}")
        vm_spec.network_data = "#cloud-config\n" + yaml.dump({
            "version": 1,
            "config": nic_config
        })
        code, data = api_client.vms.update(unique_vm_name, vm_spec)
        assert 200 == code, (code, data)
        vm_restarted, ctx = vm_checker.wait_restarted(unique_vm_name)
        assert vm_restarted, (
            f"Failed to Restart VM({unique_vm_name}),"
            f" timed out while executing {ctx.callee!r}"
        )
        vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(unique_vm_name, ['default'])
        assert vm_got_ips, (
            f"Failed to Start VM({unique_vm_name}) with errors:\n"
            f"Status: {data.get('status')}\n"
            f"API Status({code}): {data}"
        )
        vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces']
                     if iface['name'] == 'default')
        code, data = api_client.hosts.get(data['status']['nodeName'])
        assert 200 == code, (code, data)
        host_ip = next(addr['address'] for addr in data['status']['addresses']
                       if addr['type'] == 'InternalIP')
        with vm_shell_from_host(host_ip, vm_ip, ssh_user, pkey=pri_key) as sh:
            cloud_inited, (out, err) = vm_checker.wait_cloudinit_done(sh)
            assert cloud_inited and not err, (out, err)
            out, err = sh.exec_command("ip --json -4 a s")
        ips = [j['local'] for i in json.loads(out) for j in i['addr_info']]
        vlan_ip_range = ip_network(vm_network['cidr'])

        def get_vlan_ip(ctx):
            # only the status step is gated: every non-default interface needs an IP
            if ctx.callee == 'vm.get_status':
                return all(iface.get('ipAddress') for iface in ctx.data['status']['interfaces']
                           if iface['name'] != 'default')
            return True
        # status data from the API can lag a bit, so wait until the VLAN IP is reported
        vm_got_ips, (code, data) = vm_checker.wait_interfaces(unique_vm_name, callback=get_vlan_ip)
        assert vm_got_ips, (code, data)
        vm_vlan_ip = next(iface['ipAddress'] for iface in data['status']['interfaces']
                          if iface['name'] != 'default')
        assert ip_address(vm_vlan_ip) in vlan_ip_range and vm_vlan_ip in ips, (
            f"VLAN IP {vm_vlan_ip} not in {vlan_ip_range} or not in guest addresses {ips}"
        )

    @pytest.mark.dependency(depends=["add_vlan"])
    def test_ssh_connection(
        self, api_client, ssh_keypair, vm_checker, vm_network, minimal_vm
    ):
        """Log in to the VM through its VLAN IP and check the guest lists that address."""
        (unique_vm_name, ssh_user), (_, pri_key) = minimal_vm, ssh_keypair
        vm_started, (code, data) = vm_checker.wait_interfaces(unique_vm_name)
        assert vm_started, (
            f"Failed to Start VM({unique_vm_name}) with errors:\n"
            f"Status: {data.get('status')}\n"
            f"API Status({code}): {data}"
        )
        vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces']
                     if iface['name'] != 'default')
        try:
            with vm_checker.wait_ssh_connected(vm_ip, ssh_user, pkey=pri_key) as sh:
                out, err = sh.exec_command("ip -brief a s")
                assert vm_ip in out and not err, (out, err)
        except AssertionError:
            # keep assertion failures as they are; only other errors mean "cannot log in"
            raise
        except Exception as ex:
            raise AssertionError(
                f"Unable to login to VM via VLAN IP {vm_ip}"
            ) from ex

    def test_vms_on_same_vlan(
        self, api_client, ssh_keypair, vm_checker, vm_network, two_mirror_vms
    ):
        """Check two VMs on the same VLAN get IPs in its CIDR and can ping each other."""
        _, pri_key = ssh_keypair
        vm_names, ssh_user = two_mirror_vms

        def get_vlan_ip(ctx):
            # only the status step is gated: every non-default interface needs an IP
            if ctx.callee == 'vm.get_status':
                return all(iface.get('ipAddress') for iface in ctx.data['status']['interfaces']
                           if iface['name'] != 'default')
            return True
        # Verify VM having IP which belongs to VLAN
        vm_info, vlan_ip_range = [], ip_network(vm_network['cidr'])
        for vm_name in vm_names:
            vm_got_ips, (code, data) = vm_checker.wait_interfaces(vm_name, callback=get_vlan_ip)
            assert vm_got_ips, (
                f"Failed to Start VM({vm_name}) with errors:\n"
                f"Status: {data.get('status')}\n"
                f"API Status({code}): {data}"
            )
            vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces']
                         if iface['name'] != 'default')
            assert ip_address(vm_ip) in vlan_ip_range, (
                f"VM({vm_name}) IP {vm_ip} is not in VLAN range {vlan_ip_range}"
            )
            vm_info.append((vm_name, vm_ip))

        # verify Ping from each; pairing the list with its reverse gives both directions
        for (src_name, src_ip), (dst_name, dst_ip) in zip(vm_info, vm_info[::-1]):
            try:
                with vm_checker.wait_ssh_connected(src_ip, ssh_user, pkey=pri_key) as sh:
                    out, err = sh.exec_command(f"ping -c5 {dst_ip}")
                    assert '100% packet loss' not in out, (
                        f"Failed to ping VM({dst_name!r}, {dst_ip}) <- VM({src_name!r}, {src_ip})\n"
                        f"stdout: {out}\nstderr: {err}"
                    )
            except AssertionError:
                # keep assertion failures as they are; only other errors mean "cannot log in"
                raise
            except Exception as ex:
                raise AssertionError(
                    f"Unable to login to VM via VLAN IP {src_ip}"
                ) from ex

Class variables

var pytestmark

Methods

def test_add_vlan(self,
api_client,
ssh_keypair,
vm_mgmt_static,
vm_checker,
vm_shell_from_host,
vm_network,
minimal_vm,
gen_ifconfig)
Expand source code
@pytest.mark.dependency(name="add_vlan")
def test_add_vlan(
    self, api_client, ssh_keypair, vm_mgmt_static, vm_checker, vm_shell_from_host, vm_network,
    minimal_vm, gen_ifconfig
):
    """Add a VLAN NIC to a running VM and check it obtains an IP inside the VLAN CIDR.

    Steps: clean cloud-init inside the guest, add NIC 'nic-1' on ``vm_network``
    plus network data (``vm_mgmt_static`` subnet for the first NIC, DHCP for the
    second), wait for the restart, then compare the VLAN IP reported by the API
    with the CIDR and with the addresses seen inside the guest.
    """
    # clean cloud-init for rerun, and get the correct ifname
    (unique_vm_name, ssh_user), (_, pri_key) = minimal_vm, ssh_keypair
    vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(unique_vm_name, ['default'])
    assert vm_got_ips, (
        f"Failed to Start VM({unique_vm_name}) with errors:\n"
        f"Status: {data.get('status')}\n"
        f"API Status({code}): {data}"
    )
    vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces']
                 if iface['name'] == 'default')
    code, data = api_client.hosts.get(data['status']['nodeName'])
    assert 200 == code, (code, data)
    host_ip = next(addr['address'] for addr in data['status']['addresses']
                   if addr['type'] == 'InternalIP')
    with vm_shell_from_host(host_ip, vm_ip, ssh_user, pkey=pri_key) as sh:
        cloud_inited, (out, err) = vm_checker.wait_cloudinit_done(sh)
        assert cloud_inited, (out, err)
        out, err = sh.exec_command("sudo cloud-init clean")
        out, err = sh.exec_command("sudo cloud-init status")
        assert "not run" in out, (out, err)
        out, err = sh.exec_command("ip --json a s")
        assert not err, (out, err)
    ifname = next(i['ifname'] for i in json.loads(out) if i['link_type'] != 'loopback')
    # https://cloudinit.readthedocs.io/en/22.4.2/topics/network-config-format-v1.html#subnet-ip
    # and https://harvesterhci.io/kb/multiple-nics-vm-connectivity/#cloud-init-config
    nic_config = [gen_ifconfig(ifname, idx=i) for i in range(2)]
    nic_config[0]['subnets'] = [vm_mgmt_static]

    # add vlan NIC and network data then restart VM
    code, data = api_client.vms.get(unique_vm_name)
    assert 200 == code, (code, data)
    vm_spec = api_client.vms.Spec.from_dict(data)
    vm_spec.add_network('nic-1', f"{vm_network['namespace']}/{vm_network['name']}")
    vm_spec.network_data = "#cloud-config\n" + yaml.dump({
        "version": 1,
        "config": nic_config
    })
    code, data = api_client.vms.update(unique_vm_name, vm_spec)
    assert 200 == code, (code, data)
    vm_restarted, ctx = vm_checker.wait_restarted(unique_vm_name)
    assert vm_restarted, (
        f"Failed to Restart VM({unique_vm_name}),"
        f" timed out while executing {ctx.callee!r}"
    )
    vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(unique_vm_name, ['default'])
    assert vm_got_ips, (
        f"Failed to Start VM({unique_vm_name}) with errors:\n"
        f"Status: {data.get('status')}\n"
        f"API Status({code}): {data}"
    )
    vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces']
                 if iface['name'] == 'default')
    code, data = api_client.hosts.get(data['status']['nodeName'])
    assert 200 == code, (code, data)
    host_ip = next(addr['address'] for addr in data['status']['addresses']
                   if addr['type'] == 'InternalIP')
    with vm_shell_from_host(host_ip, vm_ip, ssh_user, pkey=pri_key) as sh:
        cloud_inited, (out, err) = vm_checker.wait_cloudinit_done(sh)
        assert cloud_inited and not err, (out, err)
        out, err = sh.exec_command("ip --json -4 a s")
    ips = [j['local'] for i in json.loads(out) for j in i['addr_info']]
    vlan_ip_range = ip_network(vm_network['cidr'])

    def get_vlan_ip(ctx):
        # only the status step is gated: every non-default interface needs an IP
        if ctx.callee == 'vm.get_status':
            return all(iface.get('ipAddress') for iface in ctx.data['status']['interfaces']
                       if iface['name'] != 'default')
        return True
    # status data from the API can lag a bit, so wait until the VLAN IP is reported
    vm_got_ips, (code, data) = vm_checker.wait_interfaces(unique_vm_name, callback=get_vlan_ip)
    assert vm_got_ips, (code, data)
    vm_vlan_ip = next(iface['ipAddress'] for iface in data['status']['interfaces']
                      if iface['name'] != 'default')
    assert ip_address(vm_vlan_ip) in vlan_ip_range and vm_vlan_ip in ips, (
        f"VLAN IP {vm_vlan_ip} not in {vlan_ip_range} or not in guest addresses {ips}"
    )
def test_ssh_connection(self, api_client, ssh_keypair, vm_checker, vm_network, minimal_vm)
Expand source code
@pytest.mark.dependency(depends=["add_vlan"])
def test_ssh_connection(
    self, api_client, ssh_keypair, vm_checker, vm_network, minimal_vm
):
    """Log in to the VM through its VLAN IP and check the guest lists that address."""
    (unique_vm_name, ssh_user), (_, pri_key) = minimal_vm, ssh_keypair
    vm_started, (code, data) = vm_checker.wait_interfaces(unique_vm_name)
    assert vm_started, (
        f"Failed to Start VM({unique_vm_name}) with errors:\n"
        f"Status: {data.get('status')}\n"
        f"API Status({code}): {data}"
    )
    # the first interface not named 'default' is taken as the VLAN NIC
    vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces']
                 if iface['name'] != 'default')
    try:
        with vm_checker.wait_ssh_connected(vm_ip, ssh_user, pkey=pri_key) as sh:
            out, err = sh.exec_command("ip -brief a s")
            assert vm_ip in out and not err, (out, err)
    except AssertionError:
        # keep assertion failures as they are; only other errors mean "cannot log in"
        raise
    except Exception as ex:
        raise AssertionError(
            f"Unable to login to VM via VLAN IP {vm_ip}"
        ) from ex
def test_vms_on_same_vlan(self, api_client, ssh_keypair, vm_checker, vm_network, two_mirror_vms)
Expand source code
def test_vms_on_same_vlan(
    self, api_client, ssh_keypair, vm_checker, vm_network, two_mirror_vms
):
    """Check two VMs on the same VLAN get IPs in its CIDR and can ping each other."""
    _, pri_key = ssh_keypair
    vm_names, ssh_user = two_mirror_vms

    def get_vlan_ip(ctx):
        # only the status step is gated: every non-default interface needs an IP
        if ctx.callee == 'vm.get_status':
            return all(iface.get('ipAddress') for iface in ctx.data['status']['interfaces']
                       if iface['name'] != 'default')
        return True
    # Verify VM having IP which belongs to VLAN
    vm_info, vlan_ip_range = [], ip_network(vm_network['cidr'])
    for vm_name in vm_names:
        vm_got_ips, (code, data) = vm_checker.wait_interfaces(vm_name, callback=get_vlan_ip)
        assert vm_got_ips, (
            f"Failed to Start VM({vm_name}) with errors:\n"
            f"Status: {data.get('status')}\n"
            f"API Status({code}): {data}"
        )
        vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces']
                     if iface['name'] != 'default')
        assert ip_address(vm_ip) in vlan_ip_range, (
            f"VM({vm_name}) IP {vm_ip} is not in VLAN range {vlan_ip_range}"
        )
        vm_info.append((vm_name, vm_ip))

    # verify Ping from each; pairing the list with its reverse gives both directions
    for (src_name, src_ip), (dst_name, dst_ip) in zip(vm_info, vm_info[::-1]):
        try:
            with vm_checker.wait_ssh_connected(src_ip, ssh_user, pkey=pri_key) as sh:
                out, err = sh.exec_command(f"ping -c5 {dst_ip}")
                assert '100% packet loss' not in out, (
                    f"Failed to ping VM({dst_name!r}, {dst_ip}) <- VM({src_name!r}, {src_ip})\n"
                    f"stdout: {out}\nstderr: {err}"
                )
        except AssertionError:
            # keep assertion failures as they are; only other errors mean "cannot log in"
            raise
        except Exception as ex:
            raise AssertionError(
                f"Unable to login to VM via VLAN IP {src_ip}"
            ) from ex