Module harvester_e2e_tests.integrations.test_upgrade
Functions
def cluster_network(vlan_nic, api_client, unique_name, network_checker, wait_timeout)
-
Expand source code
@pytest.fixture(scope='module')
def cluster_network(vlan_nic, api_client, unique_name, network_checker, wait_timeout):
    """Yield the name of a cluster network whose uplink NIC is `vlan_nic` on every node.

    Reuses an existing cluster network when one already covers all nodes;
    otherwise creates a new one plus a per-node config, and tears both down
    after the module's tests finish.
    """
    code, data = api_client.clusternetworks.get_config()
    assert 200 == code, (code, data)

    node_key = 'network.harvesterhci.io/matched-nodes'
    cnet_nodes = dict()  # cluster_network name -> list of matched node ids
    for cfg in data['items']:
        if vlan_nic in cfg['spec']['uplink']['nics']:
            nodes = json.loads(cfg['metadata']['annotations'][node_key])
            cnet_nodes.setdefault(cfg['spec']['clusterNetwork'], []).extend(nodes)

    code, data = api_client.hosts.get()
    assert 200 == code, (code, data)
    all_nodes = set(n['id'] for n in data['data'])
    try:
        # vlan_nic already configured on a cluster network covering every node: reuse it
        yield next(cnet for cnet, nodes in cnet_nodes.items() if all_nodes == set(nodes))
        return None
    except StopIteration:
        configured_nodes = reduce(add, cnet_nodes.values(), [])
        if any(n in configured_nodes for n in all_nodes):
            # partially configured cluster: refuse to guess, fail loudly
            # (fix: first line was missing its f-prefix, so {vlan_nic} never interpolated)
            raise AssertionError(
                f"Not all nodes' VLAN NIC {vlan_nic} are available.\n"
                f"VLAN NIC configured nodes: {configured_nodes}\n"
                f"All nodes: {all_nodes}\n"
            )

    # Create cluster network
    cnet = f"cnet-{datetime.strptime(unique_name, '%Hh%Mm%Ss%f-%m-%d').strftime('%H%M%S')}"
    created = []
    code, data = api_client.clusternetworks.create(cnet)
    assert 201 == code, (code, data)
    while all_nodes:
        node = all_nodes.pop()
        code, data = api_client.clusternetworks.create_config(node, cnet, vlan_nic, hostname=node)
        assert 201 == code, (
            f"Failed to create cluster config for {node}\n"
            f"Created: {created}\t Remaining: {all_nodes}\n"
            f"API Status({code}): {data}"
        )
        cnet_config_created, (code, data) = network_checker.wait_cnet_config_created(node)
        assert cnet_config_created, (code, data)
        created.append(node)

    yield cnet

    # Teardown: round-robin delete configs until all are gone (404) or timeout
    endtime = datetime.now() + timedelta(seconds=wait_timeout)
    while endtime > datetime.now():
        name = created.pop(0)
        code, data = api_client.clusternetworks.delete_config(name)
        if code != 404:
            created.append(name)
        if not created:
            break
        sleep(1)
    else:
        raise AssertionError(
            f"Failed to remove network configs {created} after {wait_timeout}s\n"
            f"Last API Status({code}): {data}"
        )

    code, data = api_client.clusternetworks.delete(cnet)
    assert 200 == code, (code, data)
def cluster_state(request, unique_name, api_client)
-
Expand source code
@pytest.fixture(scope="module")
def cluster_state(request, unique_name, api_client):
    """Provide a mutable, module-scoped namespace shared by the upgrade tests."""
    class ClusterState:
        # slots for the three VMs created by the pre-requisite tests
        vm1 = None
        vm2 = None
        vm3 = None

    state = ClusterState()
    target_version = request.config.getoption('--upgrade-target-version')
    if target_version:
        # an explicit target was requested: verify the OS version after upgrade
        state.version_verify = True
        state.version = target_version
    else:
        state.version_verify = False
        state.version = f"version-{unique_name}"
    return state
def config_backup_target(request, api_client, wait_timeout)
-
Expand source code
@pytest.fixture(scope="class")
def config_backup_target(request, api_client, wait_timeout):
    """Point the cluster's backup-target setting at the NFS endpoint from `--nfs-endpoint`.

    Yields the applied BackupTargetSpec.  On teardown: deletes unbound Longhorn
    backup volumes, restores the original backup-target, and removes backups
    that do not belong to the restored target.
    """
    # multiple fixtures from `vm_backup_restore`
    conflict_retries = 5
    nfs_endpoint = request.config.getoption('--nfs-endpoint')
    assert nfs_endpoint, f"NFS endpoint not configured: {nfs_endpoint}"
    assert nfs_endpoint.startswith("nfs://"), (
        f"NFS endpoint should starts with `nfs://`, not {nfs_endpoint}"
    )
    backup_type, config = ("NFS", dict(endpoint=nfs_endpoint))
    code, data = api_client.settings.get('backup-target')
    # remember the current target so teardown can restore it
    origin_spec = api_client.settings.BackupTargetSpec.from_dict(data)

    spec = getattr(api_client.settings.BackupTargetSpec, backup_type)(**config)
    # ???: when switching S3 -> NFS, update backup-target will easily hit resource conflict
    # so we would need retries to apply the change.
    for _ in range(conflict_retries):
        code, data = api_client.settings.update('backup-target', spec)
        if 409 == code and "Conflict" == data['reason']:
            sleep(3)
        else:
            break
    else:
        raise AssertionError(
            f"Unable to update backup-target after {conflict_retries} retried."
            f"API Status({code}): {data}"
        )
    assert 200 == code, (
        f'Failed to update backup target to {backup_type} with {config}\n'
        f"API Status({code}): {data}"
    )

    yield spec

    # remove unbound LH backupVolumes
    code, data = api_client.lhbackupvolumes.get()
    assert 200 == code, "Failed to list lhbackupvolumes"
    check_names = []
    for volume_data in data["items"]:
        volume_name = volume_data["metadata"]["name"]
        backup_name = volume_data["status"]["lastBackupName"]
        if not backup_name:
            # no backup references this volume: safe to delete
            api_client.lhbackupvolumes.delete(volume_name)
            check_names.append(volume_name)

    # wait until every deleted volume is actually gone (404)
    endtime = datetime.now() + timedelta(seconds=wait_timeout)
    while endtime > datetime.now():
        for name in check_names[:]:
            code, data = api_client.lhbackupvolumes.get(name)
            if 404 == code:
                check_names.remove(name)
        if not check_names:
            break
        sleep(3)
    else:
        raise AssertionError(
            f"Failed to delete unbound lhbackupvolumes: {check_names}\n"
            f"Last API Status({code}): {data}"
        )

    # restore to original backup-target and remove backups not belong to it
    code, data = api_client.settings.update('backup-target', origin_spec)
    code, data = api_client.backups.get()
    assert 200 == code, "Failed to list backups"
    check_names = []
    for backup in data['data']:
        endpoint = backup['status']['backupTarget'].get('endpoint')
        if endpoint != origin_spec.value.get('endpoint'):
            # backup belongs to the temporary (NFS) target: remove it
            api_client.backups.delete(backup['metadata']['name'])
            check_names.append(backup['metadata']['name'])

    # wait until removed backups are actually gone (404)
    endtime = datetime.now() + timedelta(seconds=wait_timeout)
    while endtime > datetime.now():
        for name in check_names[:]:
            code, data = api_client.backups.get(name)
            if 404 == code:
                check_names.remove(name)
        if not check_names:
            break
        sleep(3)
    else:
        raise AssertionError(
            f"Failed to delete backups: {check_names}\n"
            f"Last API Status({code}): {data}"
        )
def config_storageclass(request, api_client, unique_name, cluster_state)
-
Expand source code
@pytest.fixture(scope="module")
def config_storageclass(request, api_client, unique_name, cluster_state):
    """Create a storage class with the requested replica count and make it the default.

    Yields `(original_default_sc, created_sc)` and restores the original
    default storage class on teardown.
    """
    replicas = request.config.getoption('--upgrade-sc-replicas') or 3

    code, default_sc = api_client.scs.get_default()
    assert 200 == code, (code, default_sc)

    sc_name = f"new-sc-{replicas}-{unique_name}"
    code, data = api_client.scs.create(sc_name, replicas)
    assert 201 == code, (code, data)
    code, data = api_client.scs.set_default(sc_name)
    assert 200 == code, (code, data)

    cluster_state.scs = (default_sc, data)
    yield default_sc, data

    # teardown: put the original default storage class back
    code, data = api_client.scs.set_default(default_sc['metadata']['name'])
    assert 200 == code, (code, data)
def harvester_crds(api_client)
-
Expand source code
@pytest.fixture(scope="module")
def harvester_crds(api_client):
    """Map of expected Harvester CRD names to a 'seen' flag, all initially False."""
    expected = (
        "addons.harvesterhci.io",
        "blockdevices.harvesterhci.io",
        "keypairs.harvesterhci.io",
        "preferences.harvesterhci.io",
        "settings.harvesterhci.io",
        "supportbundles.harvesterhci.io",
        "upgrades.harvesterhci.io",
        "versions.harvesterhci.io",
        "virtualmachinebackups.harvesterhci.io",
        "virtualmachineimages.harvesterhci.io",
        "virtualmachinerestores.harvesterhci.io",
        "virtualmachinetemplates.harvesterhci.io",
        "virtualmachinetemplateversions.harvesterhci.io",
        "clusternetworks.network.harvesterhci.io",
        "linkmonitors.network.harvesterhci.io",
        "nodenetworks.network.harvesterhci.io",
        "vlanconfigs.network.harvesterhci.io",
        "vlanstatuses.network.harvesterhci.io",
        "ksmtuneds.node.harvesterhci.io",
        "loadbalancers.loadbalancer.harvesterhci.io",
    )
    crds = {name: False for name in expected}

    if api_client.cluster_version.release >= (1, 2, 0):
        # removed after `v1.2.0` (network-controller v0.3.3)
        # ref: https://github.com/harvester/network-controller-harvester/pull/85
        del crds["nodenetworks.network.harvesterhci.io"]
    return crds
def image(api_client, image_ubuntu, unique_name, wait_timeout)
-
Expand source code
@pytest.fixture(scope="module")
def image(api_client, image_ubuntu, unique_name, wait_timeout):
    """Upload the Ubuntu image by URL, wait until fully downloaded, and yield its info.

    Yields `dict(id="<namespace>/<image-id>", user=<ssh user>)`; deletes the
    image on teardown.
    """
    unique_image_id = f'image-{unique_name}'
    code, data = api_client.images.create_by_url(
        unique_image_id, image_ubuntu.url, display_name=f"{unique_name}-{image_ubuntu.name}"
    )
    assert 201 == code, (code, data)

    # poll until the image download reaches 100% progress
    deadline = datetime.now() + timedelta(seconds=wait_timeout)
    while datetime.now() < deadline:
        code, data = api_client.images.get(unique_image_id)
        if data.get('status', {}).get('progress', 0) == 100:
            break
        sleep(3)
    else:
        raise AssertionError(
            "Failed to create Image with error:\n"
            f"Status({code}): {data}"
        )

    yield dict(
        id=f"{data['metadata']['namespace']}/{unique_image_id}",
        user=image_ubuntu.ssh_user,
    )

    code, data = api_client.images.delete(unique_image_id)
def interceptor(api_client)
-
Expand source code
@pytest.fixture(scope="module")
def interceptor(api_client):
    """Return an object that feeds data through every `intercept_*` method on itself.

    Tests attach `intercept_<something>(data)` methods to the returned object;
    `check(data)` then dispatches `data` to each of them in turn.
    """
    from inspect import getmembers, ismethod

    class Interceptor:

        def intercepts(self):
            # collect bound methods whose names mark them as interceptors
            members = getmembers(self, predicate=ismethod)
            return [meth for attr_name, meth in members if attr_name.startswith("intercept_")]

        def check(self, data):
            for intercept in self.intercepts():
                intercept(data)

    return Interceptor()
def logging_addon()
-
Expand source code
@pytest.fixture(scope="module")
def logging_addon():
    """Static descriptor of the rancher-logging addon used by the upgrade tests.

    `enable_toggled` is mutated by tests that enable the addon themselves so
    teardown knows whether to disable it again.
    """
    addon_info = dict(
        namespace="cattle-logging-system",
        name="rancher-logging",
        enable_statuses=("deployed", "AddonDeploySuccessful"),
        enable_toggled=False,
    )
    return SimpleNamespace(**addon_info)
def stopped_vm(request, api_client, ssh_keypair, wait_timeout, unique_name, image)
-
Expand source code
@pytest.fixture
def stopped_vm(request, api_client, ssh_keypair, wait_timeout, unique_name, image):
    """Create a Halted VM seeded with the test's SSH public key.

    Yields `(vm_name, ssh_user, private_key)`; deletes the VM and its volumes
    on teardown.
    """
    # BUG FIX: the original used `str.lstrip('test_')`, which strips any leading
    # characters from the set {t, e, s, _} (not the prefix) and could therefore
    # also eat the beginning of the real test name; strip the exact prefix.
    node_name = request.node.name
    if node_name.startswith('test_'):
        node_name = node_name[len('test_'):]
    unique_vm_name = f"{node_name.replace('_', '-')}-{unique_name}"

    cpu, mem = 1, 2
    pub_key, pri_key = ssh_keypair
    vm_spec = api_client.vms.Spec(cpu, mem)
    vm_spec.add_image("disk-0", image['id'])
    vm_spec.run_strategy = "Halted"

    # inject the test keypair via cloud-init user data
    userdata = yaml.safe_load(vm_spec.user_data)
    userdata['ssh_authorized_keys'] = [pub_key]
    vm_spec.user_data = yaml.dump(userdata)

    code, data = api_client.vms.create(unique_vm_name, vm_spec)
    assert 201 == code, (code, data)

    # wait for the VM to settle in Stopped state
    # (best-effort: the original proceeds without error on timeout; preserved)
    endtime = datetime.now() + timedelta(seconds=wait_timeout)
    while endtime > datetime.now():
        code, data = api_client.vms.get(unique_vm_name)
        if "Stopped" == data.get('status', {}).get('printableStatus'):
            break
        sleep(1)

    yield unique_vm_name, image['user'], pri_key

    # Teardown: delete the VM, wait until it's gone, then remove its volumes
    code, data = api_client.vms.get(unique_vm_name)
    vm_spec = api_client.vms.Spec.from_dict(data)

    api_client.vms.delete(unique_vm_name)
    endtime = datetime.now() + timedelta(seconds=wait_timeout)
    while endtime > datetime.now():
        code, data = api_client.vms.get_status(unique_vm_name)
        if 404 == code:
            break
        sleep(3)

    for vol in vm_spec.volumes:
        vol_name = vol['volume']['persistentVolumeClaim']['claimName']
        api_client.volumes.delete(vol_name)
def upgrade_target(request, unique_name)
-
Expand source code
@pytest.fixture(scope="module")
def upgrade_target(request, unique_name):
    """Resolve `(version, iso_url, checksum)` for the upgrade from CLI options.

    Falls back to a generated version name when `--upgrade-target-version` is
    absent; the ISO URL and checksum options are mandatory.
    """
    # BUG FIX: `getoption(...)` returns None when an option is not provided, so
    # calling `.strip()` directly raised AttributeError instead of reaching the
    # intended fallback / assertion messages. Guard each read with `or ''`.
    version = (request.config.getoption('--upgrade-target-version') or '').strip()
    version = version or f"upgrade-{unique_name}"
    iso_url = (request.config.getoption('--upgrade-iso-url') or '').strip()
    assert iso_url, "Target ISO URL should not be empty"
    checksum = (request.config.getoption("--upgrade-iso-checksum") or '').strip()
    assert checksum, "Checksum for Target ISO should not be empty"
    return version, iso_url, checksum
def vm_network(api_client, unique_name, wait_timeout, cluster_network, vlan_id, cluster_state)
-
Expand source code
@pytest.fixture(scope="module")
def vm_network(api_client, unique_name, wait_timeout, cluster_network, vlan_id, cluster_state):
    """Create a VLAN VM network on `cluster_network`, wait for route info, yield its info.

    Yields `dict(name=..., cidr=..., namespace=...)`; deletes the network on
    teardown and waits until it is gone.
    """
    code, data = api_client.networks.create(
        unique_name, vlan_id, cluster_network=cluster_network
    )
    assert 201 == code, (code, data)

    # poll until the route annotation carries a CIDR
    route_key = 'network.harvesterhci.io/route'
    deadline = datetime.now() + timedelta(seconds=wait_timeout)
    while datetime.now() < deadline:
        code, data = api_client.networks.get(unique_name)
        raw_route = data['metadata'].get('annotations', {}).get(route_key)
        if 200 == code and raw_route:
            route = json.loads(raw_route)
            if route['cidr']:
                break
        sleep(3)
    else:
        raise AssertionError(
            "VM network created but route info not available\n"
            f"API Status({code}): {data}"
        )

    cluster_state.network = data
    yield dict(
        name=unique_name,
        cidr=route['cidr'],
        namespace=data['metadata']['namespace'],
    )

    # teardown: delete and wait until the network is really gone (404)
    code, data = api_client.networks.delete(unique_name)
    deadline = datetime.now() + timedelta(seconds=wait_timeout)
    while datetime.now() < deadline:
        code, data = api_client.networks.get(unique_name)
        if 404 == code:
            break
        sleep(3)
    else:
        raise AssertionError(
            f"Failed to remove VM network {unique_name} after {wait_timeout}s\n"
            f"API Status({code}): {data}"
        )
Classes
class TestAnyNodesUpgrade
-
Expand source code
@pytest.mark.upgrade @pytest.mark.any_nodes class TestAnyNodesUpgrade: @pytest.mark.dependency(name="preq_setup_logging") def test_preq_logging_pods(self, api_client, logging_addon, wait_timeout, sleep_timeout): # logging is an addon instead of built-in since v1.2.0 if api_client.cluster_version.release >= (1, 2, 0): addon = "/".join([logging_addon.namespace, logging_addon.name]) code, data = api_client.addons.get(addon) assert 200 == code, (code, data) if not data.get('status', {}).get('status') in logging_addon.enable_statuses: code, data = api_client.addons.enable(addon) assert 200 == code, (code, data) assert data.get('spec', {}).get('enabled', False), (code, data) logging_addon.enable_toggled = True endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, data = api_client.addons.get(addon) if data.get('status', {}).get('status') in logging_addon.enable_statuses: break sleep(sleep_timeout) else: raise AssertionError( f"Failed to enable addon {addon} with {wait_timeout} timed out\n" f"API Status({code}): {data}" ) code, pods = api_client.get_pods(namespace=logging_addon.namespace) assert code == 200 and len(pods['data']) > 0, "No logging pods found" fails = [] for pod in pods['data']: phase = pod["status"]["phase"] if phase not in ("Running", "Succeeded"): fails.append((pod['metadata']['name'], phase)) else: assert not fails, ( "\n".join(f"Pod({n})'s phase({p}) is not expected." 
for n, p in fails) ) @pytest.mark.dependency(name="preq_setup_vmnetwork") def test_preq_setup_vmnetwork(self, vm_network): ''' Be used to trigger the fixture to setup VM network ''' @pytest.mark.dependency(name="preq_setup_storageclass") def test_preq_setup_storageclass(self, config_storageclass): """ Be used to trigger the fixture to setup storageclass""" @pytest.mark.dependency(name="preq_setup_vms") def test_preq_setup_vms( self, api_client, ssh_keypair, unique_name, vm_checker, vm_shell, vm_network, image, config_storageclass, config_backup_target, wait_timeout, cluster_state ): # create new storage class, make it default # create 3 VMs: # - having the new storage class # - the VM that have some data written, take backup # - the VM restored from the backup pub_key, pri_key = ssh_keypair old_sc, new_sc = config_storageclass unique_vm_name = f"ug-vm-{unique_name}" cpu, mem, size = 1, 2, 5 vm_spec = api_client.vms.Spec(cpu, mem, mgmt_network=False) vm_spec.add_image('disk-0', image['id'], size=size) vm_spec.add_network('nic-1', f"{vm_network['namespace']}/{vm_network['name']}") userdata = yaml.safe_load(vm_spec.user_data) userdata['ssh_authorized_keys'] = [pub_key] vm_spec.user_data = yaml.dump(userdata) code, data = api_client.vms.create(unique_vm_name, vm_spec) assert 201 == code, (code, data) vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(unique_vm_name, ["nic-1"]) assert vm_got_ips, ( f"Failed to Start VM({unique_vm_name}) with errors:\n" f"Status: {data.get('status')}\n" f"API Status({code}): {data}" ) vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces'] if iface['name'] == 'nic-1') # write data into VM endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): try: with vm_shell.login(vm_ip, image['user'], pkey=pri_key) as sh: cloud_inited, (out, err) = vm_checker.wait_cloudinit_done(sh) assert cloud_inited and not err, (out, err) out, err = sh.exec_command( "dd if=/dev/urandom of=./generate_file 
bs=1M count=1024; sync" ) assert not out, (out, err) vm1_md5, err = sh.exec_command( "md5sum ./generate_file > ./generate_file.md5; cat ./generate_file.md5" ) assert not err, (vm1_md5, err) break except (SSHException, NoValidConnectionsError, TimeoutError): sleep(5) else: raise AssertionError("Timed out while writing data into VM") # Take backup then check it's ready code, data = api_client.vms.backup(unique_vm_name, unique_vm_name) assert 204 == code, (code, data) endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, backup = api_client.backups.get(unique_vm_name) if 200 == code and backup.get('status', {}).get('readyToUse'): break sleep(3) else: raise AssertionError( f'Timed-out waiting for the backup \'{unique_vm_name}\' to be ready.' ) # restore into new VM restored_vm_name = f"r-{unique_vm_name}" spec = api_client.backups.RestoreSpec.for_new(restored_vm_name) code, data = api_client.backups.restore(unique_vm_name, spec) assert 201 == code, (code, data) # Check restore VM is created endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, data = api_client.vms.get(restored_vm_name) if 200 == code: break sleep(3) else: raise AssertionError( f"restored VM {restored_vm_name} is not created" ) vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(restored_vm_name, ["nic-1"]) assert vm_got_ips, ( f"Failed to Start VM({restored_vm_name}) with errors:\n" f"Status: {data.get('status')}\n" f"API Status({code}): {data}" ) # Check data consistency r_vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces'] if iface['name'] == 'nic-1') endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): try: with vm_shell.login(r_vm_ip, image['user'], pkey=pri_key) as sh: cloud_inited, (out, err) = vm_checker.wait_cloudinit_done(sh) assert cloud_inited and not err, (out, err) out, err = sh.exec_command("md5sum -c ./generate_file.md5") assert not err, 
(out, err) vm2_md5, err = sh.exec_command("cat ./generate_file.md5") assert not err, (vm2_md5, err) assert vm1_md5 == vm2_md5 out, err = sh.exec_command( f"ping -c1 {vm_ip} > /dev/null && echo -n success || echo -n fail" ) assert "success" == out and not err break except (SSHException, NoValidConnectionsError, ConnectionResetError, TimeoutError): sleep(5) else: raise AssertionError("Unable to login to restored VM to check data consistency") # Create VM having additional volume with new storage class vm_spec.add_volume("vol-1", 5, storage_cls=new_sc['metadata']['name']) code, data = api_client.vms.create(f"sc-{unique_vm_name}", vm_spec) assert 201 == code, (code, data) vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(f"sc-{unique_vm_name}", ["nic-1"]) assert vm_got_ips, ( f"Failed to Start VM(sc-{unique_vm_name}) with errors:\n" f"Status: {data.get('status')}\n" f"API Status({code}): {data}" ) # store into cluster's state names = [unique_vm_name, f"r-{unique_vm_name}", f"sc-{unique_vm_name}"] cluster_state.vms = dict(md5=vm1_md5, names=names, ssh_user=image['user'], pkey=pri_key) @pytest.mark.dependency(name="any_nodes_upgrade") def test_perform_upgrade( self, api_client, unique_name, upgrade_target, upgrade_timeout, interceptor, upgrade_checker ): """ - perform upgrade - check all nodes upgraded """ # Check nodes counts code, data = api_client.hosts.get() assert code == 200, (code, data) nodes = len(data['data']) # create Upgrade version and start skip_version_check = {"harvesterhci.io/skip-version-check": True} # for test purpose version, url, checksum = upgrade_target version = f"{version}-{unique_name}" code, data = api_client.versions.create(version, url, checksum) assert 201 == code, f"Failed to create upgrade for {version}" version_created, (code, data) = upgrade_checker.wait_version_created(version) assert version_created, (code, data) code, data = api_client.upgrades.create(version, annotations=skip_version_check) assert 201 == code, f"Failed to 
start upgrade for {version}" upgrade_name = data['metadata']['name'] # Check upgrade status # TODO: check every upgrade stages endtime = datetime.now() + timedelta(seconds=upgrade_timeout * nodes) while endtime > datetime.now(): code, data = api_client.upgrades.get(upgrade_name) if 200 != code: continue interceptor.check(data) conds = dict((c['type'], c) for c in data.get('status', {}).get('conditions', [])) state = data.get('metadata', {}).get('labels', {}).get('harvesterhci.io/upgradeState') if "Succeeded" == state and "True" == conds.get('Completed', {}).get('status'): break if any("False" == c['status'] for c in conds.values()): raise AssertionError(f"Upgrade failed with conditions: {conds.values()}") sleep(30) else: raise AssertionError( f"Upgrade timed out with conditions: {conds.values()}\n" f"API Status({code}): {data}" ) @pytest.mark.dependency(depends=["any_nodes_upgrade", "preq_setup_logging"]) def test_verify_logging_pods(self, api_client, logging_addon): """ Verify logging pods and logs Criteria: https://github.com/harvester/tests/issues/535 """ # logging is an addon instead of built-in since v1.2.0 if api_client.cluster_version.release >= (1, 2, 0): addon = "/".join([logging_addon.namespace, logging_addon.name]) code, data = api_client.addons.get(addon) assert data.get('status', {}).get('status') in logging_addon.enable_statuses code, pods = api_client.get_pods(namespace=logging_addon.namespace) assert code == 200 and len(pods['data']) > 0, "No logging pods found" fails = [] for pod in pods['data']: phase = pod["status"]["phase"] if phase not in ("Running", "Succeeded"): fails.append((pod['metadata']['name'], phase)) else: assert not fails, ( "\n".join(f"Pod({n})'s phase({p}) is not expected." 
for n, p in fails) ) # teardown if logging_addon.enable_toggled: api_client.addons.disable(addon) @pytest.mark.dependency(depends=["any_nodes_upgrade"]) def test_verify_audit_log(self, api_client, host_shell, wait_timeout): code, data = api_client.hosts.get() assert 200 == code, (code, data) label_main = "node-role.kubernetes.io/control-plane" masters = [n for n in data['data'] if n['metadata']['labels'].get(label_main) == "true"] assert len(masters) > 0, "No master nodes found" script = ("sudo tail /var/lib/rancher/rke2/server/logs/audit.log | awk 'END{print}' " "| jq .requestReceivedTimestamp " "| xargs -I {} date -d \"{}\" +%s") node_ips = [n["metadata"]["annotations"][NODE_INTERNAL_IP_ANNOTATION] for n in masters] cmp = dict() done = set() endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): for ip in done.symmetric_difference(node_ips): try: with host_shell.login(ip) as shell: out, err = shell.exec_command(script) timestamp = int(out) if not err and ip not in cmp: cmp[ip] = timestamp continue if not err and cmp[ip] < timestamp: done.add(ip) except (SSHException, NoValidConnectionsError, ConnectionResetError, TimeoutError): continue if not done.symmetric_difference(node_ips): break sleep(5) else: raise AssertionError( "\n".join("Node {ip} audit log is not updated." 
for ip in set(node_ips) ^ done) ) @pytest.mark.dependency(depends=["any_nodes_upgrade", "preq_setup_vmnetwork"]) def test_verify_network(self, api_client, cluster_state): """ Verify cluster and VLAN networks - cluster network `mgmt` should exists - Created VLAN should exists """ code, cnets = api_client.clusternetworks.get() assert code == 200, ( "Failed to get Networks: %d, %s" % (code, cnets)) assert len(cnets["items"]) > 0, ("No Networks found") assert any(n['metadata']['name'] == "mgmt" for n in cnets['items']), ( "Cluster network mgmt not found") code, vnets = api_client.networks.get() assert code == 200, (f"Failed to get VLANs: {code}, {vnets}" % (code, vnets)) assert len(vnets["items"]) > 0, ("No VLANs found") used_vlan = cluster_state.network['metadata']['name'] assert any(used_vlan == n['metadata']['name'] for n in vnets['items']), ( f"VLAN {used_vlan} not found") @pytest.mark.dependency(depends=["any_nodes_upgrade", "preq_setup_vms"]) def test_verify_vms(self, api_client, cluster_state, vm_shell, vm_checker, wait_timeout): """ Verify VMs' state and data Criteria: - VMs should keep in running state - data in VMs should not lost """ code, vmis = api_client.vms.get_status() assert code == 200 and len(vmis['data']), (code, vmis) endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): fails, ips = list(), dict() for name in cluster_state.vms['names']: code, data = api_client.vms.get_status(name) try: assert 200 == code assert "Running" == data['status']['phase'] assert data['status']['nodeName'] ips[name] = next(iface['ipAddress'] for iface in data['status']['interfaces'] if iface['name'] == 'nic-1') except (AssertionError, TypeError, StopIteration, KeyError) as ex: fails.append((name, (ex, code, data))) if not fails: break else: raise AssertionError("\n".join( f"VM {name} is not in expected state.\nException: {ex}\nAPI Status({code}): {data}" for (name, (ex, code, data)) in fails) ) pri_key, ssh_user = 
cluster_state.vms['pkey'], cluster_state.vms['ssh_user'] for name in cluster_state.vms['names'][:-1]: vm_ip = ips[name] endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): try: with vm_shell.login(vm_ip, ssh_user, pkey=pri_key) as sh: out, err = sh.exec_command("md5sum -c ./generate_file.md5") assert not err, (out, err) md5, err = sh.exec_command("cat ./generate_file.md5") assert not err, (md5, err) assert md5 == cluster_state.vms['md5'] break except (SSHException, NoValidConnectionsError, ConnectionResetError, TimeoutError): sleep(5) else: fails.append(f"Data in VM({name}, {vm_ip}) is inconsistent.") assert not fails, "\n".join(fails) # Teardown: remove all VMs for name in cluster_state.vms['names']: code, data = api_client.vms.get(name) spec = api_client.vms.Spec.from_dict(data) _ = vm_checker.wait_deleted(name) for vol in spec.volumes: vol_name = vol['volume']['persistentVolumeClaim']['claimName'] api_client.volumes.delete(vol_name) @pytest.mark.dependency(depends=["any_nodes_upgrade", "preq_setup_vms"]) def test_verify_restore_vm( self, api_client, cluster_state, vm_shell, vm_checker, wait_timeout ): """ Verify VM restored from the backup Criteria: - VM should able to start - data in VM should not lost """ backup_name = cluster_state.vms['names'][0] restored_vm_name = f"new-r-{backup_name}" # Restore VM from backup and check networking is good restore_spec = api_client.backups.RestoreSpec.for_new(restored_vm_name) code, data = api_client.backups.restore(backup_name, restore_spec) assert code == 201, "Unable to restore backup {backup_name} after upgrade" # Check restore VM is created endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, data = api_client.vms.get(restored_vm_name) if 200 == code: break sleep(3) else: raise AssertionError( f"restored VM {restored_vm_name} is not created" ) vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(restored_vm_name, ["nic-1"]) assert 
vm_got_ips, ( f"Failed to Start VM({restored_vm_name}) with errors:\n" f"Status: {data.get('status')}\n" f"API Status({code}): {data}" ) # Check data in restored VM is consistent pri_key, ssh_user = cluster_state.vms['pkey'], cluster_state.vms['ssh_user'] vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces'] if iface['name'] == 'nic-1') endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): try: with vm_shell.login(vm_ip, ssh_user, pkey=pri_key) as sh: cloud_inited, (out, err) = vm_checker.wait_cloudinit_done(sh) assert cloud_inited and not err, (out, err) out, err = sh.exec_command("md5sum -c ./generate_file.md5") assert not err, (out, err) md5, err = sh.exec_command("cat ./generate_file.md5") assert not err, (md5, err) assert md5 == cluster_state.vms['md5'] break except (SSHException, NoValidConnectionsError, ConnectionResetError, TimeoutError): sleep(5) else: raise AssertionError("Unable to login to restored VM to check data consistency") # teardown: remove the VM code, data = api_client.vms.get(restored_vm_name) spec = api_client.vms.Spec.from_dict(data) _ = vm_checker.wait_deleted(restored_vm_name) for vol in spec.volumes: vol_name = vol['volume']['persistentVolumeClaim']['claimName'] api_client.volumes.delete(vol_name) @pytest.mark.dependency(depends=["any_nodes_upgrade", "preq_setup_storageclass"]) def test_verify_storage_class(self, api_client, cluster_state): """ Verify StorageClasses and defaults - `new_sc` should be settle as default - `longhorn` should exists """ code, scs = api_client.scs.get() assert code == 200, ("Failed to get StorageClasses: %d, %s" % (code, scs)) assert len(scs["items"]) > 0, ("No StorageClasses found") created_sc = cluster_state.scs[-1]['metadata']['name'] names = {sc['metadata']['name']: sc['metadata'].get('annotations') for sc in scs['items']} assert "longhorn" in names assert created_sc in names assert "storageclass.kubernetes.io/is-default-class" in names[created_sc] 
assert "true" == names[created_sc]["storageclass.kubernetes.io/is-default-class"] @pytest.mark.dependency(depends=["any_nodes_upgrade"]) def test_verify_os_version(self, request, api_client, cluster_state, host_shell): # Verify /etc/os-release on all nodes script = "cat /etc/os-release" if not cluster_state.version_verify: pytest.skip("skip verify os version") # Get all nodes code, data = api_client.hosts.get() assert 200 == code, (code, data) for node in data['data']: node_ip = node["metadata"]["annotations"][NODE_INTERNAL_IP_ANNOTATION] with host_shell.login(node_ip) as sh: lines, stderr = sh.exec_command(script, get_pty=True, splitlines=True) assert not stderr, ( f"Failed to execute {script} on {node_ip}: {stderr}") # eg: PRETTY_NAME="Harvester v1.1.0" assert cluster_state.version == re.findall(r"Harvester (.+?)\"", lines[3])[0], ( "OS version is not correct") @pytest.mark.dependency(depends=["any_nodes_upgrade"]) def test_verify_rke2_version(self, api_client, host_shell): # Verify node version on all nodes script = "cat /etc/harvester-release.yaml" label_main = "node-role.kubernetes.io/control-plane" code, data = api_client.hosts.get() assert 200 == code, (code, data) masters = [n for n in data['data'] if n['metadata']['labels'].get(label_main) == "true"] # Verify rke2 version except_rke2_version = "" for node in masters: node_ip = node["metadata"]["annotations"][NODE_INTERNAL_IP_ANNOTATION] # Get except rke2 version if except_rke2_version == "": with host_shell.login(node_ip) as sh: lines, stderr = sh.exec_command(script, get_pty=True, splitlines=True) assert not stderr, ( f"Failed to execute {script} on {node_ip}: {stderr}") for line in lines: if "kubernetes" in line: except_rke2_version = re.findall(r"kubernetes: (.*)", line.strip())[0] break assert except_rke2_version != "", ("Failed to get except rke2 version") assert node.get('status', {}).get('nodeInfo', {}).get( "kubeletVersion", "") == except_rke2_version, ( "rke2 version is not correct") 
@pytest.mark.dependency(depends=["any_nodes_upgrade"]) def test_verify_deployed_components_version(self, api_client): """ Verify deployed kubevirt and longhorn version Criteria: - except version(get from apps.catalog.cattle.io/harvester) should be equal to the version of kubevirt and longhorn """ kubevirt_version_existed = False engine_image_version_existed = False longhorn_manager_version_existed = False # Get expected image of kubevirt code, app = api_client.get_apps_deployments(name="virt-operator", namespace=DEFAULT_HARVESTER_NAMESPACE) assert code == 200, (code, app) kubevirt_operator_image = app['spec']['template']['spec']['containers'][0]['image'] # Get except image of longhorn code, apps = api_client.get_apps_controllerrevisions(namespace=DEFAULT_LONGHORN_NAMESPACE) assert code == 200, (code, apps) longhorn_images = { "engine-image": "", "longhorn-manager": "" } for lh_app in longhorn_images: for app in apps['data']: if app["id"].startswith(f"{DEFAULT_LONGHORN_NAMESPACE}/{lh_app}"): longhorn_images[lh_app] = ( app["data"]["spec"]["template"]["spec"]["containers"][0]["image"]) break # Verify kubevirt version code, pods = api_client.get_pods(namespace=DEFAULT_HARVESTER_NAMESPACE) assert code == 200 and len(pods['data']) > 0, ( f"Failed to get pods in namespace {DEFAULT_HARVESTER_NAMESPACE}") for pod in pods['data']: if "virt-operator" in pod['metadata']['name']: kubevirt_version_existed = ( kubevirt_operator_image == pod['spec']['containers'][0]['image']) # Verify longhorn version code, pods = api_client.get_pods(namespace=DEFAULT_LONGHORN_NAMESPACE) assert code == 200 and len(pods['data']) > 0, ( f"Failed to get pods in namespace {DEFAULT_LONGHORN_NAMESPACE}") for pod in pods['data']: if "longhorn-manager" in pod['metadata']['name']: longhorn_manager_version_existed = ( longhorn_images["longhorn-manager"] == pod['spec']['containers'][0]['image']) elif "engine-image" in pod['metadata']['name']: engine_image_version_existed = ( longhorn_images["engine-image"] 
== pod['spec']['containers'][0]['image']) assert kubevirt_version_existed, "kubevirt version is not correct" assert engine_image_version_existed, "longhorn engine image version is not correct" assert longhorn_manager_version_existed, "longhorn manager version is not correct" @pytest.mark.dependency(depends=["any_nodes_upgrade"]) def test_verify_crds_existed(self, api_client, harvester_crds): """ Verify crds existed Criteria: - crds should be existed """ not_existed_crds = [] exist_crds = True for crd in harvester_crds: code, _ = api_client.get_crds(name=crd) if code != 200: exist_crds = False not_existed_crds.append(crd) if not exist_crds: raise AssertionError(f"CRDs {not_existed_crds} are not existed") @pytest.mark.dependency(depends=["any_nodes_upgrade"]) def test_verify_upgradelog(self, api_client): """ Verify upgradelog pod and volume existed when upgrade with "Enable Logging" """ # pod code, data = api_client.get_pods(namespace='harvester-system') assert code == 200 and data['data'], (code, data) upgradelog_pods = [pod for pod in data['data'] if 'upgradelog' in pod['id']] assert upgradelog_pods, f"No upgradelog pod found:\n{data['data']}" for pod in upgradelog_pods: assert pod["status"]["phase"] == "Running", (code, upgradelog_pods) # volume code, data = api_client.volumes.get(namespace='harvester-system') assert code == 200 and data['data'], (code, data) upgradelog_vols = [vol for vol in data['data'] if 'upgradelog' in vol['id']] assert upgradelog_vols, f"No upgradelog volume found:\n{data['data']}" for vol in upgradelog_vols: assert not vol["metadata"]['state']['error'], (code, upgradelog_vols) assert not vol["metadata"]['state']['transitioning'], (code, upgradelog_vols) assert vol['status']['phase'] == "Bound", (code, upgradelog_vols) @pytest.mark.dependency(depends=["any_nodes_upgrade"]) def test_upgrade_vm_deleted(self, api_client, wait_timeout): # max to wait 300s for the upgrade related VMs to be deleted endtime = datetime.now() + 
timedelta(seconds=min(wait_timeout / 5, 300)) while endtime > datetime.now(): code, data = api_client.vms.get(namespace='harvester-system') upgrade_vms = [vm for vm in data['data'] if 'upgrade' in vm['id']] if not upgrade_vms: break else: raise AssertionError(f"Upgrade related VM still available:\n{upgrade_vms}") @pytest.mark.dependency(depends=["any_nodes_upgrade"]) def test_upgrade_volume_deleted(self, api_client, wait_timeout): # max to wait 300s for the upgrade related volumes to be deleted endtime = datetime.now() + timedelta(seconds=min(wait_timeout / 5, 300)) while endtime > datetime.now(): code, data = api_client.volumes.get(namespace='harvester-system') upgrade_vols = [vol for vol in data['data'] if 'upgrade' in vol['id'] and 'log-archive' not in vol['id']] if not upgrade_vols: break else: raise AssertionError(f"Upgrade related volume(s) still available:\n{upgrade_vols}") @pytest.mark.dependency(depends=["any_nodes_upgrade"]) def test_upgrade_image_deleted(self, api_client, wait_timeout): # max to wait 300s for the upgrade related volumes to be deleted endtime = datetime.now() + timedelta(seconds=min(wait_timeout / 5, 300)) while endtime > datetime.now(): code, data = api_client.images.get(namespace='harvester-system') upgrade_images = [image for image in data['items'] if 'upgrade' in image['spec']['displayName']] if not upgrade_images: break else: raise AssertionError(f"Upgrade related image(s) still available:\n{upgrade_images}")
Class variables
var pytestmark
Methods
def test_perform_upgrade(self,
api_client,
unique_name,
upgrade_target,
upgrade_timeout,
interceptor,
upgrade_checker)-
Expand source code
@pytest.mark.dependency(name="any_nodes_upgrade") def test_perform_upgrade( self, api_client, unique_name, upgrade_target, upgrade_timeout, interceptor, upgrade_checker ): """ - perform upgrade - check all nodes upgraded """ # Check nodes counts code, data = api_client.hosts.get() assert code == 200, (code, data) nodes = len(data['data']) # create Upgrade version and start skip_version_check = {"harvesterhci.io/skip-version-check": True} # for test purpose version, url, checksum = upgrade_target version = f"{version}-{unique_name}" code, data = api_client.versions.create(version, url, checksum) assert 201 == code, f"Failed to create upgrade for {version}" version_created, (code, data) = upgrade_checker.wait_version_created(version) assert version_created, (code, data) code, data = api_client.upgrades.create(version, annotations=skip_version_check) assert 201 == code, f"Failed to start upgrade for {version}" upgrade_name = data['metadata']['name'] # Check upgrade status # TODO: check every upgrade stages endtime = datetime.now() + timedelta(seconds=upgrade_timeout * nodes) while endtime > datetime.now(): code, data = api_client.upgrades.get(upgrade_name) if 200 != code: continue interceptor.check(data) conds = dict((c['type'], c) for c in data.get('status', {}).get('conditions', [])) state = data.get('metadata', {}).get('labels', {}).get('harvesterhci.io/upgradeState') if "Succeeded" == state and "True" == conds.get('Completed', {}).get('status'): break if any("False" == c['status'] for c in conds.values()): raise AssertionError(f"Upgrade failed with conditions: {conds.values()}") sleep(30) else: raise AssertionError( f"Upgrade timed out with conditions: {conds.values()}\n" f"API Status({code}): {data}" )
- perform upgrade
- check all nodes upgraded
def test_preq_logging_pods(self, api_client, logging_addon, wait_timeout, sleep_timeout)
-
Expand source code
@pytest.mark.dependency(name="preq_setup_logging") def test_preq_logging_pods(self, api_client, logging_addon, wait_timeout, sleep_timeout): # logging is an addon instead of built-in since v1.2.0 if api_client.cluster_version.release >= (1, 2, 0): addon = "/".join([logging_addon.namespace, logging_addon.name]) code, data = api_client.addons.get(addon) assert 200 == code, (code, data) if not data.get('status', {}).get('status') in logging_addon.enable_statuses: code, data = api_client.addons.enable(addon) assert 200 == code, (code, data) assert data.get('spec', {}).get('enabled', False), (code, data) logging_addon.enable_toggled = True endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, data = api_client.addons.get(addon) if data.get('status', {}).get('status') in logging_addon.enable_statuses: break sleep(sleep_timeout) else: raise AssertionError( f"Failed to enable addon {addon} with {wait_timeout} timed out\n" f"API Status({code}): {data}" ) code, pods = api_client.get_pods(namespace=logging_addon.namespace) assert code == 200 and len(pods['data']) > 0, "No logging pods found" fails = [] for pod in pods['data']: phase = pod["status"]["phase"] if phase not in ("Running", "Succeeded"): fails.append((pod['metadata']['name'], phase)) else: assert not fails, ( "\n".join(f"Pod({n})'s phase({p}) is not expected." for n, p in fails) )
def test_preq_setup_storageclass(self, config_storageclass)
-
Expand source code
@pytest.mark.dependency(name="preq_setup_storageclass") def test_preq_setup_storageclass(self, config_storageclass): """ Be used to trigger the fixture to setup storageclass"""
Used to trigger the fixture that sets up the storage class
def test_preq_setup_vmnetwork(self, vm_network)
-
Expand source code
@pytest.mark.dependency(name="preq_setup_vmnetwork") def test_preq_setup_vmnetwork(self, vm_network): ''' Be used to trigger the fixture to setup VM network '''
Used to trigger the fixture that sets up the VM network
def test_preq_setup_vms(self,
api_client,
ssh_keypair,
unique_name,
vm_checker,
vm_shell,
vm_network,
image,
config_storageclass,
config_backup_target,
wait_timeout,
cluster_state)-
Expand source code
@pytest.mark.dependency(name="preq_setup_vms") def test_preq_setup_vms( self, api_client, ssh_keypair, unique_name, vm_checker, vm_shell, vm_network, image, config_storageclass, config_backup_target, wait_timeout, cluster_state ): # create new storage class, make it default # create 3 VMs: # - having the new storage class # - the VM that have some data written, take backup # - the VM restored from the backup pub_key, pri_key = ssh_keypair old_sc, new_sc = config_storageclass unique_vm_name = f"ug-vm-{unique_name}" cpu, mem, size = 1, 2, 5 vm_spec = api_client.vms.Spec(cpu, mem, mgmt_network=False) vm_spec.add_image('disk-0', image['id'], size=size) vm_spec.add_network('nic-1', f"{vm_network['namespace']}/{vm_network['name']}") userdata = yaml.safe_load(vm_spec.user_data) userdata['ssh_authorized_keys'] = [pub_key] vm_spec.user_data = yaml.dump(userdata) code, data = api_client.vms.create(unique_vm_name, vm_spec) assert 201 == code, (code, data) vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(unique_vm_name, ["nic-1"]) assert vm_got_ips, ( f"Failed to Start VM({unique_vm_name}) with errors:\n" f"Status: {data.get('status')}\n" f"API Status({code}): {data}" ) vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces'] if iface['name'] == 'nic-1') # write data into VM endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): try: with vm_shell.login(vm_ip, image['user'], pkey=pri_key) as sh: cloud_inited, (out, err) = vm_checker.wait_cloudinit_done(sh) assert cloud_inited and not err, (out, err) out, err = sh.exec_command( "dd if=/dev/urandom of=./generate_file bs=1M count=1024; sync" ) assert not out, (out, err) vm1_md5, err = sh.exec_command( "md5sum ./generate_file > ./generate_file.md5; cat ./generate_file.md5" ) assert not err, (vm1_md5, err) break except (SSHException, NoValidConnectionsError, TimeoutError): sleep(5) else: raise AssertionError("Timed out while writing data into VM") # Take backup then check 
it's ready code, data = api_client.vms.backup(unique_vm_name, unique_vm_name) assert 204 == code, (code, data) endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, backup = api_client.backups.get(unique_vm_name) if 200 == code and backup.get('status', {}).get('readyToUse'): break sleep(3) else: raise AssertionError( f'Timed-out waiting for the backup \'{unique_vm_name}\' to be ready.' ) # restore into new VM restored_vm_name = f"r-{unique_vm_name}" spec = api_client.backups.RestoreSpec.for_new(restored_vm_name) code, data = api_client.backups.restore(unique_vm_name, spec) assert 201 == code, (code, data) # Check restore VM is created endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, data = api_client.vms.get(restored_vm_name) if 200 == code: break sleep(3) else: raise AssertionError( f"restored VM {restored_vm_name} is not created" ) vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(restored_vm_name, ["nic-1"]) assert vm_got_ips, ( f"Failed to Start VM({restored_vm_name}) with errors:\n" f"Status: {data.get('status')}\n" f"API Status({code}): {data}" ) # Check data consistency r_vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces'] if iface['name'] == 'nic-1') endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): try: with vm_shell.login(r_vm_ip, image['user'], pkey=pri_key) as sh: cloud_inited, (out, err) = vm_checker.wait_cloudinit_done(sh) assert cloud_inited and not err, (out, err) out, err = sh.exec_command("md5sum -c ./generate_file.md5") assert not err, (out, err) vm2_md5, err = sh.exec_command("cat ./generate_file.md5") assert not err, (vm2_md5, err) assert vm1_md5 == vm2_md5 out, err = sh.exec_command( f"ping -c1 {vm_ip} > /dev/null && echo -n success || echo -n fail" ) assert "success" == out and not err break except (SSHException, NoValidConnectionsError, ConnectionResetError, TimeoutError): sleep(5) 
else: raise AssertionError("Unable to login to restored VM to check data consistency") # Create VM having additional volume with new storage class vm_spec.add_volume("vol-1", 5, storage_cls=new_sc['metadata']['name']) code, data = api_client.vms.create(f"sc-{unique_vm_name}", vm_spec) assert 201 == code, (code, data) vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(f"sc-{unique_vm_name}", ["nic-1"]) assert vm_got_ips, ( f"Failed to Start VM(sc-{unique_vm_name}) with errors:\n" f"Status: {data.get('status')}\n" f"API Status({code}): {data}" ) # store into cluster's state names = [unique_vm_name, f"r-{unique_vm_name}", f"sc-{unique_vm_name}"] cluster_state.vms = dict(md5=vm1_md5, names=names, ssh_user=image['user'], pkey=pri_key)
def test_upgrade_image_deleted(self, api_client, wait_timeout)
-
Expand source code
@pytest.mark.dependency(depends=["any_nodes_upgrade"]) def test_upgrade_image_deleted(self, api_client, wait_timeout): # max to wait 300s for the upgrade related volumes to be deleted endtime = datetime.now() + timedelta(seconds=min(wait_timeout / 5, 300)) while endtime > datetime.now(): code, data = api_client.images.get(namespace='harvester-system') upgrade_images = [image for image in data['items'] if 'upgrade' in image['spec']['displayName']] if not upgrade_images: break else: raise AssertionError(f"Upgrade related image(s) still available:\n{upgrade_images}")
def test_upgrade_vm_deleted(self, api_client, wait_timeout)
-
Expand source code
@pytest.mark.dependency(depends=["any_nodes_upgrade"]) def test_upgrade_vm_deleted(self, api_client, wait_timeout): # max to wait 300s for the upgrade related VMs to be deleted endtime = datetime.now() + timedelta(seconds=min(wait_timeout / 5, 300)) while endtime > datetime.now(): code, data = api_client.vms.get(namespace='harvester-system') upgrade_vms = [vm for vm in data['data'] if 'upgrade' in vm['id']] if not upgrade_vms: break else: raise AssertionError(f"Upgrade related VM still available:\n{upgrade_vms}")
def test_upgrade_volume_deleted(self, api_client, wait_timeout)
-
Expand source code
@pytest.mark.dependency(depends=["any_nodes_upgrade"]) def test_upgrade_volume_deleted(self, api_client, wait_timeout): # max to wait 300s for the upgrade related volumes to be deleted endtime = datetime.now() + timedelta(seconds=min(wait_timeout / 5, 300)) while endtime > datetime.now(): code, data = api_client.volumes.get(namespace='harvester-system') upgrade_vols = [vol for vol in data['data'] if 'upgrade' in vol['id'] and 'log-archive' not in vol['id']] if not upgrade_vols: break else: raise AssertionError(f"Upgrade related volume(s) still available:\n{upgrade_vols}")
def test_verify_audit_log(self, api_client, host_shell, wait_timeout)
-
Expand source code
@pytest.mark.dependency(depends=["any_nodes_upgrade"]) def test_verify_audit_log(self, api_client, host_shell, wait_timeout): code, data = api_client.hosts.get() assert 200 == code, (code, data) label_main = "node-role.kubernetes.io/control-plane" masters = [n for n in data['data'] if n['metadata']['labels'].get(label_main) == "true"] assert len(masters) > 0, "No master nodes found" script = ("sudo tail /var/lib/rancher/rke2/server/logs/audit.log | awk 'END{print}' " "| jq .requestReceivedTimestamp " "| xargs -I {} date -d \"{}\" +%s") node_ips = [n["metadata"]["annotations"][NODE_INTERNAL_IP_ANNOTATION] for n in masters] cmp = dict() done = set() endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): for ip in done.symmetric_difference(node_ips): try: with host_shell.login(ip) as shell: out, err = shell.exec_command(script) timestamp = int(out) if not err and ip not in cmp: cmp[ip] = timestamp continue if not err and cmp[ip] < timestamp: done.add(ip) except (SSHException, NoValidConnectionsError, ConnectionResetError, TimeoutError): continue if not done.symmetric_difference(node_ips): break sleep(5) else: raise AssertionError( "\n".join("Node {ip} audit log is not updated." for ip in set(node_ips) ^ done) )
def test_verify_crds_existed(self, api_client, harvester_crds)
-
Expand source code
@pytest.mark.dependency(depends=["any_nodes_upgrade"]) def test_verify_crds_existed(self, api_client, harvester_crds): """ Verify crds existed Criteria: - crds should be existed """ not_existed_crds = [] exist_crds = True for crd in harvester_crds: code, _ = api_client.get_crds(name=crd) if code != 200: exist_crds = False not_existed_crds.append(crd) if not exist_crds: raise AssertionError(f"CRDs {not_existed_crds} are not existed")
Verify CRDs exist. Criteria: - all expected CRDs should exist
def test_verify_deployed_components_version(self, api_client)
-
Expand source code
@pytest.mark.dependency(depends=["any_nodes_upgrade"]) def test_verify_deployed_components_version(self, api_client): """ Verify deployed kubevirt and longhorn version Criteria: - except version(get from apps.catalog.cattle.io/harvester) should be equal to the version of kubevirt and longhorn """ kubevirt_version_existed = False engine_image_version_existed = False longhorn_manager_version_existed = False # Get expected image of kubevirt code, app = api_client.get_apps_deployments(name="virt-operator", namespace=DEFAULT_HARVESTER_NAMESPACE) assert code == 200, (code, app) kubevirt_operator_image = app['spec']['template']['spec']['containers'][0]['image'] # Get except image of longhorn code, apps = api_client.get_apps_controllerrevisions(namespace=DEFAULT_LONGHORN_NAMESPACE) assert code == 200, (code, apps) longhorn_images = { "engine-image": "", "longhorn-manager": "" } for lh_app in longhorn_images: for app in apps['data']: if app["id"].startswith(f"{DEFAULT_LONGHORN_NAMESPACE}/{lh_app}"): longhorn_images[lh_app] = ( app["data"]["spec"]["template"]["spec"]["containers"][0]["image"]) break # Verify kubevirt version code, pods = api_client.get_pods(namespace=DEFAULT_HARVESTER_NAMESPACE) assert code == 200 and len(pods['data']) > 0, ( f"Failed to get pods in namespace {DEFAULT_HARVESTER_NAMESPACE}") for pod in pods['data']: if "virt-operator" in pod['metadata']['name']: kubevirt_version_existed = ( kubevirt_operator_image == pod['spec']['containers'][0]['image']) # Verify longhorn version code, pods = api_client.get_pods(namespace=DEFAULT_LONGHORN_NAMESPACE) assert code == 200 and len(pods['data']) > 0, ( f"Failed to get pods in namespace {DEFAULT_LONGHORN_NAMESPACE}") for pod in pods['data']: if "longhorn-manager" in pod['metadata']['name']: longhorn_manager_version_existed = ( longhorn_images["longhorn-manager"] == pod['spec']['containers'][0]['image']) elif "engine-image" in pod['metadata']['name']: engine_image_version_existed = ( longhorn_images["engine-image"] 
== pod['spec']['containers'][0]['image']) assert kubevirt_version_existed, "kubevirt version is not correct" assert engine_image_version_existed, "longhorn engine image version is not correct" assert longhorn_manager_version_existed, "longhorn manager version is not correct"
Verify deployed kubevirt and longhorn version Criteria: - expected version (taken from apps.catalog.cattle.io/harvester) should be equal to the version of kubevirt and longhorn
def test_verify_logging_pods(self, api_client, logging_addon)
-
Expand source code
@pytest.mark.dependency(depends=["any_nodes_upgrade", "preq_setup_logging"]) def test_verify_logging_pods(self, api_client, logging_addon): """ Verify logging pods and logs Criteria: https://github.com/harvester/tests/issues/535 """ # logging is an addon instead of built-in since v1.2.0 if api_client.cluster_version.release >= (1, 2, 0): addon = "/".join([logging_addon.namespace, logging_addon.name]) code, data = api_client.addons.get(addon) assert data.get('status', {}).get('status') in logging_addon.enable_statuses code, pods = api_client.get_pods(namespace=logging_addon.namespace) assert code == 200 and len(pods['data']) > 0, "No logging pods found" fails = [] for pod in pods['data']: phase = pod["status"]["phase"] if phase not in ("Running", "Succeeded"): fails.append((pod['metadata']['name'], phase)) else: assert not fails, ( "\n".join(f"Pod({n})'s phase({p}) is not expected." for n, p in fails) ) # teardown if logging_addon.enable_toggled: api_client.addons.disable(addon)
Verify logging pods and logs Criteria: https://github.com/harvester/tests/issues/535
def test_verify_network(self, api_client, cluster_state)
-
Expand source code
@pytest.mark.dependency(depends=["any_nodes_upgrade", "preq_setup_vmnetwork"]) def test_verify_network(self, api_client, cluster_state): """ Verify cluster and VLAN networks - cluster network `mgmt` should exists - Created VLAN should exists """ code, cnets = api_client.clusternetworks.get() assert code == 200, ( "Failed to get Networks: %d, %s" % (code, cnets)) assert len(cnets["items"]) > 0, ("No Networks found") assert any(n['metadata']['name'] == "mgmt" for n in cnets['items']), ( "Cluster network mgmt not found") code, vnets = api_client.networks.get() assert code == 200, (f"Failed to get VLANs: {code}, {vnets}" % (code, vnets)) assert len(vnets["items"]) > 0, ("No VLANs found") used_vlan = cluster_state.network['metadata']['name'] assert any(used_vlan == n['metadata']['name'] for n in vnets['items']), ( f"VLAN {used_vlan} not found")
Verify cluster and VLAN networks - cluster network
mgmt
should exists - Created VLAN should exists def test_verify_os_version(self, request, api_client, cluster_state, host_shell)
-
Expand source code
@pytest.mark.dependency(depends=["any_nodes_upgrade"]) def test_verify_os_version(self, request, api_client, cluster_state, host_shell): # Verify /etc/os-release on all nodes script = "cat /etc/os-release" if not cluster_state.version_verify: pytest.skip("skip verify os version") # Get all nodes code, data = api_client.hosts.get() assert 200 == code, (code, data) for node in data['data']: node_ip = node["metadata"]["annotations"][NODE_INTERNAL_IP_ANNOTATION] with host_shell.login(node_ip) as sh: lines, stderr = sh.exec_command(script, get_pty=True, splitlines=True) assert not stderr, ( f"Failed to execute {script} on {node_ip}: {stderr}") # eg: PRETTY_NAME="Harvester v1.1.0" assert cluster_state.version == re.findall(r"Harvester (.+?)\"", lines[3])[0], ( "OS version is not correct")
def test_verify_restore_vm(self, api_client, cluster_state, vm_shell, vm_checker, wait_timeout)
-
Expand source code
@pytest.mark.dependency(depends=["any_nodes_upgrade", "preq_setup_vms"]) def test_verify_restore_vm( self, api_client, cluster_state, vm_shell, vm_checker, wait_timeout ): """ Verify VM restored from the backup Criteria: - VM should able to start - data in VM should not lost """ backup_name = cluster_state.vms['names'][0] restored_vm_name = f"new-r-{backup_name}" # Restore VM from backup and check networking is good restore_spec = api_client.backups.RestoreSpec.for_new(restored_vm_name) code, data = api_client.backups.restore(backup_name, restore_spec) assert code == 201, "Unable to restore backup {backup_name} after upgrade" # Check restore VM is created endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, data = api_client.vms.get(restored_vm_name) if 200 == code: break sleep(3) else: raise AssertionError( f"restored VM {restored_vm_name} is not created" ) vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(restored_vm_name, ["nic-1"]) assert vm_got_ips, ( f"Failed to Start VM({restored_vm_name}) with errors:\n" f"Status: {data.get('status')}\n" f"API Status({code}): {data}" ) # Check data in restored VM is consistent pri_key, ssh_user = cluster_state.vms['pkey'], cluster_state.vms['ssh_user'] vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces'] if iface['name'] == 'nic-1') endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): try: with vm_shell.login(vm_ip, ssh_user, pkey=pri_key) as sh: cloud_inited, (out, err) = vm_checker.wait_cloudinit_done(sh) assert cloud_inited and not err, (out, err) out, err = sh.exec_command("md5sum -c ./generate_file.md5") assert not err, (out, err) md5, err = sh.exec_command("cat ./generate_file.md5") assert not err, (md5, err) assert md5 == cluster_state.vms['md5'] break except (SSHException, NoValidConnectionsError, ConnectionResetError, TimeoutError): sleep(5) else: raise AssertionError("Unable to login to restored VM to check 
data consistency") # teardown: remove the VM code, data = api_client.vms.get(restored_vm_name) spec = api_client.vms.Spec.from_dict(data) _ = vm_checker.wait_deleted(restored_vm_name) for vol in spec.volumes: vol_name = vol['volume']['persistentVolumeClaim']['claimName'] api_client.volumes.delete(vol_name)
Verify VM restored from the backup Criteria: - VM should be able to start - data in VM should not be lost
def test_verify_rke2_version(self, api_client, host_shell)
-
Expand source code
@pytest.mark.dependency(depends=["any_nodes_upgrade"]) def test_verify_rke2_version(self, api_client, host_shell): # Verify node version on all nodes script = "cat /etc/harvester-release.yaml" label_main = "node-role.kubernetes.io/control-plane" code, data = api_client.hosts.get() assert 200 == code, (code, data) masters = [n for n in data['data'] if n['metadata']['labels'].get(label_main) == "true"] # Verify rke2 version except_rke2_version = "" for node in masters: node_ip = node["metadata"]["annotations"][NODE_INTERNAL_IP_ANNOTATION] # Get except rke2 version if except_rke2_version == "": with host_shell.login(node_ip) as sh: lines, stderr = sh.exec_command(script, get_pty=True, splitlines=True) assert not stderr, ( f"Failed to execute {script} on {node_ip}: {stderr}") for line in lines: if "kubernetes" in line: except_rke2_version = re.findall(r"kubernetes: (.*)", line.strip())[0] break assert except_rke2_version != "", ("Failed to get except rke2 version") assert node.get('status', {}).get('nodeInfo', {}).get( "kubeletVersion", "") == except_rke2_version, ( "rke2 version is not correct")
def test_verify_storage_class(self, api_client, cluster_state)
-
Expand source code
@pytest.mark.dependency(depends=["any_nodes_upgrade", "preq_setup_storageclass"]) def test_verify_storage_class(self, api_client, cluster_state): """ Verify StorageClasses and defaults - `new_sc` should be settle as default - `longhorn` should exists """ code, scs = api_client.scs.get() assert code == 200, ("Failed to get StorageClasses: %d, %s" % (code, scs)) assert len(scs["items"]) > 0, ("No StorageClasses found") created_sc = cluster_state.scs[-1]['metadata']['name'] names = {sc['metadata']['name']: sc['metadata'].get('annotations') for sc in scs['items']} assert "longhorn" in names assert created_sc in names assert "storageclass.kubernetes.io/is-default-class" in names[created_sc] assert "true" == names[created_sc]["storageclass.kubernetes.io/is-default-class"]
Verify StorageClasses and defaults -
new_sc
should be set as the default -longhorn
should exists def test_verify_upgradelog(self, api_client)
-
Expand source code
@pytest.mark.dependency(depends=["any_nodes_upgrade"]) def test_verify_upgradelog(self, api_client): """ Verify upgradelog pod and volume existed when upgrade with "Enable Logging" """ # pod code, data = api_client.get_pods(namespace='harvester-system') assert code == 200 and data['data'], (code, data) upgradelog_pods = [pod for pod in data['data'] if 'upgradelog' in pod['id']] assert upgradelog_pods, f"No upgradelog pod found:\n{data['data']}" for pod in upgradelog_pods: assert pod["status"]["phase"] == "Running", (code, upgradelog_pods) # volume code, data = api_client.volumes.get(namespace='harvester-system') assert code == 200 and data['data'], (code, data) upgradelog_vols = [vol for vol in data['data'] if 'upgradelog' in vol['id']] assert upgradelog_vols, f"No upgradelog volume found:\n{data['data']}" for vol in upgradelog_vols: assert not vol["metadata"]['state']['error'], (code, upgradelog_vols) assert not vol["metadata"]['state']['transitioning'], (code, upgradelog_vols) assert vol['status']['phase'] == "Bound", (code, upgradelog_vols)
Verify upgradelog pod and volume existed when upgrade with "Enable Logging"
def test_verify_vms(self, api_client, cluster_state, vm_shell, vm_checker, wait_timeout)
-
Expand source code
@pytest.mark.dependency(depends=["any_nodes_upgrade", "preq_setup_vms"]) def test_verify_vms(self, api_client, cluster_state, vm_shell, vm_checker, wait_timeout): """ Verify VMs' state and data Criteria: - VMs should keep in running state - data in VMs should not lost """ code, vmis = api_client.vms.get_status() assert code == 200 and len(vmis['data']), (code, vmis) endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): fails, ips = list(), dict() for name in cluster_state.vms['names']: code, data = api_client.vms.get_status(name) try: assert 200 == code assert "Running" == data['status']['phase'] assert data['status']['nodeName'] ips[name] = next(iface['ipAddress'] for iface in data['status']['interfaces'] if iface['name'] == 'nic-1') except (AssertionError, TypeError, StopIteration, KeyError) as ex: fails.append((name, (ex, code, data))) if not fails: break else: raise AssertionError("\n".join( f"VM {name} is not in expected state.\nException: {ex}\nAPI Status({code}): {data}" for (name, (ex, code, data)) in fails) ) pri_key, ssh_user = cluster_state.vms['pkey'], cluster_state.vms['ssh_user'] for name in cluster_state.vms['names'][:-1]: vm_ip = ips[name] endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): try: with vm_shell.login(vm_ip, ssh_user, pkey=pri_key) as sh: out, err = sh.exec_command("md5sum -c ./generate_file.md5") assert not err, (out, err) md5, err = sh.exec_command("cat ./generate_file.md5") assert not err, (md5, err) assert md5 == cluster_state.vms['md5'] break except (SSHException, NoValidConnectionsError, ConnectionResetError, TimeoutError): sleep(5) else: fails.append(f"Data in VM({name}, {vm_ip}) is inconsistent.") assert not fails, "\n".join(fails) # Teardown: remove all VMs for name in cluster_state.vms['names']: code, data = api_client.vms.get(name) spec = api_client.vms.Spec.from_dict(data) _ = vm_checker.wait_deleted(name) for vol in spec.volumes: vol_name = 
vol['volume']['persistentVolumeClaim']['claimName'] api_client.volumes.delete(vol_name)
Verify VMs' state and data Criteria: - VMs should stay in Running state - data in VMs should not be lost
class TestInvalidUpgrade
-
Expand source code
@pytest.mark.upgrade
@pytest.mark.negative
@pytest.mark.any_nodes
class TestInvalidUpgrade:
    """Negative upgrade scenarios: every test creates a deliberately invalid
    upgrade (bad ISO URL, bad checksum, degraded volume, ...) and asserts the
    upgrade is rejected or fails early."""

    def test_iso_url(self, api_client, unique_name, upgrade_checker):
        """
        Steps:
        1. Create an invalid manifest.
        2. Try to upgrade with the invalid manifest.
        3. Upgrade should not start and fail.
        """
        version, url = unique_name, "https://invalid_iso_url"
        # Checksum must merely be well-formed (sha512 hex); the URL is the
        # invalid part under test.
        checksum = sha512(b'not_a_valid_checksum').hexdigest()
        code, data = api_client.versions.get(version)
        if code != 200:
            # Version object does not exist yet: create it and wait for it to
            # be registered before starting the upgrade.
            code, data = api_client.versions.create(version, url, checksum)
            assert code == 201, f"Failed to create invalid version: {data}"
            version_created, (code, data) = upgrade_checker.wait_version_created(version)
            assert version_created, (code, data)
        code, data = api_client.upgrades.create(version)
        assert code == 201, f"Failed to create invalid upgrade: {data}"
        upgrade_name = data['metadata']['name']
        # The upgrade must fail because the ISO URL is unreachable.
        upgrade_fail_by_invalid_iso_url, (code, data) = \
            upgrade_checker.wait_upgrade_fail_by_invalid_iso_url(upgrade_name)
        assert upgrade_fail_by_invalid_iso_url, (code, data)

        # teardown
        api_client.upgrades.delete(upgrade_name)
        api_client.versions.delete(version)

    @pytest.mark.parametrize(
        "resort", [slice(None, None, -1), slice(None, None, 2)], ids=("mismatched", "invalid")
    )
    def test_checksum(self, api_client, unique_name, upgrade_target, resort, upgrade_checker):
        """Upgrading with a corrupted checksum must fail.

        ``resort`` mangles the valid checksum: reversing it (step -1) yields a
        well-formed but mismatched checksum; taking every other char (step 2)
        yields a malformed one that the API must reject outright.
        """
        version, url, checksum = upgrade_target
        version = f"{version}-{unique_name}"

        if resort.step == 2:
            # Malformed checksum: version creation itself must be rejected.
            # ref: https://github.com/harvester/harvester/issues/5480
            code, data = api_client.versions.create(version, url, checksum[resort])
            try:
                assert 400 == code, (code, data)
            finally:
                # BUGFIX: the original `return api_client.versions.delete(version)`
                # inside `finally` silently swallowed the AssertionError above
                # (a `return` in `finally` discards any in-flight exception), so
                # the 400 check could never fail the test.  Clean up, then let
                # the assertion propagate.
                api_client.versions.delete(version)
            return

        # Mismatched (but well-formed) checksum: the version is accepted, but
        # the upgrade must fail during ISO verification.
        code, data = api_client.versions.create(version, url, checksum[resort])
        assert 201 == code, f"Failed to create upgrade for {version}"
        version_created, (code, data) = upgrade_checker.wait_version_created(version)
        assert version_created, (code, data)
        code, data = api_client.upgrades.create(version)
        assert 201 == code, f"Failed to start upgrade for {version}"
        upgrade_name = data['metadata']['name']
        upgrade_fail_by_invalid_checksum, (code, data) = \
            upgrade_checker.wait_upgrade_fail_by_invalid_checksum(upgrade_name)
        assert upgrade_fail_by_invalid_checksum, (code, data)

        # teardown
        api_client.upgrades.delete(upgrade_name)
        api_client.versions.delete(version)

    @pytest.mark.skip("https://github.com/harvester/harvester/issues/5494")
    def test_version_compatibility(
        self, api_client, unique_name, upgrade_target, upgrade_timeout
    ):
        """Upgrade to an incompatible version should fail (currently skipped;
        the verification conditions are still TODO)."""
        version, url, checksum = upgrade_target
        version = f"{version}-{unique_name}"
        code, data = api_client.versions.create(version, url, checksum)
        assert 201 == code, f"Failed to create upgrade for {version}"
        code, data = api_client.upgrades.create(version)
        assert 201 == code, f"Failed to start upgrade for {version}"

        endtime = datetime.now() + timedelta(seconds=upgrade_timeout)
        while endtime > datetime.now():
            code, data = api_client.upgrades.get(data['metadata']['name'])
            conds = dict((c['type'], c) for c in data.get('status', {}).get('conditions', []))
            # TODO: populate with the expected failure conditions; an empty
            # list makes all(...) True, so the loop currently breaks at once.
            verified = []
            if all(verified):
                break
        else:
            raise AssertionError(f"Upgrade NOT failed in expected conditions: {conds}")

        # teardown
        api_client.upgrades.delete(data['metadata']['name'])
        api_client.versions.delete(version)

    def test_degraded_volume(
        self, api_client, unique_name, vm_shell_from_host, upgrade_target, stopped_vm,
        vm_checker, volume_checker, upgrade_checker
    ):
        """
        Criteria: creating an upgrade should fail if there are any degraded volumes

        Steps:
        1. Create a VM using a volume with 3 replicas.
        2. Delete one replica of the volume. Let the volume stay in degraded state.
        3. Immediately upgrade Harvester.
        4. Upgrade should fail.
        """
        # https://github.com/harvester/harvester/issues/6425
        code, data = api_client.hosts.get()
        assert 200 == code, (code, data)
        if (cluster_size := len(data['data'])) < 3:
            # Degraded-volume pre-check only applies with 3+ replicas/nodes.
            pytest.skip(
                f"Degraded volumes only checked on 3+ nodes cluster, skip on {cluster_size}."
            )

        vm_name, ssh_user, pri_key = stopped_vm
        vm_started, (code, vmi) = vm_checker.wait_started(vm_name)
        assert vm_started, (code, vmi)

        # Write data into the VM so the volume has real content to replicate.
        vm_ip = next(iface['ipAddress'] for iface in vmi['status']['interfaces']
                     if iface['name'] == 'default')
        code, data = api_client.hosts.get(vmi['status']['nodeName'])
        host_ip = next(addr['address'] for addr in data['status']['addresses']
                       if addr['type'] == 'InternalIP')
        with vm_shell_from_host(host_ip, vm_ip, ssh_user, pkey=pri_key) as sh:
            stdout, stderr = sh.exec_command(
                "dd if=/dev/urandom of=./generate_file bs=1M count=1024; sync"
            )
            # dd reports progress on stderr; any stdout is unexpected.
            assert not stdout, (stdout, stderr)

        # Resolve the PV backing the VM's root volume claim.
        claim_name = vmi["spec"]["volumes"][0]["persistentVolumeClaim"]["claimName"]
        code, data = api_client.volumes.get(name=claim_name)
        assert code == 200, f"Failed to get volume {claim_name}: {data}"
        pv_name = data["spec"]["volumeName"]

        # Degrade the volume by deleting one of its Longhorn replicas.
        code, data = api_client.lhreplicas.get()
        assert code == 200 and data['items'], f"Failed to get longhorn replicas ({code}): {data}"
        replica = next(r for r in data["items"] if pv_name == r['spec']['volumeName'])
        api_client.lhreplicas.delete(name=replica['metadata']['name'])
        lhvolume_degraded = volume_checker.wait_lhvolume_degraded(pv_name)
        assert lhvolume_degraded, (code, data)

        # Create the upgrade and verify it is rejected (HTTP 400) while the
        # volume is degraded.
        version, url, checksum = upgrade_target
        version = f"{version}-{unique_name}"
        code, data = api_client.versions.create(version, url, checksum)
        assert code == 201, f"Failed to create version {version}: {data}"
        version_created, (code, data) = upgrade_checker.wait_version_created(version)
        assert version_created, (code, data)
        code, data = api_client.upgrades.create(version)
        assert code == 400, f"Failed to verify degraded volume: {code}, {data}"

        # Teardown invalid upgrade
        api_client.versions.delete(version)
Class variables
var pytestmark
Methods
def test_checksum(self, api_client, unique_name, upgrade_target, resort, upgrade_checker)
-
Expand source code
@pytest.mark.parametrize( "resort", [slice(None, None, -1), slice(None, None, 2)], ids=("mismatched", "invalid") ) def test_checksum(self, api_client, unique_name, upgrade_target, resort, upgrade_checker): version, url, checksum = upgrade_target version = f"{version}-{unique_name}" if resort.step == 2: # ref: https://github.com/harvester/harvester/issues/5480 code, data = api_client.versions.create(version, url, checksum[resort]) try: assert 400 == code, (code, data) finally: return api_client.versions.delete(version) code, data = api_client.versions.create(version, url, checksum[resort]) assert 201 == code, f"Failed to create upgrade for {version}" version_created, (code, data) = upgrade_checker.wait_version_created(version) assert version_created, (code, data) code, data = api_client.upgrades.create(version) assert 201 == code, f"Failed to start upgrade for {version}" upgrade_name = data['metadata']['name'] upgrade_fail_by_invalid_checksum, (code, data) = \ upgrade_checker.wait_upgrade_fail_by_invalid_checksum(upgrade_name) assert upgrade_fail_by_invalid_checksum, (code, data) # teardown api_client.upgrades.delete(upgrade_name) api_client.versions.delete(version)
def test_degraded_volume(self,
api_client,
unique_name,
vm_shell_from_host,
upgrade_target,
stopped_vm,
vm_checker,
volume_checker,
upgrade_checker)-
Expand source code
def test_degraded_volume(
    self, api_client, unique_name, vm_shell_from_host, upgrade_target, stopped_vm,
    vm_checker, volume_checker, upgrade_checker
):
    """
    Criteria: create upgrade should fails if there are any degraded volumes

    Steps:
    1. Create a VM using a volume with 3 replicas.
    2. Delete one replica of the volume. Let the volume stay in degraded state.
    3. Immediately upgrade Harvester.
    4. Upgrade should fail.
    """
    # https://github.com/harvester/harvester/issues/6425
    code, data = api_client.hosts.get()
    assert 200 == code, (code, data)
    if (cluster_size := len(data['data'])) < 3:
        # Degraded-volume pre-check only applies with 3+ replicas/nodes.
        pytest.skip(
            f"Degraded volumes only checked on 3+ nodes cluster, skip on {cluster_size}."
        )

    vm_name, ssh_user, pri_key = stopped_vm
    vm_started, (code, vmi) = vm_checker.wait_started(vm_name)
    assert vm_started, (code, vmi)

    # Write date into VM (gives the volume real content to replicate).
    vm_ip = next(iface['ipAddress'] for iface in vmi['status']['interfaces']
                 if iface['name'] == 'default')
    code, data = api_client.hosts.get(vmi['status']['nodeName'])
    host_ip = next(addr['address'] for addr in data['status']['addresses']
                   if addr['type'] == 'InternalIP')
    with vm_shell_from_host(host_ip, vm_ip, ssh_user, pkey=pri_key) as sh:
        stdout, stderr = sh.exec_command(
            "dd if=/dev/urandom of=./generate_file bs=1M count=1024; sync"
        )
        # dd reports progress on stderr; any stdout is unexpected.
        assert not stdout, (stdout, stderr)

    # Get pv name of the volume backing the VM's root volume claim.
    claim_name = vmi["spec"]["volumes"][0]["persistentVolumeClaim"]["claimName"]
    code, data = api_client.volumes.get(name=claim_name)
    assert code == 200, f"Failed to get volume {claim_name}: {data}"
    pv_name = data["spec"]["volumeName"]

    # Make the volume becomes degraded by deleting one Longhorn replica.
    code, data = api_client.lhreplicas.get()
    assert code == 200 and data['items'], f"Failed to get longhorn replicas ({code}): {data}"
    replica = next(r for r in data["items"] if pv_name == r['spec']['volumeName'])
    api_client.lhreplicas.delete(name=replica['metadata']['name'])
    lhvolume_degraded = volume_checker.wait_lhvolume_degraded(pv_name)
    assert lhvolume_degraded, (code, data)

    # create upgrade and verify it is not allowed (expect HTTP 400) while the
    # volume is degraded.
    version, url, checksum = upgrade_target
    version = f"{version}-{unique_name}"
    code, data = api_client.versions.create(version, url, checksum)
    assert code == 201, f"Failed to create version {version}: {data}"
    version_created, (code, data) = upgrade_checker.wait_version_created(version)
    assert version_created, (code, data)
    code, data = api_client.upgrades.create(version)
    assert code == 400, f"Failed to verify degraded volume: {code}, {data}"

    # Teardown invalid upgrade
    api_client.versions.delete(version)
Criteria: creating an upgrade should fail if there are any degraded volumes. Steps: 1. Create a VM using a volume with 3 replicas. 2. Delete one replica of the volume. Let the volume stay in a degraded state. 3. Immediately upgrade Harvester. 4. The upgrade should fail.
def test_iso_url(self, api_client, unique_name, upgrade_checker)
-
Expand source code
def test_iso_url(self, api_client, unique_name, upgrade_checker): """ Steps: 1. Create an invalid manifest. 2. Try to upgrade with the invalid manifest. 3. Upgrade should not start and fail. """ version, url = unique_name, "https://invalid_iso_url" checksum = sha512(b'not_a_valid_checksum').hexdigest() code, data = api_client.versions.get(version) if code != 200: code, data = api_client.versions.create(version, url, checksum) assert code == 201, f"Failed to create invalid version: {data}" version_created, (code, data) = upgrade_checker.wait_version_created(version) assert version_created, (code, data) code, data = api_client.upgrades.create(version) assert code == 201, f"Failed to create invalid upgrade: {data}" upgrade_name = data['metadata']['name'] upgrade_fail_by_invalid_iso_url, (code, data) = \ upgrade_checker.wait_upgrade_fail_by_invalid_iso_url(upgrade_name) assert upgrade_fail_by_invalid_iso_url, (code, data) # teardown api_client.upgrades.delete(upgrade_name) api_client.versions.delete(version)
Steps: 1. Create an invalid manifest. 2. Try to upgrade with the invalid manifest. 3. Upgrade should not start and fail.
def test_version_compatibility(self, api_client, unique_name, upgrade_target, upgrade_timeout)
-
Expand source code
@pytest.mark.skip("https://github.com/harvester/harvester/issues/5494") def test_version_compatibility( self, api_client, unique_name, upgrade_target, upgrade_timeout ): version, url, checksum = upgrade_target version = f"{version}-{unique_name}" code, data = api_client.versions.create(version, url, checksum) assert 201 == code, f"Failed to create upgrade for {version}" code, data = api_client.upgrades.create(version) assert 201 == code, f"Failed to start upgrade for {version}" endtime = datetime.now() + timedelta(seconds=upgrade_timeout) while endtime > datetime.now(): code, data = api_client.upgrades.get(data['metadata']['name']) conds = dict((c['type'], c) for c in data.get('status', {}).get('conditions', [])) verified = [] # TODO if all(verified): break else: raise AssertionError(f"Upgrade NOT failed in expected conditions: {conds}") # teardown api_client.upgrades.delete(data['metadata']['name']) api_client.versions.delete(version)