Module harvester_e2e_tests.integrations.test_9_rancher_integration

Functions

def csi_deployment(unique_name)
def harvester_cloud_credential(api_client, rancher_api_client, harvester_mgmt_cluster, unique_name)
def harvester_mgmt_cluster(api_client, rancher_api_client, unique_name, polling_for)

Rancher creates a Harvester entry (Import Existing)

def ip_pool(request, api_client, unique_name, vlan_network)
def lb_service(request, api_client, unique_name, nginx_deployment, ip_pool)
def machine_count(request)
def nginx_deployment(unique_name)
def rke1_cluster(unique_name, rancher_api_client, machine_count, rke1_version)
def rke2_cluster(unique_name, rancher_api_client, machine_count, rke2_version)
def test_add_project_owner_user(api_client, rancher_api_client, unique_name, wait_timeout, harvester_mgmt_cluster)
def test_import_harvester(api_client, rancher_api_client, harvester_mgmt_cluster, polling_for)
def ubuntu_image(api_client, unique_name, image_ubuntu, polling_for)
def vlan_network(request, api_client)
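
Nearly every fixture and test below receives a polling_for helper that retries an API call until a predicate over the (code, data) response holds or a timeout expires. The helper itself is defined elsewhere in the test harness; the following is a minimal sketch of the contract implied by the call sites in this module (subject string, checker, callable plus positional args, optional timeout keyword), with the default timeout and the snooze retry interval as assumed values:

import time

def polling_for(subject, checker, func, *args, timeout=300, snooze=5):
    # Retry func(*args) until checker(code, data) is truthy, then return
    # the last (code, data) pair; fail loudly on timeout.
    # NOTE: the 300s default and the 5s snooze interval are assumptions,
    # not the harness's actual values.
    code = data = None
    deadline = time.time() + timeout
    while time.time() < deadline:
        code, data = func(*args)
        if checker(code, data):
            return code, data
        time.sleep(snooze)
    raise AssertionError(f"Timed out waiting for {subject}: {code}, {data}")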

Classes

class TestRKE1
@pytest.mark.p0
@pytest.mark.rancher
@pytest.mark.rke1
@pytest.mark.usefixtures("rke1_cluster")
class TestRKE1:
    @pytest.mark.dependency(depends=["import_harvester"], name="create_rke1")
    def test_create_rke1(self, rancher_api_client, unique_name, harvester_mgmt_cluster,
                         rancher_wait_timeout,
                         rke1_cluster, harvester_cloud_credential,
                         ubuntu_image, vlan_network, polling_for):
        code, data = rancher_api_client.kube_configs.create(
            rke1_cluster['name'],
            harvester_mgmt_cluster['id']
        )
        assert 200 == code, f"Failed to create harvester kubeconfig with error: {code}, {data}"
        assert data.strip(), f"Harvester kubeconfig should not be empty: {code}, {data}"
        kubeconfig = data

        code, data = rancher_api_client.node_templates.create(
            name=unique_name,
            cpus=2,
            mems=4,
            disks=40,
            image_id=ubuntu_image['id'],
            network_id=vlan_network['name'],
            ssh_user=ubuntu_image['ssh_user'],
            cloud_credential_id=harvester_cloud_credential['id'],
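            # cloud-init config: set a known password and enable SSH
            # password authentication so the provisioned VMs stay reachable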
            user_data=(
                "#cloud-config\n"
                "password: test\n"
                "chpasswd:\n"
                "    expire: false\n"
                "ssh_pwauth: true\n"
            ),
        )
        assert 201 == code, (
            f"Failed to create NodeTemplate {unique_name} with error: {code}, {data}"
        )

        node_template_id = data['id']

        code, data = rancher_api_client.clusters.create(
            rke1_cluster['name'], rke1_cluster['k8s_version'], kubeconfig
        )
        assert 201 == code, (
            f"Failed to create cluster {rke1_cluster['name']} with error: {code}, {data}"
        )

        # update fixture value
        rke1_cluster['id'] = data['id']

        # check cluster created and ready for use
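        # "RKESecretsMigrated" in the conditions list is treated as the
        # signal that the cluster object can accept node pools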
        polling_for(
            f"cluster {rke1_cluster['name']} to be ready",
            lambda code, data:
                200 == code and
                "RKESecretsMigrated" in [c['type'] for c in data['conditions']],
            rancher_api_client.clusters.get, rke1_cluster['id'],
            timeout=rancher_wait_timeout
        )

        code, data = rancher_api_client.node_pools.create(
            cluster_id=rke1_cluster['id'],
            node_template_id=node_template_id,
            hostname_prefix=f"{rke1_cluster['name']}-",
            quantity=rke1_cluster['machine_count']
        )
        assert 201 == code, (
            f"Failed to create NodePools for cluster {rke1_cluster['name']}\n"
            f"API Status({code}): {data}"
        )

        polling_for(
            f"MgmtCluster {rke1_cluster['name']} to be ready",
            lambda code, data: code == 200 and data.get('status', {}).get('ready', False),
            rancher_api_client.mgmt_clusters.get, rke1_cluster['id'],
            timeout=rancher_wait_timeout
        )

    @pytest.mark.dependency(depends=["create_rke1"])
    def test_create_pvc(self, rancher_api_client, harvester_mgmt_cluster,
                        unique_name, polling_for):
        cluster_id = harvester_mgmt_cluster['id']
        capi = rancher_api_client.clusters.explore(cluster_id)

        # Create PVC
        size = "1Gi"
        spec = capi.pvcs.Spec(size)
        code, data = capi.pvcs.create(unique_name, spec)
        assert 201 == code, (code, data)

        # Verify PVC is created
        code, data = polling_for(
            f"PVC {unique_name} to be in Bound phase",
            lambda code, data: "Bound" == data['status'].get('phase'),
            capi.pvcs.get, unique_name
        )

        # Verify the PV for created PVC
        pv_code, pv_data = capi.pvs.get(data['spec']['volumeName'])
        assert 200 == pv_code, (
            f"Relevant PV is NOT available for created PVC's PV({data['spec']['volumeName']})\n"
            f"Response data of PV: {data}"
        )

        # Verify size of the PV is aligned to requested size of PVC
        assert size == pv_data['spec']['capacity']['storage'], (
            "Size of the PV is NOT aligned to requested size of PVC,"
            f" expected: {size}, PV's size: {pv_data['spec']['capacity']['storage']}\n"
            f"Response data of PV: {data}"
        )

        # Verify PVC's size
        created_spec = capi.pvcs.Spec.from_dict(data)
        assert size == created_spec.size, (
            f"Size is NOT correct in created PVC, expected: {size}, created: {created_spec.size}\n"
            f"Response data: {data}"
        )

        # Verify the storage class exists
        sc_code, sc_data = capi.scs.get(created_spec.storage_cls)
        assert 200 == sc_code, (
            f"Storage Class is NOT exists for created PVC\n"
            f"Created PVC Spec: {data}\n"
            f"SC Status({sc_code}): {sc_data}"
        )

        # verify the storage class is marked `default`
        assert 'true' == sc_data['metadata']['annotations'][capi.scs.DEFAULT_KEY], (
            f"Storage Class is NOT the DEFAULT for created PVC\n"
            f"Requested Storage Class: {spec.storage_cls!r}"
            f"Created PVC Spec: {data}\n"
            f"SC Status({sc_code}): {sc_data}"
        )

        # teardown
        capi.pvcs.delete(unique_name)

    # harvester-cloud-provider
    @pytest.mark.dependency(depends=["create_rke1"], name="cloud_provider_chart")
    def test_cloud_provider_chart(self, rancher_api_client, rke1_cluster, polling_for):
        chart, deployment = "harvester-cloud-provider", "harvester-cloud-provider"
        polling_for(
            f"chart {chart} to be create",
            lambda code, data:
                201 == code,
            rancher_api_client.charts.create,
                rke1_cluster['id'], "kube-system", chart,
            timeout=60
        )
        # Polling on creation for possible 500 error in Rancher Apps
        # * https://github.com/rancher/rancher/issues/37610
        # * https://github.com/rancher/rancher/issues/43036

        polling_for(
            f"chart {chart} to be ready",
            lambda code, data:
                200 == code and
                "deployed" == data.get("metadata", {}).get("state", {}).get("name"),
            rancher_api_client.charts.get,
                rke1_cluster['id'], "kube-system", chart
        )
        polling_for(
            f"deployment {deployment} to be ready",
            lambda code, data:
                200 == code and
                "active" == data.get("metadata", {}).get("state", {}).get("name"),
            rancher_api_client.cluster_deployments.get,
                rke1_cluster['id'], "kube-system", deployment
        )

    @pytest.mark.dependency(depends=["cloud_provider_chart"], name="deploy_nginx")
    def test_deploy_nginx(self, rancher_api_client, rke1_cluster, nginx_deployment, polling_for):
        code, data = rancher_api_client.cluster_deployments.create(
            rke1_cluster['id'], nginx_deployment['namespace'],
            nginx_deployment['name'], nginx_deployment['image']
        )
        assert 201 == code, (
            f"Fail to deploy {nginx_deployment['name']} on {rke1_cluster['name']}\n"
            f"API Response: {code}, {data}"
        )

        polling_for(
            f"deployment {nginx_deployment['name']} to be ready",
            lambda code, data:
                200 == code and
                "active" == data.get("metadata", {}).get("state", {}).get("name"),
            rancher_api_client.cluster_deployments.get,
                rke1_cluster['id'], nginx_deployment['namespace'], nginx_deployment['name']
        )

    @pytest.mark.dependency(depends=["deploy_nginx"])
    def test_load_balancer_service(self, rancher_api_client, rke1_cluster, nginx_deployment,
                                   lb_service, polling_for):
        # create LB service
        code, data = rancher_api_client.cluster_services.create(
            rke1_cluster['id'], lb_service["data"]
        )
        assert 201 == code, (
            f"Fail to create {lb_service['name']} for {nginx_deployment['name']}\n"
            f"API Response: {code}, {data}"
        )

        # check service active
        code, data = polling_for(
            f"service {lb_service['name']} to be ready",
            lambda code, data:
                200 == code and
                "active" == data.get("metadata", {}).get("state", {}).get("name"),
            rancher_api_client.cluster_services.get, rke1_cluster['id'], lb_service['name']
        )

        # check Nginx can be queried via the LB
        try:
            ingress_ip = data["status"]["loadBalancer"]["ingress"][0]['ip']
            ingress_port = data['spec']['ports'][0]['port']
            ingress_url = f"http://{ingress_ip}:{ingress_port}"
        except Exception as e:
            raise AssertionError(
                f"Fail to get ingress info from {lb_service['name']}\n"
                f"Got error: {e}\n"
                f"Service data: {data}"
            )
        resp = rancher_api_client.session.get(ingress_url)
        assert resp.ok and "Welcome to nginx" in resp.text, (
            f"Fail to query load balancer {lb_service['name']}\n"
            f"Got error: {resp.status_code}, {resp.text}\n"
            f"Service data: {data}"
        )

        # teardown
        rancher_api_client.cluster_services.delete(rke1_cluster['id'], lb_service["name"])

    # harvester-csi-driver
    @pytest.mark.dependency(depends=["create_rke1"], name="csi_driver_chart")
    def test_csi_driver_chart(self, rancher_api_client, rke1_cluster, polling_for):
        chart, deployment = "harvester-csi-driver", "harvester-csi-driver-controllers"
        polling_for(
            f"chart {chart} to be create",
            lambda code, data:
                201 == code,
            rancher_api_client.charts.create,
                rke1_cluster['id'], "kube-system", chart,
            timeout=60
        )
        # Polling on creation for possible 500 error in Rancher Apps
        # * https://github.com/rancher/rancher/issues/37610
        # * https://github.com/rancher/rancher/issues/43036

        polling_for(
            f"chart {chart} to be ready",
            lambda code, data:
                200 == code and
                "deployed" == data.get("metadata", {}).get("state", {}).get("name"),
            rancher_api_client.charts.get,
                rke1_cluster['id'], "kube-system", chart
        )
        polling_for(
            f"deployment {deployment} to be ready",
            lambda code, data:
                200 == code and
                "active" == data.get("metadata", {}).get("state", {}).get("name"),
            rancher_api_client.cluster_deployments.get,
                rke1_cluster['id'], "kube-system", deployment
        )

    @pytest.mark.dependency(depends=["csi_driver_chart"], name="csi_deployment")
    def test_csi_deployment(self, rancher_api_client, rke1_cluster, csi_deployment, polling_for):
        # create pvc
        code, data = rancher_api_client.pvcs.create(rke1_cluster['id'], csi_deployment['pvc'])
        assert 201 == code, (
            f"Fail to create {csi_deployment['pvc']} on {rke1_cluster['name']}\n"
            f"API Response: {code}, {data}"
        )

        polling_for(
            f"PVC {csi_deployment['pvc']} to be ready",
            lambda code, data:
                200 == code and
                "bound" == data.get("metadata", {}).get("state", {}).get("name"),
            rancher_api_client.pvcs.get, rke1_cluster['id'], csi_deployment['pvc']
        )

        # deployment with csi
        code, data = rancher_api_client.cluster_deployments.create(
            rke1_cluster['id'], csi_deployment['namespace'],
            csi_deployment['name'], csi_deployment['image'], csi_deployment['pvc']
        )
        assert 201 == code, (
            f"Fail to deploy {csi_deployment['name']} on {rke1_cluster['name']}\n"
            f"API Response: {code}, {data}"
        )

        polling_for(
            f"deployment {csi_deployment['name']} to be ready",
            lambda code, data:
                200 == code and
                "active" == data.get("metadata", {}).get("state", {}).get("name"),
            rancher_api_client.cluster_deployments.get,
                rke1_cluster['id'], csi_deployment['namespace'], csi_deployment['name']
        )

    @pytest.mark.dependency(depends=["csi_deployment"])
    def test_delete_deployment(self, rancher_api_client, rke1_cluster, csi_deployment,
                               polling_for):
        code, data = rancher_api_client.cluster_deployments.delete(
            rke1_cluster['id'], csi_deployment['namespace'], csi_deployment['name']
        )
        assert 204 == code, (
            f"Failed to delete deployment {csi_deployment['name']} with error: {code}, {data}"
        )

        polling_for(
            f"deployment {csi_deployment['name']} to be deleted",
            lambda code, data:
                code == 404,
            rancher_api_client.cluster_deployments.get,
                rke1_cluster['id'], csi_deployment['namespace'], csi_deployment['name']
        )

        # teardown
        rancher_api_client.pvcs.delete(rke1_cluster['id'], csi_deployment['pvc'])

    @pytest.mark.dependency(depends=["create_rke1"])
    def test_delete_rke1(self, api_client, rancher_api_client, rke1_cluster,
                         rancher_wait_timeout, polling_for):
        code, data = rancher_api_client.mgmt_clusters.delete(rke1_cluster['id'])
        assert 200 == code, (
            f"Failed to delete RKE2 MgmtCluster {rke1_cluster['name']} with error: {code}, {data}"
        )

        def _remaining_vm_cnt() -> int:
            # in RKE1, when the cluster is deleted, VMs may still be in Terminating status
            code, data = api_client.vms.get()
            remaining_vm_cnt = 0
            for d in data.get('data', []):
                vm_name = d.get('metadata', {}).get('name', "")
                if vm_name.startswith(f"{rke1_cluster['name']}-"):
                    remaining_vm_cnt += 1
            return remaining_vm_cnt

        polling_for(
            f"cluster {rke1_cluster['name']} to be deleted",
            lambda code, data: code == 404 and _remaining_vm_cnt() == 0,
            rancher_api_client.clusters.get, rke1_cluster['id'],
            timeout=rancher_wait_timeout
        )

Class variables

var pytestmark

Methods

def test_cloud_provider_chart(self, rancher_api_client, rke1_cluster, polling_for)
def test_create_pvc(self, rancher_api_client, harvester_mgmt_cluster, unique_name, polling_for)
def test_create_rke1(self, rancher_api_client, unique_name, harvester_mgmt_cluster, rancher_wait_timeout, rke1_cluster, harvester_cloud_credential, ubuntu_image, vlan_network, polling_for)
def test_csi_deployment(self, rancher_api_client, rke1_cluster, csi_deployment, polling_for)
def test_csi_driver_chart(self, rancher_api_client, rke1_cluster, polling_for)
def test_delete_deployment(self, rancher_api_client, rke1_cluster, csi_deployment, polling_for)
def test_delete_rke1(self, api_client, rancher_api_client, rke1_cluster, rancher_wait_timeout, polling_for)
def test_deploy_nginx(self, rancher_api_client, rke1_cluster, nginx_deployment, polling_for)
def test_load_balancer_service(self, rancher_api_client, rke1_cluster, nginx_deployment, lb_service, polling_for)
class TestRKE2
@pytest.mark.p0
@pytest.mark.rancher
@pytest.mark.rke2
@pytest.mark.usefixtures("rke2_cluster")
class TestRKE2:
    @pytest.mark.dependency(depends=["import_harvester"], name="create_rke2")
    def test_create_rke2(self, rancher_api_client, unique_name, harvester_mgmt_cluster,
                         harvester_cloud_credential, rke2_cluster, ubuntu_image, vlan_network,
                         rancher_wait_timeout, polling_for):
        # Create Harvester kubeconfig for this RKE2 cluster
        code, data = rancher_api_client.kube_configs.create(
            rke2_cluster['name'],
            harvester_mgmt_cluster['id']
        )
        assert 200 == code, (
            f"Failed to create harvester kubeconfig for rke2 with error: {code}, {data}"
        )
        assert "" != data, (
            f"Harvester kubeconfig for rke2 should not be empty: {code}, {data}"
        )
        kubeconfig = data

        # Create credential for this RKE2 cluster
        code, data = rancher_api_client.secrets.create(
            name=unique_name,
            data={
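                # the kubeconfig arrives as a JSON-quoted string (surrounding
                # quotes, escaped newlines); unwrap it to plain YAML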
                "credential": kubeconfig[1:-1].replace("\\n", "\n")
            },
            annotations={
                "v2prov-secret-authorized-for-cluster": rke2_cluster['name'],
                "v2prov-authorized-secret-deletes-on-cluster-removal": "true"
            }
        )
        assert 201 == code, (
            f"Failed to create secret with error: {code}, {data}"
        )
        cloud_provider_config_id = f"{data['metadata']['namespace']}:{data['metadata']['name']}"

        # Create RKE2 cluster spec
        code, data = rancher_api_client.harvester_configs.create(
            name=unique_name,
            cpus="2",
            mems="4",
            disks="40",
            image_id=ubuntu_image['id'],
            network_id=vlan_network['name'],
            ssh_user=ubuntu_image['ssh_user'],
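            # cloud-init config: set a known password and enable SSH
            # password authentication so the provisioned VMs stay reachable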
            user_data=(
                "#cloud-config\n"
                "password: test\n"
                "chpasswd:\n"
                "    expire: false\n"
                "ssh_pwauth: true\n"
            ),
        )
        assert 201 == code, (
            f"Failed to create harvester config with error: {code}, {data}"
        )

        # Create RKE2 cluster
        code, data = rancher_api_client.mgmt_clusters.create(
            name=rke2_cluster['name'],
            cloud_provider_config_id=cloud_provider_config_id,
            hostname_prefix=f"{rke2_cluster['name']}-",
            harvester_config_name=unique_name,
            k8s_version=rke2_cluster['k8s_version'],
            cloud_credential_id=harvester_cloud_credential['id'],
            quantity=rke2_cluster['machine_count']
        )
        assert 201 == code, (
            f"Failed to create RKE2 MgmtCluster {unique_name} with error: {code}, {data}"
        )

        code, data = polling_for(
            f"cluster {rke2_cluster['name']} to be ready",
            lambda code, data:
                200 == code and
                "active" == data['metadata']['state']['name'] and
                "Ready" in data['metadata']['state']['message'],
            rancher_api_client.mgmt_clusters.get, rke2_cluster['name'],
            timeout=rancher_wait_timeout
        )

        # update fixture value
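        # status.clusterName is the downstream cluster id expected by the
        # cluster-scoped endpoints used below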
        rke2_cluster['id'] = data["status"]["clusterName"]

        # Check deployments
        testees = ["harvester-cloud-provider", "harvester-csi-driver-controllers"]
        polling_for(
            f"harvester deployments on {rke2_cluster['name']} to be ready",
            lambda code, data:
                200 == code and
                "active" == data.get("metadata", {}).get("state", {}).get("name"),
            rancher_api_client.cluster_deployments.get,
                rke2_cluster['id'], "kube-system", testees
        )

    @pytest.mark.dependency(depends=["create_rke2"])
    def test_create_pvc(self, rancher_api_client, harvester_mgmt_cluster,
                        unique_name, polling_for):
        cluster_id = harvester_mgmt_cluster['id']
        capi = rancher_api_client.clusters.explore(cluster_id)

        # Create PVC
        size = "1Gi"
        spec = capi.pvcs.Spec(size)
        code, data = capi.pvcs.create(unique_name, spec)
        assert 201 == code, (code, data)

        # Verify PVC is created
        code, data = polling_for(
            f"PVC {unique_name} to be in Bound phase",
            lambda code, data: "Bound" == data['status'].get('phase'),
            capi.pvcs.get, unique_name
        )

        # Verify the PV for created PVC
        pv_code, pv_data = capi.pvs.get(data['spec']['volumeName'])
        assert 200 == pv_code, (
            f"Relevant PV is NOT available for created PVC's PV({data['spec']['volumeName']})\n"
            f"Response data of PV: {data}"
        )

        # Verify size of the PV is aligned to requested size of PVC
        assert size == pv_data['spec']['capacity']['storage'], (
            "Size of the PV is NOT aligned to requested size of PVC,"
            f" expected: {size}, PV's size: {pv_data['spec']['capacity']['storage']}\n"
            f"Response data of PV: {data}"
        )

        # Verify PVC's size
        created_spec = capi.pvcs.Spec.from_dict(data)
        assert size == created_spec.size, (
            f"Size is NOT correct in created PVC, expected: {size}, created: {created_spec.size}\n"
            f"Response data: {data}"
        )

        # Verify the storage class exists
        sc_code, sc_data = capi.scs.get(created_spec.storage_cls)
        assert 200 == sc_code, (
            f"Storage Class is NOT exists for created PVC\n"
            f"Created PVC Spec: {data}\n"
            f"SC Status({sc_code}): {sc_data}"
        )

        # verify the storage class is marked `default`
        assert 'true' == sc_data['metadata']['annotations'][capi.scs.DEFAULT_KEY], (
            f"Storage Class is NOT the DEFAULT for created PVC\n"
            f"Requested Storage Class: {spec.storage_cls!r}"
            f"Created PVC Spec: {data}\n"
            f"SC Status({sc_code}): {sc_data}"
        )

        # teardown
        capi.pvcs.delete(unique_name)

    @pytest.mark.dependency(depends=["create_rke2"], name="csi_deployment")
    def test_csi_deployment(self, rancher_api_client, rke2_cluster, csi_deployment, polling_for):
        # create pvc
        code, data = rancher_api_client.pvcs.create(rke2_cluster['id'], csi_deployment['pvc'])
        assert 201 == code, (
            f"Fail to create {csi_deployment['pvc']} on {rke2_cluster['name']}\n"
            f"API Response: {code}, {data}"
        )

        polling_for(
            f"PVC {csi_deployment['pvc']} to be ready",
            lambda code, data:
                200 == code and
                "bound" == data.get("metadata", {}).get("state", {}).get("name"),
            rancher_api_client.pvcs.get, rke2_cluster['id'], csi_deployment['pvc']
        )

        # deployment with csi
        code, data = rancher_api_client.cluster_deployments.create(
            rke2_cluster['id'], csi_deployment['namespace'],
            csi_deployment['name'], csi_deployment['image'], csi_deployment['pvc']
        )
        assert 201 == code, (
            f"Fail to deploy {csi_deployment['name']} on {rke2_cluster['name']}\n"
            f"API Response: {code}, {data}"
        )

        polling_for(
            f"deployment {csi_deployment['name']} to be ready",
            lambda code, data:
                200 == code and
                "active" == data.get("metadata", {}).get("state", {}).get("name"),
            rancher_api_client.cluster_deployments.get,
                rke2_cluster['id'], csi_deployment['namespace'], csi_deployment['name']
        )

    @pytest.mark.dependency(depends=["csi_deployment"])
    def test_delete_deployment(self, rancher_api_client, rke2_cluster, csi_deployment,
                               polling_for):
        code, data = rancher_api_client.cluster_deployments.delete(
            rke2_cluster['id'], csi_deployment['namespace'], csi_deployment['name']
        )
        assert 204 == code, (
            f"Failed to delete deployment {csi_deployment['name']} with error: {code}, {data}"
        )

        polling_for(
            f"deployment {csi_deployment['name']} to be deleted",
            lambda code, data:
                code == 404,
            rancher_api_client.cluster_deployments.get,
                rke2_cluster['id'], csi_deployment['namespace'], csi_deployment['name']
        )

        # teardown
        rancher_api_client.pvcs.delete(rke2_cluster['id'], csi_deployment['pvc'])

    @pytest.mark.dependency(depends=["create_rke2"], name="deploy_nginx")
    def test_deploy_nginx(self, rancher_api_client, rke2_cluster, nginx_deployment, polling_for):
        code, data = rancher_api_client.cluster_deployments.create(
            rke2_cluster['id'], nginx_deployment['namespace'],
            nginx_deployment['name'], nginx_deployment['image']
        )
        assert 201 == code, (
            f"Fail to deploy {nginx_deployment['name']} on {rke2_cluster['name']}\n"
            f"API Response: {code}, {data}"
        )

        polling_for(
            f"deployment {nginx_deployment['name']} to be ready",
            lambda code, data:
                200 == code and
                "active" == data.get("metadata", {}).get("state", {}).get("name"),
            rancher_api_client.cluster_deployments.get,
                rke2_cluster['id'], nginx_deployment['namespace'], nginx_deployment['name']
        )

    @pytest.mark.dependency(depends=["deploy_nginx"])
    def test_load_balancer_service(self, rancher_api_client, rke2_cluster, nginx_deployment,
                                   lb_service, polling_for):
        # create LB service
        code, data = rancher_api_client.cluster_services.create(
            rke2_cluster['id'], lb_service["data"]
        )
        assert 201 == code, (
            f"Fail to create {lb_service['name']} for {nginx_deployment['name']}\n"
            f"API Response: {code}, {data}"
        )

        # check service active
        code, data = polling_for(
            f"service {lb_service['name']} to be ready",
            lambda code, data:
                200 == code and
                "active" == data.get("metadata", {}).get("state", {}).get("name"),
            rancher_api_client.cluster_services.get, rke2_cluster['id'], lb_service['name']
        )

        # check Nginx can be queried via the LB
        try:
            ingress = data["status"]["loadBalancer"]["ingress"][0]
            ingress_url = f"http://{ingress['ip']}:{ingress['ports'][0]['port']}"
        except Exception as e:
            raise AssertionError(
                f"Fail to get ingress info from {lb_service['name']}\n"
                f"Got error: {e}\n"
                f"Service data: {data}"
            )
        resp = rancher_api_client.session.get(ingress_url)
        assert resp.ok and "Welcome to nginx" in resp.text, (
            f"Fail to query load balancer {lb_service['name']}\n"
            f"Got error: {resp.status_code}, {resp.text}\n"
            f"Service data: {data}"
        )

        # teardown
        rancher_api_client.cluster_services.delete(rke2_cluster['id'], lb_service["name"])

    @pytest.mark.dependency(depends=["create_rke2"])
    def test_delete_rke2(self, api_client, rancher_api_client, rke2_cluster,
                         rancher_wait_timeout, polling_for):
        code, data = rancher_api_client.mgmt_clusters.delete(rke2_cluster['name'])
        assert 200 == code, (
            f"Failed to delete RKE2 MgmtCluster {rke2_cluster['name']} with error: {code}, {data}"
        )

        polling_for(
            f"cluster {rke2_cluster['name']} to be deleted",
            lambda code, data: 404 == code,
            rancher_api_client.mgmt_clusters.get, rke2_cluster['name'],
            timeout=rancher_wait_timeout
        )

        code, data = api_client.vms.get()
        remaining_vm_cnt = 0
        for d in data.get('data', []):
            vm_name = d.get('metadata', {}).get('name', "")
            if vm_name.startswith(f"{rke2_cluster['name']}-"):
                remaining_vm_cnt += 1
        assert 0 == remaining_vm_cnt, f"Still have {remaining_vm_cnt} RKE2 VMs"

Class variables

var pytestmark

Methods

def test_create_pvc(self, rancher_api_client, harvester_mgmt_cluster, unique_name, polling_for)
def test_create_rke2(self, rancher_api_client, unique_name, harvester_mgmt_cluster, harvester_cloud_credential, rke2_cluster, ubuntu_image, vlan_network, rancher_wait_timeout, polling_for)
def test_csi_deployment(self, rancher_api_client, rke2_cluster, csi_deployment, polling_for)
def test_delete_deployment(self, rancher_api_client, rke2_cluster, csi_deployment, polling_for)
def test_delete_rke2(self, api_client, rancher_api_client, rke2_cluster, rancher_wait_timeout, polling_for)
def test_deploy_nginx(self, rancher_api_client, rke2_cluster, nginx_deployment, polling_for)
def test_load_balancer_service(self, rancher_api_client, rke2_cluster, nginx_deployment, lb_service, polling_for)