Module harvester_e2e_tests.integrations.test_4_vm_host_powercycle
Functions
def available_node_names(api_client)
def focal_image(api_client, unique_name, image_ubuntu, wait_timeout)
def focal_vm(api_client, focal_image, wait_timeout)
def test_delete_vm_after_host_shutdown(api_client, host_state, wait_timeout, focal_vm, available_node_names)
-
To cover test: - https://harvester.github.io/tests/manual/hosts/delete_vm_after_host_shutdown
Prerequisite
- Cluster's nodes >= 2
Steps
- Create a VM with 1 CPU 1 Memory and runStrategy is
RerunOnFailure
- Power off the node hosting the VM
- Delete the VM
- Verify the VM
Expected Result:
- VM should be created and started successfully
- Node should be unavailable after shutdown
- VM should be able to be deleted
def test_maintenance_mode_trigger_vm_migrate(api_client, focal_vm, wait_timeout, available_node_names)
def test_poweroff_node_trigger_vm_reschedule(api_client, host_state, focal_vm, wait_timeout, available_node_names, vm_force_reset_policy)
-
To cover test: - https://harvester.github.io/tests/manual/hosts/vm_rescheduled_after_host_poweroff
Prerequisite
- Cluster's nodes >= 2
Steps
- Create a VM with 1 CPU 1 Memory and runStrategy is
RerunOnFailure
- Power off the node hosting the VM
- Verify the VM
Expected Result:
- VM should be created and started successfully
- Node should be unavailable after shutdown
- VM should be restarted automatically
def test_verify_host_info(api_client)
def test_vm_restarted_after_host_reboot(api_client, host_state, wait_timeout, focal_vm, available_node_names)
-
To cover test: - https://harvester.github.io/tests/manual/hosts/vm_migrated_after_host_reboot
Prerequisite
- Cluster's nodes >= 2
Steps
- Create a VM with 1 CPU 1 Memory and runStrategy is
RerunOnFailure
- Reboot the node hosting the VM
- Verify the VM
Expected Result:
- VM should be created
- Node should be unavailable while rebooting
- VM should be restarted
def vm_force_reset_policy(api_client)
Classes
class TestHostState
-
Expand source code
@pytest.mark.hosts
@pytest.mark.p1
class TestHostState:
    """Power the last available node off and back on, checking its reported state.

    ``test_poweron_state`` is declared dependent on ``test_poweroff_state``
    through ``pytest.mark.dependency``, so the two form an off/on pair that
    targets the same node (``available_node_names[-1]``).
    """

    @pytest.mark.dependency(name="host_poweroff")
    def test_poweroff_state(self, api_client, host_state, wait_timeout, available_node_names):
        """
        Test the hosts are the nodes which make the cluster
        Covers:
            hosts-01-Negative test-Verify the state for Powered down node

        Powers off the last available node, polls its metrics endpoint until
        it answers 404 (or ``wait_timeout`` seconds elapse), then asserts the
        node state name is "in-progress" or "unavailable".
        """
        assert 2 <= len(available_node_names), (
            f"The cluster only have {len(available_node_names)} available node."
            " It's not enough for power off test."
        )
        _, node = api_client.hosts.get(available_node_names[-1])

        # The power script is addressed by the node's InternalIP entry.
        node_ip = next(val["address"] for val in node['status']['addresses']
                       if val["type"] == "InternalIP")

        rc, out, err = host_state.power(node['id'], node_ip, on=False)
        assert rc == 0, (f"Failed to PowerOff node {node['id']} with error({rc}):\n"
                         f"stdout: {out}\n\nstderr: {err}")
        sleep(host_state.delay)  # Wait for the node to disappear

        endtime = datetime.now() + timedelta(seconds=wait_timeout)
        while endtime > datetime.now():
            # Only the status code matters here; the payload is ignored.
            code, _ = api_client.hosts.get_metrics(node['id'])
            if code == 404:
                break
            sleep(5)
        else:
            # Loop ran out of time without ever seeing a 404.
            raise AssertionError(
                f"Node {node['id']} still available after PowerOff script executed"
                f", script path: {host_state.path}"
            )

        _, node = api_client.hosts.get(node['id'])
        assert node["metadata"]["state"]["name"] in ("in-progress", "unavailable")

    @pytest.mark.dependency(name="host_poweron", depends=["host_poweroff"])
    def test_poweron_state(self, api_client, host_state, wait_timeout, available_node_names):
        """
        Power the previously powered-off node back on and verify it recovers.

        Requires the node's state to carry an error flag beforehand, powers it
        on, polls its metrics until the reported state has no error and is not
        named "unavailable" (or ``wait_timeout`` seconds elapse), then asserts
        the node state name is "active".
        """
        assert 2 <= len(available_node_names), (
            f"The cluster only have {len(available_node_names)} available node."
            " It's not enough for power on test."
        )
        _, node = api_client.hosts.get(available_node_names[-1])

        # Report the same `state` mapping the condition reads: it is known to
        # exist at this point, so building the failure message cannot itself
        # raise and mask the real precondition failure.
        assert node['metadata']['state']['error'], (
            f"The node {available_node_names[-1]} was not poweroff.\n"
            f"Node Status: {node['metadata']['state']}"
        )

        # The power script is addressed by the node's InternalIP entry.
        node_ip = next(val["address"] for val in node['status']['addresses']
                       if val["type"] == "InternalIP")

        rc, out, err = host_state.power(node['id'], node_ip, on=True)
        assert rc == 0, (f"Failed to PowerOn node {node['id']} with error({rc}):\n"
                         f"stdout: {out}\n\nstderr: {err}")
        sleep(host_state.delay)  # Wait for the node to appear

        endtime = datetime.now() + timedelta(seconds=wait_timeout)
        while endtime > datetime.now():
            _, metric = api_client.hosts.get_metrics(node['id'])
            # An empty/missing state means the node is not reporting yet.
            state = metric.get("metadata", {}).get("state", {})
            if state and not state.get("error") and state.get('name') != 'unavailable':
                break
            sleep(5)
        else:
            # Loop ran out of time without the node reporting a healthy state.
            raise AssertionError(
                f"Node {node['id']} still unavailable after PowerOn script executed"
                f", script path: {host_state.path}"
            )

        _, node = api_client.hosts.get(node['id'])
        assert node["metadata"]["state"]["name"] == "active"
Class variables
var pytestmark
Methods
def test_poweroff_state(self, api_client, host_state, wait_timeout, available_node_names)
-
Test the hosts are the nodes which make the cluster
Covers
hosts-01-Negative test-Verify the state for Powered down node
def test_poweron_state(self, api_client, host_state, wait_timeout, available_node_names)