Module harvester_e2e_tests.integrations.test_z_terraform_rancher
Functions
def cluster_resource(tf_rancher_resource, rke2_cluster, harvester, credential_resource)
-
Expand source code
@pytest.fixture(scope="module") def cluster_resource(tf_rancher_resource, rke2_cluster, harvester, credential_resource): return tf_rancher_resource.cluster_config( rke2_cluster["name"], rke2_cluster["k8s_version"], harvester["name"], credential_resource.name )
def credential_resource(unique_name, tf_rancher_resource, harvester)
-
Expand source code
@pytest.fixture(scope="module") def credential_resource(unique_name, tf_rancher_resource, harvester): return tf_rancher_resource.cloud_credential( f"cc-{unique_name}", harvester["name"] )
def harvester(api_client, rancher_api_client, unique_name, wait_timeout)
-
Expand source code
@pytest.fixture(scope='module') def harvester(api_client, rancher_api_client, unique_name, wait_timeout): """ Rancher creates Harvester entry (Import Existing) """ name = f"hvst-{unique_name}" rc, data = rancher_api_client.mgmt_clusters.create_harvester(name) assert 201 == rc, (rc, data) endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): rc, data = rancher_api_client.mgmt_clusters.get(name) if data.get('status', {}).get('clusterName'): break else: raise AssertionError( f"Fail to get MgmtCluster with clusterName {name}\n" f"rc: {rc}, data:\n{data}" ) namespace = data['metadata']['namespace'] yield { "name": name, "namespace": namespace, "id": data['status']['clusterName'], "kubeconfig": api_client.generate_kubeconfig() } rancher_api_client.mgmt_clusters.delete(name, namespace) updates = dict(value="") api_client.settings.update("cluster-registration-url", updates)
Rancher creates Harvester entry (Import Existing)
def machine_resource(tf_rancher_resource, rke2_cluster, vlan_network, ubuntu_image)
-
Expand source code
@pytest.fixture(scope="module") def machine_resource(tf_rancher_resource, rke2_cluster, vlan_network, ubuntu_image): return tf_rancher_resource.machine_config( rke2_cluster["name"], vlan_network["id"], ubuntu_image["id"], ubuntu_image["ssh_user"] )
def rancher(rancher_api_client)
-
Expand source code
@pytest.fixture(scope="module") def rancher(rancher_api_client): access_key, secret_key = rancher_api_client.token.split(":") yield { "endpoint": rancher_api_client.endpoint, "token": rancher_api_client.token, "access_key": access_key, "secret_key": secret_key }
def rke2_cluster(unique_name, rke2_version)
-
Expand source code
@pytest.fixture(scope='module') def rke2_cluster(unique_name, rke2_version): return { "name": f"rke2-{unique_name}", "id": "", # set in test_create_rke2_cluster "k8s_version": rke2_version }
def test_create_cloud_credential(rancher_api_client, tf_rancher, credential_resource)
-
Expand source code
@pytest.mark.p0 @pytest.mark.terraform @pytest.mark.rancher @pytest.mark.dependency(name="create_cloud_credential", depends=["import_harvester"]) def test_create_cloud_credential(rancher_api_client, tf_rancher, credential_resource): spec = credential_resource tf_rancher.save_as(spec.ctx, "cloud_credential") out, err, code = tf_rancher.apply_resource(spec.type, spec.name) assert not err and 0 == code code, data = rancher_api_client.cloud_credentials.get(params={"name": spec.name}) assert 200 == code, ( f"Failed to get cloud credential {spec.name}: {code}, {data}" )
def test_create_machine_config(tf_rancher, machine_resource)
-
Expand source code
@pytest.mark.p0 @pytest.mark.terraform @pytest.mark.rancher @pytest.mark.dependency(name="create_machine_config", depends=["create_cloud_credential"]) def test_create_machine_config(tf_rancher, machine_resource): spec = machine_resource tf_rancher.save_as(spec.ctx, "machine_config") out, err, code = tf_rancher.apply_resource(spec.type, spec.name) assert not err and 0 == code
def test_create_rke2_cluster(tf_rancher, rke2_cluster, rancher_api_client, cluster_resource)
-
Expand source code
@pytest.mark.p0 @pytest.mark.terraform @pytest.mark.rancher @pytest.mark.dependency(name="create_rke2_cluster", depends=["create_machine_config"]) def test_create_rke2_cluster(tf_rancher, rke2_cluster, rancher_api_client, cluster_resource): spec = cluster_resource tf_rancher.save_as(spec.ctx, "rke2_cluster") out, err, code = tf_rancher.apply_resource(spec.type, spec.name) assert not err and 0 == code rc, data = rancher_api_client.mgmt_clusters.get(rke2_cluster['name']) cluster_state = data.get("metadata", {}).get("state", {}) assert "active" == cluster_state['name'] and \ "Ready" in cluster_state['message'] # check deployments rke2_cluster['id'] = data["status"]["clusterName"] for deployment in ["harvester-cloud-provider", "harvester-csi-driver-controllers"]: rc, data = rancher_api_client.cluster_deployments.get( rke2_cluster['id'], "kube-system", deployment ) cluster_state = data.get("metadata", {}).get("state", {}) assert 200 == rc and \ "active" == cluster_state["name"]
def test_delete_cloud_credential(tf_rancher, credential_resource)
-
Expand source code
@pytest.mark.p0 @pytest.mark.terraform @pytest.mark.rancher @pytest.mark.dependency(name="delete_cloud_credential", depends=["create_cloud_credential"]) def test_delete_cloud_credential(tf_rancher, credential_resource): spec = credential_resource out, err, code = tf_rancher.destroy_resource(spec.type, spec.name) assert not err and 0 == code
def test_delete_machine_config(tf_rancher, machine_resource)
-
Expand source code
@pytest.mark.p0 @pytest.mark.terraform @pytest.mark.rancher @pytest.mark.dependency(name="delete_machine_config", depends=["create_machine_config"]) def test_delete_machine_config(tf_rancher, machine_resource): spec = machine_resource out, err, code = tf_rancher.destroy_resource(spec.type, spec.name) assert not err and 0 == code
def test_delete_rke2_cluster(tf_rancher, rke2_cluster, rancher_api_client, cluster_resource)
-
Expand source code
@pytest.mark.p0 @pytest.mark.terraform @pytest.mark.rancher @pytest.mark.dependency(name="delete_rke2_cluster", depends=["create_rke2_cluster"]) def test_delete_rke2_cluster(tf_rancher, rke2_cluster, rancher_api_client, cluster_resource): spec = cluster_resource out, err, code = tf_rancher.destroy_resource(spec.type, spec.name) assert not err and 0 == code rc, data = rancher_api_client.mgmt_clusters.get(rke2_cluster['name']) assert 404 == rc
def test_import_harvester(api_client, rancher_api_client, harvester, wait_timeout)
-
Expand source code
@pytest.mark.p0 @pytest.mark.terraform @pytest.mark.rancher @pytest.mark.dependency(name="import_harvester") def test_import_harvester(api_client, rancher_api_client, harvester, wait_timeout): # Get cluster registration URL in Rancher's Virtualization Management endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): rc, data = rancher_api_client.cluster_registration_tokens.get(harvester['id']) if 200 == rc and data.get('manifestUrl'): break else: raise AssertionError( f"Fail to registration URL for the imported harvester {harvester['name']}\n" f"rc: {rc}, data:\n{data}" ) # Set cluster-registration-url on Harvester updates = dict(value=data['manifestUrl']) rc, data = api_client.settings.update("cluster-registration-url", updates) assert 200 == rc, ( f"Failed to update Harvester's settings `cluster-registration-url`" f"rc: {rc}, data:\n{data}" ) # Check Cluster becomes `active` in Rancher's Virtualization Management endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): rc, data = rancher_api_client.mgmt_clusters.get(harvester['name']) cluster_state = data['metadata']['state'] if "active" == cluster_state['name'] and "Ready" in cluster_state['message']: break else: raise AssertionError( f"Fail to import harvester\n" f"rc: {rc}, data:\n{data}" )
def ubuntu_image(api_client, unique_name, image_ubuntu, wait_timeout)
-
Expand source code
@pytest.fixture(scope="module") def ubuntu_image(api_client, unique_name, image_ubuntu, wait_timeout): rc, data = api_client.images.create_by_url(unique_name, image_ubuntu.url) assert 201 == rc, ( f"Failed to upload ubuntu image with error\n" f"rc: {rc}, data:\n{data}" ) endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): rc, data = api_client.images.get(unique_name) if 200 == rc and image_ubuntu.url == data['spec']['url']: break else: raise AssertionError( f"Fail to create image {unique_name}\n" f"rc: {rc}, data:\n{data}" ) namespace = data['metadata']['namespace'] name = data['metadata']['name'] yield { "id": f"{namespace}/{name}", "ssh_user": "ubuntu" } api_client.images.delete(name, namespace)
def vlan_network(request, api_client)
-
Expand source code
@pytest.fixture(scope='module') def vlan_network(request, api_client): vlan_nic = request.config.getoption('--vlan-nic') vlan_id = request.config.getoption('--vlan-id') assert -1 != vlan_id, "Rancher integration test needs VLAN" api_client.clusternetworks.create(vlan_nic) api_client.clusternetworks.create_config(vlan_nic, vlan_nic, vlan_nic) network_name = f'vlan-network-{vlan_id}' code, data = api_client.networks.get(network_name) if code != 200: code, data = api_client.networks.create(network_name, vlan_id, cluster_network=vlan_nic) assert 201 == code, ( f"Failed to create network-attachment-definition {network_name}\n" f"rc: {code}, data:\n{data}" ) namespace = data['metadata']['namespace'] name = data['metadata']['name'] yield { "id": f"{namespace}/{name}" } api_client.networks.delete(network_name, namespace)