Module harvester_e2e_tests.integrations.test_9_rancher_integration
Functions
def csi_deployment(unique_name)
-
Expand source code
@pytest.fixture(scope='class') def csi_deployment(unique_name): yield { "namespace": "default", "name": f"csi-{unique_name}", "image": "nginx:latest", "pvc": f"pvc-{unique_name}" }
def harvester_cloud_credential(api_client, rancher_api_client, harvester_mgmt_cluster, unique_name)
-
Expand source code
@pytest.fixture(scope='module') def harvester_cloud_credential(api_client, rancher_api_client, harvester_mgmt_cluster, unique_name): code, data = rancher_api_client.clusters.generate_kubeconfig( harvester_mgmt_cluster['id'] ) assert 200 == code, ( f"Failed to create kubconfig with error: {code}, {data}" ) harvester_kubeconfig = data['config'] code, data = rancher_api_client.cloud_credentials.create( unique_name, harvester_kubeconfig, harvester_mgmt_cluster['id'] ) assert 201 == code, ( f"Failed to create cloud credential with error: {code}, {data}" ) code, data = rancher_api_client.cloud_credentials.get(data['id']) assert 200 == code, ( f"Failed to get cloud credential {data['id']} with error: {code}, {data}" ) yield data rancher_api_client.cloud_credentials.delete(data['id'])
def harvester_mgmt_cluster(api_client, rancher_api_client, unique_name, polling_for)
-
Expand source code
@pytest.fixture(scope='module') def harvester_mgmt_cluster(api_client, rancher_api_client, unique_name, polling_for): """ Rancher creates Harvester entry (Import Existing) """ cluster_name = f"hvst-{unique_name}" code, data = rancher_api_client.mgmt_clusters.create_harvester(cluster_name) assert 201 == code, ( f"Failed to create Harvester entry {cluster_name} with error: {code}, {data}" ) code, data = polling_for( f"finding clusterName in MgmtCluster {cluster_name}", lambda code, data: data.get('status', {}).get('clusterName'), rancher_api_client.mgmt_clusters.get, cluster_name ) yield { "name": cluster_name, "id": data['status']['clusterName'] # e.g. c-m-n6bsktxb } rancher_api_client.mgmt_clusters.delete(cluster_name) updates = dict(value="") api_client.settings.update("cluster-registration-url", updates)
Rancher creates Harvester entry (Import Existing)
def ip_pool(request, api_client, unique_name, vlan_network)
-
Expand source code
@pytest.fixture(scope="module") def ip_pool(request, api_client, unique_name, vlan_network): name = f"ippool-{unique_name}" ip_pool_subnet = request.config.getoption('--ip-pool-subnet') ip_pool_start = request.config.getoption('--ip-pool-start') ip_pool_end = request.config.getoption('--ip-pool-end') code, data = api_client.ippools.create( name, ip_pool_subnet, ip_pool_start, ip_pool_end, vlan_network["id"] ) assert 201 == code, ( f"Failed to create ip pool {name} with error: {code}, {data}" ) yield { "name": name, "subnet": ip_pool_subnet } api_client.ippools.delete(name)
def lb_service(request, api_client, unique_name, nginx_deployment, ip_pool)
-
Expand source code
@pytest.fixture(scope='function', params=["dhcp", "pool"]) def lb_service(request, api_client, unique_name, nginx_deployment, ip_pool): namespace = "default" name = f"lb-{unique_name}-{request.param}" data = { "type": "service", "metadata": { "namespace": namespace, "name": name, "annotations": { "cloudprovider.harvesterhci.io/ipam": request.param } }, "spec": { "type": "LoadBalancer", "sessionAffinity": None, "ports": [ { "name": "http", "port": 8080, "protocol": "TCP", "targetPort": 80 } ], "selector": { "name": nginx_deployment["name"] } } } yield { "namespace": namespace, "name": name, "data": data } code, data = api_client.loadbalancers.get() assert 200 == code, (code, data) lbs = data["data"] for lb in lbs: if name in lb["id"]: api_client.loadbalancers.delete(lb["id"]) break
def machine_count(request)
-
Expand source code
@pytest.fixture(scope="module", params=[1, pytest.param(3, marks=pytest.mark.skip(reason="Skip for low I/O env."))]) def machine_count(request): return request.param
def nginx_deployment(unique_name)
-
Expand source code
@pytest.fixture(scope='class') def nginx_deployment(unique_name): return { "namespace": "default", "name": f"nginx-{unique_name}", "image": "nginx:latest" }
def rke1_cluster(unique_name, rancher_api_client, machine_count, rke1_version)
-
Expand source code
@pytest.fixture(scope='class') def rke1_cluster(unique_name, rancher_api_client, machine_count, rke1_version): name = f"rke1-{unique_name}-{machine_count}" yield { "name": name, "id": "", # set in Test_RKE1::test_create_rke1, e.g. c-m-n6bsktxb "machine_count": machine_count, "k8s_version": rke1_version } rancher_api_client.mgmt_clusters.delete(name)
def rke2_cluster(unique_name, rancher_api_client, machine_count, rke2_version)
-
Expand source code
@pytest.fixture(scope='class') def rke2_cluster(unique_name, rancher_api_client, machine_count, rke2_version): name = f"rke2-{unique_name}-{machine_count}" yield { "name": name, "id": "", # set in Test_RKE2::test_create_rke2, e.g. c-m-n6bsktxb "machine_count": machine_count, "k8s_version": rke2_version } rancher_api_client.mgmt_clusters.delete(name)
def test_add_project_owner_user(api_client, rancher_api_client, unique_name, wait_timeout, harvester_mgmt_cluster)
-
Expand source code
@pytest.mark.p1 @pytest.mark.rancher @pytest.mark.dependency(depends=["import_harvester"]) def test_add_project_owner_user(api_client, rancher_api_client, unique_name, wait_timeout, harvester_mgmt_cluster): cluster_id = harvester_mgmt_cluster['id'] username, password = f"user-{unique_name}", unique_name spec = rancher_api_client.users.Spec(password) # create user code, data = rancher_api_client.users.create(username, spec) assert 201 == code, ( f"Failed to create user {username!r}\n" f"API Status({code}): {data}" ) uid, upids = data['id'], data['principalIds'] # add role `user` to user code, data = rancher_api_client.users.add_role(uid, 'user') assert 201 == code, ( f"Failed to add role 'user' for user {username!r}\n" f"API Status({code}): {data}" ) # Get `Default` project's uid cluster_api = rancher_api_client.clusters.explore(cluster_id) code, data = cluster_api.projects.get_by_name('Default') assert 200 == code, (code, data) project_id = data['id'] # add user to `Default` project as *project-owner* code, data = cluster_api.project_members.create(project_id, upids[0], "project-owner") assert 201 == code, (code, data) proj_muid = data['id'] # Login as the user endpoint = rancher_api_client.endpoint user_rapi = rancher_api_client.login(endpoint, username, password, ssl_verify=False) user_capi = user_rapi.clusters.explore(cluster_id) # Check user can only view the project he joined code, data = user_capi.projects.get() assert 200 == code, (code, data) assert 1 == len(data['data']), (code, data) # teardown cluster_api.project_members.delete(proj_muid) rancher_api_client.users.delete(uid)
def test_import_harvester(api_client, rancher_api_client, harvester_mgmt_cluster, polling_for)
-
Expand source code
@pytest.mark.p0 @pytest.mark.rancher @pytest.mark.dependency(name="import_harvester") def test_import_harvester(api_client, rancher_api_client, harvester_mgmt_cluster, polling_for): # Get cluster registration URL in Rancher's Virtualization Management code, data = polling_for( f"registration URL for the imported harvester {harvester_mgmt_cluster['name']}", lambda code, data: 200 == code and data.get('manifestUrl'), rancher_api_client.cluster_registration_tokens.get, harvester_mgmt_cluster['id'] ) # Set cluster-registration-url on Harvester updates = dict(value=data['manifestUrl']) code, data = api_client.settings.update("cluster-registration-url", updates) assert 200 == code, ( f"Failed to update Harvester's settings `cluster-registration-url`" f" with error: {code}, {data}" ) # Check Cluster becomes `active` in Rancher's Virtualization Management polling_for( "harvester to be ready", lambda code, data: "active" == data['metadata']['state']['name'] and "Ready" in data['metadata']['state']['message'], rancher_api_client.mgmt_clusters.get, harvester_mgmt_cluster['name'] )
def ubuntu_image(api_client, unique_name, image_ubuntu, polling_for)
-
Expand source code
@pytest.fixture(scope="module") def ubuntu_image(api_client, unique_name, image_ubuntu, polling_for): name = f"ubuntu-{unique_name}" code, data = api_client.images.create_by_url(name, image_ubuntu.url) assert 201 == code, ( f"Failed to upload ubuntu image with error: {code}, {data}" ) code, data = polling_for( f"image {name} to be ready", lambda code, data: data.get('status', {}).get('progress', None) == 100, api_client.images.get, name ) namespace = data['metadata']['namespace'] name = data['metadata']['name'] yield { "ssh_user": "ubuntu", "id": f"{namespace}/{name}" } api_client.images.delete(name)
def vlan_network(request, api_client)
-
Expand source code
@pytest.fixture(scope='module') def vlan_network(request, api_client): vlan_nic = request.config.getoption('--vlan-nic') vlan_id = request.config.getoption('--vlan-id') assert -1 != vlan_id, "Rancher integration test needs VLAN" api_client.clusternetworks.create(vlan_nic) api_client.clusternetworks.create_config(vlan_nic, vlan_nic, vlan_nic) network_name = f'vlan-network-{vlan_id}' code, data = api_client.networks.get(network_name) if code != 200: code, data = api_client.networks.create(network_name, vlan_id, cluster_network=vlan_nic) assert 201 == code, ( f"Failed to create network-attachment-definition {network_name} \ with error {code}, {data}" ) namespace = data['metadata']['namespace'] name = data['metadata']['name'] yield { "name": name, "id": f"{namespace}/{name}" } api_client.networks.delete(network_name)
Classes
class TestRKE1
-
Expand source code
@pytest.mark.p0 @pytest.mark.rancher @pytest.mark.rke1 @pytest.mark.usefixtures("rke1_cluster") class TestRKE1: @pytest.mark.dependency(depends=["import_harvester"], name="create_rke1") def test_create_rke1(self, rancher_api_client, unique_name, harvester_mgmt_cluster, rancher_wait_timeout, rke1_cluster, harvester_cloud_credential, ubuntu_image, vlan_network, polling_for): code, data = rancher_api_client.kube_configs.create( rke1_cluster['name'], harvester_mgmt_cluster['id'] ) assert 200 == code, f"Failed to create harvester kubeconfig with error: {code}, {data}" assert data.strip(), f"Harvester kubeconfig should not be empty: {code}, {data}" kubeconfig = data code, data = rancher_api_client.node_templates.create( name=unique_name, cpus=2, mems=4, disks=40, image_id=ubuntu_image['id'], network_id=vlan_network['name'], ssh_user=ubuntu_image['ssh_user'], cloud_credential_id=harvester_cloud_credential['id'], user_data=( "#cloud-config\n" "password: test\n" "chpasswd:\n" " expire: false\n" "ssh_pwauth: true\n" ), ) assert 201 == code, ( f"Failed to create NodeTemplate {unique_name} with error: {code}, {data}" ) node_template_id = data['id'] code, data = rancher_api_client.clusters.create( rke1_cluster['name'], rke1_cluster['k8s_version'], kubeconfig ) assert 201 == code, ( f"Failed to create cluster {rke1_cluster['name']} with error: {code}, {data}" ) # update fixture value rke1_cluster['id'] = data['id'] # check cluster created and ready for use polling_for( f"cluster {rke1_cluster['name']} to be ready", lambda code, data: 200 == code and "RKESecretsMigrated" in [c['type'] for c in data['conditions']], rancher_api_client.clusters.get, rke1_cluster['id'], timeout=rancher_wait_timeout ) code, data = rancher_api_client.node_pools.create( cluster_id=rke1_cluster['id'], node_template_id=node_template_id, hostname_prefix=f"{rke1_cluster['name']}-", quantity=rke1_cluster['machine_count'] ) assert 201 == code, ( f"Failed to create NodePools for cluster {rke1_cluster['name']}\n" f"API Status({code}): {data}" ) polling_for( f"MgmtCluster {rke1_cluster['name']} to be ready", lambda code, data: code == 200 and data.get('status', {}).get('ready', False), rancher_api_client.mgmt_clusters.get, rke1_cluster['id'], timeout=rancher_wait_timeout ) @pytest.mark.dependency(depends=["create_rke1"]) def test_create_pvc(self, rancher_api_client, harvester_mgmt_cluster, unique_name, polling_for): cluster_id = harvester_mgmt_cluster['id'] capi = rancher_api_client.clusters.explore(cluster_id) # Create PVC size = "1Gi" spec = capi.pvcs.Spec(size) code, data = capi.pvcs.create(unique_name, spec) assert 201 == code, (code, data) # Verify PVC is created code, data = polling_for( f"PVC {unique_name} to be in Bound phase", lambda code, data: "Bound" == data['status'].get('phase'), capi.pvcs.get, unique_name ) # Verify the PV for created PVC pv_code, pv_data = capi.pvs.get(data['spec']['volumeName']) assert 200 == pv_code, ( f"Relevant PV is NOT available for created PVC's PV({data['spec']['volumeName']})\n" f"Response data of PV: {data}" ) # Verify size of the PV is aligned to requested size of PVC assert size == pv_data['spec']['capacity']['storage'], ( "Size of the PV is NOT aligned to requested size of PVC," f" expected: {size}, PV's size: {pv_data['spec']['capacity']['storage']}\n" f"Response data of PV: {data}" ) # Verify PVC's size created_spec = capi.pvcs.Spec.from_dict(data) assert size == spec.size, ( f"Size is NOT correct in created PVC, expected: {size}, created: {spec.size}\n" f"Response data: {data}" ) # Verify the storage class exists sc_code, sc_data = capi.scs.get(created_spec.storage_cls) assert 200 == sc_code, ( f"Storage Class is NOT exists for created PVC\n" f"Created PVC Spec: {data}\n" f"SC Status({sc_code}): {sc_data}" ) # verify the storage class is marked `default` assert 'true' == sc_data['metadata']['annotations'][capi.scs.DEFAULT_KEY], ( f"Storage Class is NOT the DEFAULT for created PVC\n" f"Requested Storage Class: {spec.storage_cls!r}" f"Created PVC Spec: {data}\n" f"SC Status({sc_code}): {sc_data}" ) # teardown capi.pvcs.delete(unique_name) # harvester-cloud-provider @pytest.mark.dependency(depends=["create_rke1"], name="cloud_provider_chart") def test_cloud_provider_chart(self, rancher_api_client, rke1_cluster, polling_for): chart, deployment = "harvester-cloud-provider", "harvester-cloud-provider" polling_for( f"chart {chart} to be create", lambda code, data: 201 == code, rancher_api_client.charts.create, rke1_cluster['id'], "kube-system", chart, timeout=60 ) # Polling on creation for possible 500 error in Rancher Apps # * https://github.com/rancher/rancher/issues/37610 # * https://github.com/rancher/rancher/issues/43036 polling_for( f"chart {chart} to be ready", lambda code, data: 200 == code and "deployed" == data.get("metadata", {}).get("state", {}).get("name"), rancher_api_client.charts.get, rke1_cluster['id'], "kube-system", chart ) polling_for( f"deployment {deployment} to be ready", lambda code, data: 200 == code and "active" == data.get("metadata", {}).get("state", {}).get("name"), rancher_api_client.cluster_deployments.get, rke1_cluster['id'], "kube-system", deployment ) @pytest.mark.dependency(depends=["cloud_provider_chart"], name="deploy_nginx") def test_deploy_nginx(self, rancher_api_client, rke1_cluster, nginx_deployment, polling_for): code, data = rancher_api_client.cluster_deployments.create( rke1_cluster['id'], nginx_deployment['namespace'], nginx_deployment['name'], nginx_deployment['image'] ) assert 201 == code, ( f"Fail to deploy {nginx_deployment['name']} on {rke1_cluster['name']}\n" f"API Response: {code}, {data}" ) polling_for( f"deployment {nginx_deployment['name']} to be ready", lambda code, data: 200 == code and "active" == data.get("metadata", {}).get("state", {}).get("name"), rancher_api_client.cluster_deployments.get, rke1_cluster['id'], nginx_deployment['namespace'], nginx_deployment['name'] ) @pytest.mark.dependency(depends=["deploy_nginx"]) def test_load_balancer_service(self, rancher_api_client, rke1_cluster, nginx_deployment, lb_service, polling_for): # create LB service code, data = rancher_api_client.cluster_services.create( rke1_cluster['id'], lb_service["data"] ) assert 201 == code, ( f"Fail to create {lb_service['name']} for {nginx_deployment['name']}\n" f"API Response: {code}, {data}" ) # check service active code, data = polling_for( f"service {lb_service['name']} to be ready", lambda code, data: 200 == code and "active" == data.get("metadata", {}).get("state", {}).get("name"), rancher_api_client.cluster_services.get, rke1_cluster['id'], lb_service['name'] ) # check Nginx can be queired via LB try: ingress_ip = data["status"]["loadBalancer"]["ingress"][0]['ip'] ingress_port = data['spec']['ports'][0]['port'] ingress_url = f"http://{ingress_ip}:{ingress_port}" except Exception as e: raise AssertionError( f"Fail to get ingress info from {lb_service['name']}\n" f"Got error: {e}\n" f"Service data: {data}" ) resp = rancher_api_client.session.get(ingress_url) assert resp.ok and "Welcome to nginx" in resp.text, ( f"Fail to query load balancer {lb_service['name']}\n" f"Got error: {resp.status_code}, {resp.text}\n" f"Service data: {data}" ) # teardown rancher_api_client.cluster_services.delete(rke1_cluster['id'], lb_service["name"]) # harvester-csi-driver @pytest.mark.dependency(depends=["create_rke1"], name="csi_driver_chart") def test_csi_driver_chart(self, rancher_api_client, rke1_cluster, polling_for): chart, deployment = "harvester-csi-driver", "harvester-csi-driver-controllers" polling_for( f"chart {chart} to be create", lambda code, data: 201 == code, rancher_api_client.charts.create, rke1_cluster['id'], "kube-system", chart, timeout=60 ) # Polling on creation for possible 500 error in Rancher Apps # * https://github.com/rancher/rancher/issues/37610 # * https://github.com/rancher/rancher/issues/43036 polling_for( f"chart {chart} to be ready", lambda code, data: 200 == code and "deployed" == data.get("metadata", {}).get("state", {}).get("name"), rancher_api_client.charts.get, rke1_cluster['id'], "kube-system", chart ) polling_for( f"deployment {deployment} to be ready", lambda code, data: 200 == code and "active" == data.get("metadata", {}).get("state", {}).get("name"), rancher_api_client.cluster_deployments.get, rke1_cluster['id'], "kube-system", deployment ) @pytest.mark.dependency(depends=["csi_driver_chart"], name="csi_deployment") def test_csi_deployment(self, rancher_api_client, rke1_cluster, csi_deployment, polling_for): # create pvc code, data = rancher_api_client.pvcs.create(rke1_cluster['id'], csi_deployment['pvc']) assert 201 == code, ( f"Fail to create {csi_deployment['pvc']} on {rke1_cluster['name']}\n" f"API Response: {code}, {data}" ) polling_for( f"PVC {csi_deployment['pvc']} to be ready", lambda code, data: 200 == code and "bound" == data.get("metadata", {}).get("state", {}).get("name"), rancher_api_client.pvcs.get, rke1_cluster['id'], csi_deployment['pvc'] ) # deployment with csi code, data = rancher_api_client.cluster_deployments.create( rke1_cluster['id'], csi_deployment['namespace'], csi_deployment['name'], csi_deployment['image'], csi_deployment['pvc'] ) assert 201 == code, ( f"Fail to deploy {csi_deployment['name']} on {rke1_cluster['name']}\n" f"API Response: {code}, {data}" ) polling_for( f"deployment {csi_deployment['name']} to be ready", lambda code, data: 200 == code and "active" == data.get("metadata", {}).get("state", {}).get("name"), rancher_api_client.cluster_deployments.get, rke1_cluster['id'], csi_deployment['namespace'], csi_deployment['name'] ) @pytest.mark.dependency(depends=["csi_deployment"]) def test_delete_deployment(self, rancher_api_client, rke1_cluster, csi_deployment, polling_for): code, data = rancher_api_client.cluster_deployments.delete( rke1_cluster['id'], csi_deployment['namespace'], csi_deployment['name'] ) assert 204 == code, ( f"Failed to delete deployment {csi_deployment['name']} with error: {code}, {data}" ) polling_for( f"deployment {csi_deployment['name']} to be deleted", lambda code, data: code == 404, rancher_api_client.cluster_deployments.get, rke1_cluster['id'], csi_deployment['namespace'], csi_deployment['name'] ) # teardown rancher_api_client.pvcs.delete(rke1_cluster['id'], csi_deployment['pvc']) @pytest.mark.dependency(depends=["create_rke1"]) def test_delete_rke1(self, api_client, rancher_api_client, rke1_cluster, rancher_wait_timeout, polling_for): code, data = rancher_api_client.mgmt_clusters.delete(rke1_cluster['id']) assert 200 == code, ( f"Failed to delete RKE2 MgmtCluster {rke1_cluster['name']} with error: {code}, {data}" ) def _remaining_vm_cnt() -> int: # in RKE1, when the cluster is deleted, VMs may still in Terminating status code, data = api_client.vms.get() remaining_vm_cnt = 0 for d in data.get('data', []): vm_name = d.get('metadata', {}).get('name', "") if vm_name.startswith(f"{rke1_cluster['name']}-"): remaining_vm_cnt += 1 return remaining_vm_cnt polling_for( f"cluster {rke1_cluster['name']} to be deleted", lambda code, data: code == 404 and _remaining_vm_cnt() == 0, rancher_api_client.clusters.get, rke1_cluster['id'], timeout=rancher_wait_timeout )
Class variables
var pytestmark
Methods
def test_cloud_provider_chart(self, rancher_api_client, rke1_cluster, polling_for)
-
Expand source code
@pytest.mark.dependency(depends=["create_rke1"], name="cloud_provider_chart") def test_cloud_provider_chart(self, rancher_api_client, rke1_cluster, polling_for): chart, deployment = "harvester-cloud-provider", "harvester-cloud-provider" polling_for( f"chart {chart} to be create", lambda code, data: 201 == code, rancher_api_client.charts.create, rke1_cluster['id'], "kube-system", chart, timeout=60 ) # Polling on creation for possible 500 error in Rancher Apps # * https://github.com/rancher/rancher/issues/37610 # * https://github.com/rancher/rancher/issues/43036 polling_for( f"chart {chart} to be ready", lambda code, data: 200 == code and "deployed" == data.get("metadata", {}).get("state", {}).get("name"), rancher_api_client.charts.get, rke1_cluster['id'], "kube-system", chart ) polling_for( f"deployment {deployment} to be ready", lambda code, data: 200 == code and "active" == data.get("metadata", {}).get("state", {}).get("name"), rancher_api_client.cluster_deployments.get, rke1_cluster['id'], "kube-system", deployment )
def test_create_pvc(self, rancher_api_client, harvester_mgmt_cluster, unique_name, polling_for)
-
Expand source code
@pytest.mark.dependency(depends=["create_rke1"]) def test_create_pvc(self, rancher_api_client, harvester_mgmt_cluster, unique_name, polling_for): cluster_id = harvester_mgmt_cluster['id'] capi = rancher_api_client.clusters.explore(cluster_id) # Create PVC size = "1Gi" spec = capi.pvcs.Spec(size) code, data = capi.pvcs.create(unique_name, spec) assert 201 == code, (code, data) # Verify PVC is created code, data = polling_for( f"PVC {unique_name} to be in Bound phase", lambda code, data: "Bound" == data['status'].get('phase'), capi.pvcs.get, unique_name ) # Verify the PV for created PVC pv_code, pv_data = capi.pvs.get(data['spec']['volumeName']) assert 200 == pv_code, ( f"Relevant PV is NOT available for created PVC's PV({data['spec']['volumeName']})\n" f"Response data of PV: {data}" ) # Verify size of the PV is aligned to requested size of PVC assert size == pv_data['spec']['capacity']['storage'], ( "Size of the PV is NOT aligned to requested size of PVC," f" expected: {size}, PV's size: {pv_data['spec']['capacity']['storage']}\n" f"Response data of PV: {data}" ) # Verify PVC's size created_spec = capi.pvcs.Spec.from_dict(data) assert size == spec.size, ( f"Size is NOT correct in created PVC, expected: {size}, created: {spec.size}\n" f"Response data: {data}" ) # Verify the storage class exists sc_code, sc_data = capi.scs.get(created_spec.storage_cls) assert 200 == sc_code, ( f"Storage Class is NOT exists for created PVC\n" f"Created PVC Spec: {data}\n" f"SC Status({sc_code}): {sc_data}" ) # verify the storage class is marked `default` assert 'true' == sc_data['metadata']['annotations'][capi.scs.DEFAULT_KEY], ( f"Storage Class is NOT the DEFAULT for created PVC\n" f"Requested Storage Class: {spec.storage_cls!r}" f"Created PVC Spec: {data}\n" f"SC Status({sc_code}): {sc_data}" ) # teardown capi.pvcs.delete(unique_name)
def test_create_rke1(self,
rancher_api_client,
unique_name,
harvester_mgmt_cluster,
rancher_wait_timeout,
rke1_cluster,
harvester_cloud_credential,
ubuntu_image,
vlan_network,
polling_for)-
Expand source code
@pytest.mark.dependency(depends=["import_harvester"], name="create_rke1") def test_create_rke1(self, rancher_api_client, unique_name, harvester_mgmt_cluster, rancher_wait_timeout, rke1_cluster, harvester_cloud_credential, ubuntu_image, vlan_network, polling_for): code, data = rancher_api_client.kube_configs.create( rke1_cluster['name'], harvester_mgmt_cluster['id'] ) assert 200 == code, f"Failed to create harvester kubeconfig with error: {code}, {data}" assert data.strip(), f"Harvester kubeconfig should not be empty: {code}, {data}" kubeconfig = data code, data = rancher_api_client.node_templates.create( name=unique_name, cpus=2, mems=4, disks=40, image_id=ubuntu_image['id'], network_id=vlan_network['name'], ssh_user=ubuntu_image['ssh_user'], cloud_credential_id=harvester_cloud_credential['id'], user_data=( "#cloud-config\n" "password: test\n" "chpasswd:\n" " expire: false\n" "ssh_pwauth: true\n" ), ) assert 201 == code, ( f"Failed to create NodeTemplate {unique_name} with error: {code}, {data}" ) node_template_id = data['id'] code, data = rancher_api_client.clusters.create( rke1_cluster['name'], rke1_cluster['k8s_version'], kubeconfig ) assert 201 == code, ( f"Failed to create cluster {rke1_cluster['name']} with error: {code}, {data}" ) # update fixture value rke1_cluster['id'] = data['id'] # check cluster created and ready for use polling_for( f"cluster {rke1_cluster['name']} to be ready", lambda code, data: 200 == code and "RKESecretsMigrated" in [c['type'] for c in data['conditions']], rancher_api_client.clusters.get, rke1_cluster['id'], timeout=rancher_wait_timeout ) code, data = rancher_api_client.node_pools.create( cluster_id=rke1_cluster['id'], node_template_id=node_template_id, hostname_prefix=f"{rke1_cluster['name']}-", quantity=rke1_cluster['machine_count'] ) assert 201 == code, ( f"Failed to create NodePools for cluster {rke1_cluster['name']}\n" f"API Status({code}): {data}" ) polling_for( f"MgmtCluster {rke1_cluster['name']} to be ready", lambda code, data: code == 200 and data.get('status', {}).get('ready', False), rancher_api_client.mgmt_clusters.get, rke1_cluster['id'], timeout=rancher_wait_timeout )
def test_csi_deployment(self, rancher_api_client, rke1_cluster, csi_deployment, polling_for)
-
Expand source code
@pytest.mark.dependency(depends=["csi_driver_chart"], name="csi_deployment") def test_csi_deployment(self, rancher_api_client, rke1_cluster, csi_deployment, polling_for): # create pvc code, data = rancher_api_client.pvcs.create(rke1_cluster['id'], csi_deployment['pvc']) assert 201 == code, ( f"Fail to create {csi_deployment['pvc']} on {rke1_cluster['name']}\n" f"API Response: {code}, {data}" ) polling_for( f"PVC {csi_deployment['pvc']} to be ready", lambda code, data: 200 == code and "bound" == data.get("metadata", {}).get("state", {}).get("name"), rancher_api_client.pvcs.get, rke1_cluster['id'], csi_deployment['pvc'] ) # deployment with csi code, data = rancher_api_client.cluster_deployments.create( rke1_cluster['id'], csi_deployment['namespace'], csi_deployment['name'], csi_deployment['image'], csi_deployment['pvc'] ) assert 201 == code, ( f"Fail to deploy {csi_deployment['name']} on {rke1_cluster['name']}\n" f"API Response: {code}, {data}" ) polling_for( f"deployment {csi_deployment['name']} to be ready", lambda code, data: 200 == code and "active" == data.get("metadata", {}).get("state", {}).get("name"), rancher_api_client.cluster_deployments.get, rke1_cluster['id'], csi_deployment['namespace'], csi_deployment['name'] )
def test_csi_driver_chart(self, rancher_api_client, rke1_cluster, polling_for)
-
Expand source code
@pytest.mark.dependency(depends=["create_rke1"], name="csi_driver_chart") def test_csi_driver_chart(self, rancher_api_client, rke1_cluster, polling_for): chart, deployment = "harvester-csi-driver", "harvester-csi-driver-controllers" polling_for( f"chart {chart} to be create", lambda code, data: 201 == code, rancher_api_client.charts.create, rke1_cluster['id'], "kube-system", chart, timeout=60 ) # Polling on creation for possible 500 error in Rancher Apps # * https://github.com/rancher/rancher/issues/37610 # * https://github.com/rancher/rancher/issues/43036 polling_for( f"chart {chart} to be ready", lambda code, data: 200 == code and "deployed" == data.get("metadata", {}).get("state", {}).get("name"), rancher_api_client.charts.get, rke1_cluster['id'], "kube-system", chart ) polling_for( f"deployment {deployment} to be ready", lambda code, data: 200 == code and "active" == data.get("metadata", {}).get("state", {}).get("name"), rancher_api_client.cluster_deployments.get, rke1_cluster['id'], "kube-system", deployment )
def test_delete_deployment(self, rancher_api_client, rke1_cluster, csi_deployment, polling_for)
-
Expand source code
@pytest.mark.dependency(depends=["csi_deployment"]) def test_delete_deployment(self, rancher_api_client, rke1_cluster, csi_deployment, polling_for): code, data = rancher_api_client.cluster_deployments.delete( rke1_cluster['id'], csi_deployment['namespace'], csi_deployment['name'] ) assert 204 == code, ( f"Failed to delete deployment {csi_deployment['name']} with error: {code}, {data}" ) polling_for( f"deployment {csi_deployment['name']} to be deleted", lambda code, data: code == 404, rancher_api_client.cluster_deployments.get, rke1_cluster['id'], csi_deployment['namespace'], csi_deployment['name'] ) # teardown rancher_api_client.pvcs.delete(rke1_cluster['id'], csi_deployment['pvc'])
def test_delete_rke1(self, api_client, rancher_api_client, rke1_cluster, rancher_wait_timeout, polling_for)
-
Expand source code
@pytest.mark.dependency(depends=["create_rke1"]) def test_delete_rke1(self, api_client, rancher_api_client, rke1_cluster, rancher_wait_timeout, polling_for): code, data = rancher_api_client.mgmt_clusters.delete(rke1_cluster['id']) assert 200 == code, ( f"Failed to delete RKE2 MgmtCluster {rke1_cluster['name']} with error: {code}, {data}" ) def _remaining_vm_cnt() -> int: # in RKE1, when the cluster is deleted, VMs may still in Terminating status code, data = api_client.vms.get() remaining_vm_cnt = 0 for d in data.get('data', []): vm_name = d.get('metadata', {}).get('name', "") if vm_name.startswith(f"{rke1_cluster['name']}-"): remaining_vm_cnt += 1 return remaining_vm_cnt polling_for( f"cluster {rke1_cluster['name']} to be deleted", lambda code, data: code == 404 and _remaining_vm_cnt() == 0, rancher_api_client.clusters.get, rke1_cluster['id'], timeout=rancher_wait_timeout )
def test_deploy_nginx(self, rancher_api_client, rke1_cluster, nginx_deployment, polling_for)
-
Expand source code
@pytest.mark.dependency(depends=["cloud_provider_chart"], name="deploy_nginx") def test_deploy_nginx(self, rancher_api_client, rke1_cluster, nginx_deployment, polling_for): code, data = rancher_api_client.cluster_deployments.create( rke1_cluster['id'], nginx_deployment['namespace'], nginx_deployment['name'], nginx_deployment['image'] ) assert 201 == code, ( f"Fail to deploy {nginx_deployment['name']} on {rke1_cluster['name']}\n" f"API Response: {code}, {data}" ) polling_for( f"deployment {nginx_deployment['name']} to be ready", lambda code, data: 200 == code and "active" == data.get("metadata", {}).get("state", {}).get("name"), rancher_api_client.cluster_deployments.get, rke1_cluster['id'], nginx_deployment['namespace'], nginx_deployment['name'] )
def test_load_balancer_service(self, rancher_api_client, rke1_cluster, nginx_deployment, lb_service, polling_for)
-
Expand source code
@pytest.mark.dependency(depends=["deploy_nginx"]) def test_load_balancer_service(self, rancher_api_client, rke1_cluster, nginx_deployment, lb_service, polling_for): # create LB service code, data = rancher_api_client.cluster_services.create( rke1_cluster['id'], lb_service["data"] ) assert 201 == code, ( f"Fail to create {lb_service['name']} for {nginx_deployment['name']}\n" f"API Response: {code}, {data}" ) # check service active code, data = polling_for( f"service {lb_service['name']} to be ready", lambda code, data: 200 == code and "active" == data.get("metadata", {}).get("state", {}).get("name"), rancher_api_client.cluster_services.get, rke1_cluster['id'], lb_service['name'] ) # check Nginx can be queired via LB try: ingress_ip = data["status"]["loadBalancer"]["ingress"][0]['ip'] ingress_port = data['spec']['ports'][0]['port'] ingress_url = f"http://{ingress_ip}:{ingress_port}" except Exception as e: raise AssertionError( f"Fail to get ingress info from {lb_service['name']}\n" f"Got error: {e}\n" f"Service data: {data}" ) resp = rancher_api_client.session.get(ingress_url) assert resp.ok and "Welcome to nginx" in resp.text, ( f"Fail to query load balancer {lb_service['name']}\n" f"Got error: {resp.status_code}, {resp.text}\n" f"Service data: {data}" ) # teardown rancher_api_client.cluster_services.delete(rke1_cluster['id'], lb_service["name"])
class TestRKE2
-
Expand source code
@pytest.mark.p0 @pytest.mark.rancher @pytest.mark.rke2 @pytest.mark.usefixtures("rke2_cluster") class TestRKE2: @pytest.mark.dependency(depends=["import_harvester"], name="create_rke2") def test_create_rke2(self, rancher_api_client, unique_name, harvester_mgmt_cluster, harvester_cloud_credential, rke2_cluster, ubuntu_image, vlan_network, rancher_wait_timeout, polling_for): # Create Harvester kubeconfig for this RKE2 cluster code, data = rancher_api_client.kube_configs.create( rke2_cluster['name'], harvester_mgmt_cluster['id'] ) assert 200 == code, ( f"Failed to create harvester kubeconfig for rke2 with error: {code}, {data}" ) assert "" != data, ( f"Harvester kubeconfig for rke2 should not be empty: {code}, {data}" ) kubeconfig = data # Create credential for this RKE2 cluster code, data = rancher_api_client.secrets.create( name=unique_name, data={ "credential": kubeconfig[1:-1].replace("\\n", "\n") }, annotations={ "v2prov-secret-authorized-for-cluster": rke2_cluster['name'], "v2prov-authorized-secret-deletes-on-cluster-removal": "true" } ) assert 201 == code, ( f"Failed to create secret with error: {code}, {data}" ) cloud_provider_config_id = f"{data['metadata']['namespace']}:{data['metadata']['name']}" # Create RKE2 cluster spec code, data = rancher_api_client.harvester_configs.create( name=unique_name, cpus="2", mems="4", disks="40", image_id=ubuntu_image['id'], network_id=vlan_network['name'], ssh_user=ubuntu_image['ssh_user'], user_data=( "#cloud-config\n" "password: test\n" "chpasswd:\n" " expire: false\n" "ssh_pwauth: true\n" ), ) assert 201 == code, ( f"Failed to create harvester config with error: {code}, {data}" ) # Create RKE2 cluster code, data = rancher_api_client.mgmt_clusters.create( name=rke2_cluster['name'], cloud_provider_config_id=cloud_provider_config_id, hostname_prefix=f"{rke2_cluster['name']}-", harvester_config_name=unique_name, k8s_version=rke2_cluster['k8s_version'], cloud_credential_id=harvester_cloud_credential['id'], quantity=rke2_cluster['machine_count'] ) assert 201 == code, ( f"Failed to create RKE2 MgmtCluster {unique_name} with error: {code}, {data}" ) code, data = polling_for( f"cluster {rke2_cluster['name']} to be ready", lambda code, data: "active" == data['metadata']['state']['name'] and "Ready" in data['metadata']['state']['message'], rancher_api_client.mgmt_clusters.get, rke2_cluster['name'], timeout=rancher_wait_timeout ) # update fixture value rke2_cluster['id'] = data["status"]["clusterName"] # Check deployments testees = ["harvester-cloud-provider", "harvester-csi-driver-controllers"] polling_for( f"harvester deployments on {rke2_cluster['name']} to be ready", lambda code, data: 200 == code and "active" == data.get("metadata", {}).get("state", {}).get("name"), rancher_api_client.cluster_deployments.get, rke2_cluster['id'], "kube-system", testees ) @pytest.mark.dependency(depends=["create_rke2"]) def test_create_pvc(self, rancher_api_client, harvester_mgmt_cluster, unique_name, polling_for): cluster_id = harvester_mgmt_cluster['id'] capi = rancher_api_client.clusters.explore(cluster_id) # Create PVC size = "1Gi" spec = capi.pvcs.Spec(size) code, data = capi.pvcs.create(unique_name, spec) assert 201 == code, (code, data) # Verify PVC is created code, data = polling_for( f"PVC {unique_name} to be in Bound phase", lambda code, data: "Bound" == data['status'].get('phase'), capi.pvcs.get, unique_name ) # Verify the PV for created PVC pv_code, pv_data = capi.pvs.get(data['spec']['volumeName']) assert 200 == pv_code, ( f"Relevant PV is NOT available for created PVC's PV({data['spec']['volumeName']})\n" f"Response data of PV: {data}" ) # Verify size of the PV is aligned to requested size of PVC assert size == pv_data['spec']['capacity']['storage'], ( "Size of the PV is NOT aligned to requested size of PVC," f" expected: {size}, PV's size: {pv_data['spec']['capacity']['storage']}\n" f"Response data of PV: {data}" ) # Verify PVC's size created_spec = capi.pvcs.Spec.from_dict(data) assert size == spec.size, ( f"Size is NOT correct in created PVC, expected: {size}, created: {spec.size}\n" f"Response data: {data}" ) # Verify the storage class exists sc_code, sc_data = capi.scs.get(created_spec.storage_cls) assert 200 == sc_code, ( f"Storage Class is NOT exists for created PVC\n" f"Created PVC Spec: {data}\n" f"SC Status({sc_code}): {sc_data}" ) # verify the storage class is marked `default` assert 'true' == sc_data['metadata']['annotations'][capi.scs.DEFAULT_KEY], ( f"Storage Class is NOT the DEFAULT for created PVC\n" f"Requested Storage Class: {spec.storage_cls!r}" f"Created PVC Spec: {data}\n" f"SC Status({sc_code}): {sc_data}" ) # teardown capi.pvcs.delete(unique_name) @pytest.mark.dependency(depends=["create_rke2"], name="csi_deployment") def test_csi_deployment(self, rancher_api_client, rke2_cluster, csi_deployment, polling_for): # create pvc code, data = rancher_api_client.pvcs.create(rke2_cluster['id'], csi_deployment['pvc']) assert 201 == code, ( f"Fail to create {csi_deployment['pvc']} on {rke2_cluster['name']}\n" f"API Response: {code}, {data}" ) polling_for( f"PVC {csi_deployment['pvc']} to be ready", lambda code, data: 200 == code and "bound" == data.get("metadata", {}).get("state", {}).get("name"), rancher_api_client.pvcs.get, rke2_cluster['id'], csi_deployment['pvc'] ) # deployment with csi code, data = rancher_api_client.cluster_deployments.create( rke2_cluster['id'], csi_deployment['namespace'], csi_deployment['name'], csi_deployment['image'], csi_deployment['pvc'] ) assert 201 == code, ( f"Fail to deploy {csi_deployment['name']} on {rke2_cluster['name']}\n" f"API Response: {code}, {data}" ) polling_for( f"deployment {csi_deployment['name']} to be ready", lambda code, data: 200 == code and "active" == data.get("metadata", {}).get("state", {}).get("name"), rancher_api_client.cluster_deployments.get, rke2_cluster['id'], csi_deployment['namespace'], csi_deployment['name'] ) @pytest.mark.dependency(depends=["csi_deployment"]) def test_delete_deployment(self, rancher_api_client, rke2_cluster, csi_deployment, polling_for): code, data = rancher_api_client.cluster_deployments.delete( rke2_cluster['id'], csi_deployment['namespace'], csi_deployment['name'] ) assert 204 == code, ( f"Failed to delete deployment {csi_deployment['name']} with error: {code}, {data}" ) polling_for( f"deployment {csi_deployment['name']} to be deleted", lambda code, data: code == 404, rancher_api_client.cluster_deployments.get, rke2_cluster['id'], csi_deployment['namespace'], csi_deployment['name'] ) # teardown rancher_api_client.pvcs.delete(rke2_cluster['id'], csi_deployment['pvc']) @pytest.mark.dependency(depends=["create_rke2"], name="deploy_nginx") def test_deploy_nginx(self, rancher_api_client, rke2_cluster, nginx_deployment, polling_for): code, data = rancher_api_client.cluster_deployments.create( rke2_cluster['id'], nginx_deployment['namespace'], nginx_deployment['name'], nginx_deployment['image'] ) assert 201 == code, ( f"Fail to deploy {nginx_deployment['name']} on {rke2_cluster['name']}\n" f"API Response: {code}, {data}" ) polling_for( f"deployment {nginx_deployment['name']} to be ready", lambda code, data: 200 == code and "active" == data.get("metadata", {}).get("state", {}).get("name"), rancher_api_client.cluster_deployments.get, rke2_cluster['id'], nginx_deployment['namespace'], nginx_deployment['name'] ) @pytest.mark.dependency(depends=["deploy_nginx"]) def test_load_balancer_service(self, rancher_api_client, rke2_cluster, nginx_deployment, lb_service, polling_for): # create LB service code, data = rancher_api_client.cluster_services.create( rke2_cluster['id'], lb_service["data"] ) assert 201 == code, ( f"Fail to create {lb_service['name']} for {nginx_deployment['name']}\n" f"API Response: {code}, {data}" ) # check service active code, data = polling_for( f"service {lb_service['name']} to be ready", lambda code, data: 200 == code and "active" == data.get("metadata", {}).get("state", {}).get("name"), rancher_api_client.cluster_services.get, rke2_cluster['id'], lb_service['name'] ) # check Nginx can be queired via LB try: ingress = data["status"]["loadBalancer"]["ingress"][0] ingress_url = f"http://{ingress['ip']}:{ingress['ports'][0]['port']}" except Exception as e: raise AssertionError( f"Fail to get ingress info from {lb_service['name']}\n" f"Got error: {e}\n" f"Service data: {data}" ) resp = rancher_api_client.session.get(ingress_url) assert resp.ok and "Welcome to nginx" in resp.text, ( f"Fail to query load balancer {lb_service['name']}\n" f"Got error: {resp.status_code}, {resp.text}\n" f"Service data: {data}" ) # teardown rancher_api_client.cluster_services.delete(rke2_cluster['id'], lb_service["name"]) @pytest.mark.dependency(depends=["create_rke2"]) def test_delete_rke2(self, api_client, rancher_api_client, rke2_cluster, rancher_wait_timeout, polling_for): code, data = rancher_api_client.mgmt_clusters.delete(rke2_cluster['name']) assert 200 == code, ( f"Failed to delete RKE2 MgmtCluster {rke2_cluster['name']} with error: {code}, {data}" ) polling_for( f"cluster {rke2_cluster['name']} to be deleted", lambda code, data: 404 == code, rancher_api_client.mgmt_clusters.get, rke2_cluster['name'], timeout=rancher_wait_timeout ) code, data = api_client.vms.get() remaining_vm_cnt = 0 for d in data.get('data', []): vm_name = d.get('metadata', {}).get('name', "") if vm_name.startswith(f"{rke2_cluster['name']}-"): remaining_vm_cnt += 1 assert 0 == remaining_vm_cnt, (f"Still have {remaining_vm_cnt} RKE2 VMs")
Class variables
var pytestmark
Methods
def test_create_pvc(self, rancher_api_client, harvester_mgmt_cluster, unique_name, polling_for)
-
Expand source code
@pytest.mark.dependency(depends=["create_rke2"]) def test_create_pvc(self, rancher_api_client, harvester_mgmt_cluster, unique_name, polling_for): cluster_id = harvester_mgmt_cluster['id'] capi = rancher_api_client.clusters.explore(cluster_id) # Create PVC size = "1Gi" spec = capi.pvcs.Spec(size) code, data = capi.pvcs.create(unique_name, spec) assert 201 == code, (code, data) # Verify PVC is created code, data = polling_for( f"PVC {unique_name} to be in Bound phase", lambda code, data: "Bound" == data['status'].get('phase'), capi.pvcs.get, unique_name ) # Verify the PV for created PVC pv_code, pv_data = capi.pvs.get(data['spec']['volumeName']) assert 200 == pv_code, ( f"Relevant PV is NOT available for created PVC's PV({data['spec']['volumeName']})\n" f"Response data of PV: {data}" ) # Verify size of the PV is aligned to requested size of PVC assert size == pv_data['spec']['capacity']['storage'], ( "Size of the PV is NOT aligned to requested size of PVC," f" expected: {size}, PV's size: {pv_data['spec']['capacity']['storage']}\n" f"Response data of PV: {data}" ) # Verify PVC's size created_spec = capi.pvcs.Spec.from_dict(data) assert size == spec.size, ( f"Size is NOT correct in created PVC, expected: {size}, created: {spec.size}\n" f"Response data: {data}" ) # Verify the storage class exists sc_code, sc_data = capi.scs.get(created_spec.storage_cls) assert 200 == sc_code, ( f"Storage Class is NOT exists for created PVC\n" f"Created PVC Spec: {data}\n" f"SC Status({sc_code}): {sc_data}" ) # verify the storage class is marked `default` assert 'true' == sc_data['metadata']['annotations'][capi.scs.DEFAULT_KEY], ( f"Storage Class is NOT the DEFAULT for created PVC\n" f"Requested Storage Class: {spec.storage_cls!r}" f"Created PVC Spec: {data}\n" f"SC Status({sc_code}): {sc_data}" ) # teardown capi.pvcs.delete(unique_name)
def test_create_rke2(self,
rancher_api_client,
unique_name,
harvester_mgmt_cluster,
harvester_cloud_credential,
rke2_cluster,
ubuntu_image,
vlan_network,
rancher_wait_timeout,
polling_for)-
Expand source code
@pytest.mark.dependency(depends=["import_harvester"], name="create_rke2") def test_create_rke2(self, rancher_api_client, unique_name, harvester_mgmt_cluster, harvester_cloud_credential, rke2_cluster, ubuntu_image, vlan_network, rancher_wait_timeout, polling_for): # Create Harvester kubeconfig for this RKE2 cluster code, data = rancher_api_client.kube_configs.create( rke2_cluster['name'], harvester_mgmt_cluster['id'] ) assert 200 == code, ( f"Failed to create harvester kubeconfig for rke2 with error: {code}, {data}" ) assert "" != data, ( f"Harvester kubeconfig for rke2 should not be empty: {code}, {data}" ) kubeconfig = data # Create credential for this RKE2 cluster code, data = rancher_api_client.secrets.create( name=unique_name, data={ "credential": kubeconfig[1:-1].replace("\\n", "\n") }, annotations={ "v2prov-secret-authorized-for-cluster": rke2_cluster['name'], "v2prov-authorized-secret-deletes-on-cluster-removal": "true" } ) assert 201 == code, ( f"Failed to create secret with error: {code}, {data}" ) cloud_provider_config_id = f"{data['metadata']['namespace']}:{data['metadata']['name']}" # Create RKE2 cluster spec code, data = rancher_api_client.harvester_configs.create( name=unique_name, cpus="2", mems="4", disks="40", image_id=ubuntu_image['id'], network_id=vlan_network['name'], ssh_user=ubuntu_image['ssh_user'], user_data=( "#cloud-config\n" "password: test\n" "chpasswd:\n" " expire: false\n" "ssh_pwauth: true\n" ), ) assert 201 == code, ( f"Failed to create harvester config with error: {code}, {data}" ) # Create RKE2 cluster code, data = rancher_api_client.mgmt_clusters.create( name=rke2_cluster['name'], cloud_provider_config_id=cloud_provider_config_id, hostname_prefix=f"{rke2_cluster['name']}-", harvester_config_name=unique_name, k8s_version=rke2_cluster['k8s_version'], cloud_credential_id=harvester_cloud_credential['id'], quantity=rke2_cluster['machine_count'] ) assert 201 == code, ( f"Failed to create RKE2 MgmtCluster {unique_name} with error: {code}, {data}" ) code, data = polling_for( f"cluster {rke2_cluster['name']} to be ready", lambda code, data: "active" == data['metadata']['state']['name'] and "Ready" in data['metadata']['state']['message'], rancher_api_client.mgmt_clusters.get, rke2_cluster['name'], timeout=rancher_wait_timeout ) # update fixture value rke2_cluster['id'] = data["status"]["clusterName"] # Check deployments testees = ["harvester-cloud-provider", "harvester-csi-driver-controllers"] polling_for( f"harvester deployments on {rke2_cluster['name']} to be ready", lambda code, data: 200 == code and "active" == data.get("metadata", {}).get("state", {}).get("name"), rancher_api_client.cluster_deployments.get, rke2_cluster['id'], "kube-system", testees )
def test_csi_deployment(self, rancher_api_client, rke2_cluster, csi_deployment, polling_for)
-
Expand source code
@pytest.mark.dependency(depends=["create_rke2"], name="csi_deployment") def test_csi_deployment(self, rancher_api_client, rke2_cluster, csi_deployment, polling_for): # create pvc code, data = rancher_api_client.pvcs.create(rke2_cluster['id'], csi_deployment['pvc']) assert 201 == code, ( f"Fail to create {csi_deployment['pvc']} on {rke2_cluster['name']}\n" f"API Response: {code}, {data}" ) polling_for( f"PVC {csi_deployment['pvc']} to be ready", lambda code, data: 200 == code and "bound" == data.get("metadata", {}).get("state", {}).get("name"), rancher_api_client.pvcs.get, rke2_cluster['id'], csi_deployment['pvc'] ) # deployment with csi code, data = rancher_api_client.cluster_deployments.create( rke2_cluster['id'], csi_deployment['namespace'], csi_deployment['name'], csi_deployment['image'], csi_deployment['pvc'] ) assert 201 == code, ( f"Fail to deploy {csi_deployment['name']} on {rke2_cluster['name']}\n" f"API Response: {code}, {data}" ) polling_for( f"deployment {csi_deployment['name']} to be ready", lambda code, data: 200 == code and "active" == data.get("metadata", {}).get("state", {}).get("name"), rancher_api_client.cluster_deployments.get, rke2_cluster['id'], csi_deployment['namespace'], csi_deployment['name'] )
def test_delete_deployment(self, rancher_api_client, rke2_cluster, csi_deployment, polling_for)
-
Expand source code
@pytest.mark.dependency(depends=["csi_deployment"]) def test_delete_deployment(self, rancher_api_client, rke2_cluster, csi_deployment, polling_for): code, data = rancher_api_client.cluster_deployments.delete( rke2_cluster['id'], csi_deployment['namespace'], csi_deployment['name'] ) assert 204 == code, ( f"Failed to delete deployment {csi_deployment['name']} with error: {code}, {data}" ) polling_for( f"deployment {csi_deployment['name']} to be deleted", lambda code, data: code == 404, rancher_api_client.cluster_deployments.get, rke2_cluster['id'], csi_deployment['namespace'], csi_deployment['name'] ) # teardown rancher_api_client.pvcs.delete(rke2_cluster['id'], csi_deployment['pvc'])
def test_delete_rke2(self, api_client, rancher_api_client, rke2_cluster, rancher_wait_timeout, polling_for)
-
Expand source code
@pytest.mark.dependency(depends=["create_rke2"]) def test_delete_rke2(self, api_client, rancher_api_client, rke2_cluster, rancher_wait_timeout, polling_for): code, data = rancher_api_client.mgmt_clusters.delete(rke2_cluster['name']) assert 200 == code, ( f"Failed to delete RKE2 MgmtCluster {rke2_cluster['name']} with error: {code}, {data}" ) polling_for( f"cluster {rke2_cluster['name']} to be deleted", lambda code, data: 404 == code, rancher_api_client.mgmt_clusters.get, rke2_cluster['name'], timeout=rancher_wait_timeout ) code, data = api_client.vms.get() remaining_vm_cnt = 0 for d in data.get('data', []): vm_name = d.get('metadata', {}).get('name', "") if vm_name.startswith(f"{rke2_cluster['name']}-"): remaining_vm_cnt += 1 assert 0 == remaining_vm_cnt, (f"Still have {remaining_vm_cnt} RKE2 VMs")
def test_deploy_nginx(self, rancher_api_client, rke2_cluster, nginx_deployment, polling_for)
-
Expand source code
@pytest.mark.dependency(depends=["create_rke2"], name="deploy_nginx") def test_deploy_nginx(self, rancher_api_client, rke2_cluster, nginx_deployment, polling_for): code, data = rancher_api_client.cluster_deployments.create( rke2_cluster['id'], nginx_deployment['namespace'], nginx_deployment['name'], nginx_deployment['image'] ) assert 201 == code, ( f"Fail to deploy {nginx_deployment['name']} on {rke2_cluster['name']}\n" f"API Response: {code}, {data}" ) polling_for( f"deployment {nginx_deployment['name']} to be ready", lambda code, data: 200 == code and "active" == data.get("metadata", {}).get("state", {}).get("name"), rancher_api_client.cluster_deployments.get, rke2_cluster['id'], nginx_deployment['namespace'], nginx_deployment['name'] )
def test_load_balancer_service(self, rancher_api_client, rke2_cluster, nginx_deployment, lb_service, polling_for)
-
Expand source code
@pytest.mark.dependency(depends=["deploy_nginx"]) def test_load_balancer_service(self, rancher_api_client, rke2_cluster, nginx_deployment, lb_service, polling_for): # create LB service code, data = rancher_api_client.cluster_services.create( rke2_cluster['id'], lb_service["data"] ) assert 201 == code, ( f"Fail to create {lb_service['name']} for {nginx_deployment['name']}\n" f"API Response: {code}, {data}" ) # check service active code, data = polling_for( f"service {lb_service['name']} to be ready", lambda code, data: 200 == code and "active" == data.get("metadata", {}).get("state", {}).get("name"), rancher_api_client.cluster_services.get, rke2_cluster['id'], lb_service['name'] ) # check Nginx can be queired via LB try: ingress = data["status"]["loadBalancer"]["ingress"][0] ingress_url = f"http://{ingress['ip']}:{ingress['ports'][0]['port']}" except Exception as e: raise AssertionError( f"Fail to get ingress info from {lb_service['name']}\n" f"Got error: {e}\n" f"Service data: {data}" ) resp = rancher_api_client.session.get(ingress_url) assert resp.ok and "Welcome to nginx" in resp.text, ( f"Fail to query load balancer {lb_service['name']}\n" f"Got error: {resp.status_code}, {resp.text}\n" f"Service data: {data}" ) # teardown rancher_api_client.cluster_services.delete(rke2_cluster['id'], lb_service["name"])