Module harvester_e2e_tests.integrations.test_4_vm_backup_restore
Functions
def NFS_config(request)
-
Expand source code
@pytest.fixture(scope='module') def NFS_config(request): nfs_endpoint = request.config.getoption('--nfs-endpoint') assert nfs_endpoint, f"NFS endpoint not configured: {nfs_endpoint}" assert nfs_endpoint.startswith("nfs://"), ( f"NFS endpoint should starts with `nfs://`, not {nfs_endpoint}" ) return ("NFS", dict(endpoint=nfs_endpoint))
def S3_config(request)
-
Expand source code
@pytest.fixture(scope='module') def S3_config(request): config = { "bucket": request.config.getoption('--bucketName'), "region": request.config.getoption('--region'), "access_id": request.config.getoption('--accessKeyId'), "access_secret": request.config.getoption('--secretAccessKey') } empty_options = ', '.join(k for k, v in config.items() if not v) assert not empty_options, ( f"S3 configuration missing, `{empty_options}` should not be empty." ) config['endpoint'] = request.config.getoption('--s3-endpoint') return ("S3", config)
def backup_config(request)
-
Expand source code
@pytest.fixture(scope="class") def backup_config(request): return request.getfixturevalue(f"{request.param}_config")
def base_vm(api_client, ssh_keypair, unique_name, vm_checker, image, backup_config)
-
Expand source code
@pytest.fixture(scope="class") def base_vm(api_client, ssh_keypair, unique_name, vm_checker, image, backup_config): unique_vm_name = f"{datetime.now().strftime('%m%S%f')}-{unique_name}" cpu, mem = 1, 2 pub_key, pri_key = ssh_keypair vm_spec = api_client.vms.Spec(cpu, mem) vm_spec.add_image("disk-0", image['id']) userdata = yaml.safe_load(vm_spec.user_data) userdata['ssh_authorized_keys'] = [pub_key] userdata['password'] = 'password' userdata['chpasswd'] = dict(expire=False) userdata['sshpwauth'] = True vm_spec.user_data = yaml.dump(userdata) code, data = api_client.vms.create(unique_vm_name, vm_spec) # Check VM started and get IPs (vm and host) vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(unique_vm_name, ['default']) assert vm_got_ips, ( f"Failed to Start VM({unique_vm_name}) with errors:\n" f"Status: {data.get('status')}\n" f"API Status({code}): {data}" ) vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces'] if iface['name'] == 'default') code, data = api_client.hosts.get(data['status']['nodeName']) host_ip = next(addr['address'] for addr in data['status']['addresses'] if addr['type'] == 'InternalIP') yield { "name": unique_vm_name, "host_ip": host_ip, "vm_ip": vm_ip, "ssh_user": image['user'], } # remove created VM code, data = api_client.vms.get(unique_vm_name) vm_spec = api_client.vms.Spec.from_dict(data) vm_deleted, (code, data) = vm_checker.wait_deleted(unique_vm_name) for vol in vm_spec.volumes: vol_name = vol['volume']['persistentVolumeClaim']['claimName'] api_client.volumes.delete(vol_name)
def base_vm_migrated(api_client, vm_checker, backup_config, base_vm)
-
Expand source code
@pytest.fixture(scope="class") def base_vm_migrated(api_client, vm_checker, backup_config, base_vm): unique_vm_name = base_vm['name'] code, host_data = api_client.hosts.get() assert 200 == code, (code, host_data) code, data = api_client.vms.get_status(unique_vm_name) cur_host = data['status'].get('nodeName') assert cur_host, ( f"VMI exists but `nodeName` is empty.\n" f"{data}" ) new_host = next(h['id'] for h in host_data['data'] if cur_host != h['id'] and not h['spec'].get('taint')) vm_migrated, (code, data) = vm_checker.wait_migrated(unique_vm_name, new_host) assert vm_migrated, ( f"Failed to Migrate VM({unique_vm_name}) from {cur_host} to {new_host}\n" f"API Status({code}): {data}" ) # update for new IPs vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces'] if iface['name'] == 'default') code, data = api_client.hosts.get(data['status']['nodeName']) host_ip = next(addr['address'] for addr in data['status']['addresses'] if addr['type'] == 'InternalIP') base_vm['vm_ip'] = vm_ip base_vm['host_ip'] = host_ip return (cur_host, new_host)
def base_vm_with_data(api_client,
vm_shell_from_host,
ssh_keypair,
wait_timeout,
vm_checker,
backup_config,
base_vm)-
Expand source code
@pytest.fixture(scope="class") def base_vm_with_data( api_client, vm_shell_from_host, ssh_keypair, wait_timeout, vm_checker, backup_config, base_vm ): pub_key, pri_key = ssh_keypair unique_vm_name = base_vm['name'] # Log into VM to make some data with vm_shell_from_host( base_vm['host_ip'], base_vm['vm_ip'], base_vm['ssh_user'], pkey=pri_key ) as sh: cloud_inited, (out, err) = vm_checker.wait_cloudinit_done(sh) assert cloud_inited, ( f"VM {unique_vm_name} Started {vm_checker.wait_timeout} seconds" f", but cloud-init still in {out}" ) out, err = sh.exec_command(f'echo {unique_vm_name!r} > ~/vmname') assert not err, (out, err) sh.exec_command('sync') yield { "name": unique_vm_name, "host_ip": base_vm['host_ip'], "vm_ip": base_vm['vm_ip'], "ssh_user": base_vm['ssh_user'], "data": dict(path="~/vmname", content=f'{unique_vm_name}') } # remove backups link to the VM and is ready code, data = api_client.backups.get() check_names = [] for backup in data['data']: if (backup['status'].get('readyToUse') and unique_vm_name == backup['spec']['source']['name']): api_client.backups.delete(backup['metadata']['name']) check_names.append(backup['metadata']['name']) endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): for name in check_names[:]: code, data = api_client.backups.get(name) if 404 == code: check_names.remove(name) if not check_names: break sleep(3) else: raise AssertionError( f"Failed to delete backups: {check_names}\n" f"Last API Status({code}): {data}" )
def config_backup_target(api_client, conflict_retries, backup_config, wait_timeout)
-
Expand source code
@pytest.fixture(scope="class") def config_backup_target(api_client, conflict_retries, backup_config, wait_timeout): backup_type, config = backup_config code, data = api_client.settings.get('backup-target') origin_spec = api_client.settings.BackupTargetSpec.from_dict(data) spec = getattr(api_client.settings.BackupTargetSpec, backup_type)(**config) # ???: when switching S3 -> NFS, update backup-target will easily hit resource conflict # so we would need retries to apply the change. for _ in range(conflict_retries): code, data = api_client.settings.update('backup-target', spec) if 409 == code and "Conflict" == data['reason']: sleep(3) else: break else: raise AssertionError( f"Unable to update backup-target after {conflict_retries} retried." f"API Status({code}): {data}" ) assert 200 == code, ( f'Failed to update backup target to {backup_type} with {config}\n' f"API Status({code}): {data}" ) yield spec # remove unbound LH backupVolumes code, data = api_client.lhbackupvolumes.get() assert 200 == code, "Failed to list lhbackupvolumes" check_names = [] for volume_data in data["items"]: volume_name = volume_data["metadata"]["name"] backup_name = volume_data["status"]["lastBackupName"] if not backup_name: api_client.lhbackupvolumes.delete(volume_name) check_names.append(volume_name) endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): for name in check_names[:]: code, data = api_client.lhbackupvolumes.get(name) if 404 == code: check_names.remove(name) if not check_names: break sleep(3) else: raise AssertionError( f"Failed to delete unbound lhbackupvolumes: {check_names}\n" f"Last API Status({code}): {data}" ) # restore to original backup-target and remove backups not belong to it code, data = api_client.settings.update('backup-target', origin_spec) code, data = api_client.backups.get() assert 200 == code, "Failed to list backups" check_names = [] for backup in data['data']: endpoint = backup['status']['backupTarget'].get('endpoint') if endpoint != origin_spec.value.get('endpoint'): api_client.backups.delete(backup['metadata']['name']) check_names.append(backup['metadata']['name']) endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): for name in check_names[:]: code, data = api_client.backups.get(name) if 404 == code: check_names.remove(name) if not check_names: break sleep(3) else: raise AssertionError( f"Failed to delete backups: {check_names}\n" f"Last API Status({code}): {data}" )
def conflict_retries()
-
Expand source code
@pytest.fixture(scope="module") def conflict_retries(): # This might be able to moved to config options in need. return 5
def image(api_client, unique_name, wait_timeout, image_opensuse)
-
Expand source code
@pytest.fixture(scope="module") def image(api_client, unique_name, wait_timeout, image_opensuse): unique_image_id = f'image-{unique_name}' code, data = api_client.images.create_by_url( unique_image_id, image_opensuse.url, display_name=f"{unique_name}-{image_opensuse.name}" ) assert 201 == code, (code, data) endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, data = api_client.images.get(unique_image_id) if 100 == data.get('status', {}).get('progress', 0): break sleep(3) else: raise AssertionError( "Failed to create Image with error:\n" f"Status({code}): {data}" ) yield dict(id=f"{data['metadata']['namespace']}/{unique_image_id}", user=image_opensuse.ssh_user) code, data = api_client.images.delete(unique_image_id)
Classes
class TestBackupRestore
-
Expand source code
@pytest.mark.p0 @pytest.mark.backup_target @pytest.mark.parametrize( "backup_config", [ pytest.param("S3", marks=pytest.mark.S3), pytest.param("NFS", marks=pytest.mark.NFS) ], indirect=True) class TestBackupRestore: @pytest.mark.dependency() def test_connection(self, api_client, backup_config, config_backup_target): code, data = api_client.settings.backup_target_test_connection() assert 200 == code, f'Failed to test backup target connection: {data}' @pytest.mark.dependency(depends=["TestBackupRestore::test_connection"], param=True) def tests_backup_vm(self, api_client, wait_timeout, backup_config, base_vm_with_data): unique_vm_name = base_vm_with_data['name'] # Create backup with the name as VM's name code, data = api_client.vms.backup(unique_vm_name, unique_vm_name) assert 204 == code, (code, data) # Check backup is ready endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, backup = api_client.backups.get(unique_vm_name) if 200 == code and backup.get('status', {}).get('readyToUse'): break sleep(3) else: raise AssertionError( f'Timed-out waiting for the backup \'{unique_vm_name}\' to be ready.' ) @pytest.mark.dependency(depends=["TestBackupRestore::tests_backup_vm"], param=True) def test_update_backup_by_yaml( self, api_client, wait_timeout, backup_config, base_vm_with_data ): backup_name = base_vm_with_data['name'] # Get backup as yaml req_yaml = dict(Accept='application/yaml') resp = api_client.backups.get(backup_name, headers=req_yaml, raw=True) assert 200 == resp.status_code, (resp.status_code, resp.text) # update annotation yaml_header = {'Content-Type': 'application/yaml'} customized_annotations = {'test.harvesterhci.io': 'for-test-update'} data = yaml.safe_load(resp.text) data['metadata'].setdefault('annotations', {}).update(customized_annotations) yaml_data = yaml.safe_dump(data) code, data = api_client.backups.update(backup_name, yaml_data, as_json=False, headers=yaml_header) assert 200 == code, (code, data) # Verify annotation updated code, data = api_client.backups.get(backup_name) all_updated = all( True for key, val in data['metadata']['annotations'].items() if customized_annotations.get(key, "") == val ) assert all_updated, f"Failed to update annotations: {customized_annotations!r}" @pytest.mark.dependency(depends=["TestBackupRestore::tests_backup_vm"], param=True) def test_restore_with_new_vm( self, api_client, vm_shell_from_host, vm_checker, ssh_keypair, wait_timeout, backup_config, base_vm_with_data ): unique_vm_name, backup_data = base_vm_with_data['name'], base_vm_with_data['data'] pub_key, pri_key = ssh_keypair # mess up the existing data with vm_shell_from_host( base_vm_with_data['host_ip'], base_vm_with_data['vm_ip'], base_vm_with_data['ssh_user'], pkey=pri_key ) as sh: out, err = sh.exec_command(f"echo {pub_key!r} > {base_vm_with_data['data']['path']}") assert not err, (out, err) sh.exec_command('sync') # Restore VM into new restored_vm_name = f"{backup_config[0].lower()}-restore-{unique_vm_name}" spec = api_client.backups.RestoreSpec.for_new(restored_vm_name) code, data = api_client.backups.restore(unique_vm_name, spec) assert 201 == code, (code, data) vm_getable, (code, data) = vm_checker.wait_getable(restored_vm_name) assert vm_getable, (code, data) # Check VM Started then get IPs (vm and host) vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(restored_vm_name, ['default']) assert vm_got_ips, ( f"Failed to Start VM({restored_vm_name}) with errors:\n" f"Status: {data.get('status')}\n" f"API Status({code}): {data}" ) vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces'] if iface['name'] == 'default') code, data = api_client.hosts.get(data['status']['nodeName']) host_ip = next(addr['address'] for addr in data['status']['addresses'] if addr['type'] == 'InternalIP') # Login to the new VM and check data is existing with vm_shell_from_host(host_ip, vm_ip, base_vm_with_data['ssh_user'], pkey=pri_key) as sh: endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): out, err = sh.exec_command('cloud-init status') if 'done' in out: break sleep(3) else: raise AssertionError( f"VM {restored_vm_name} Started {wait_timeout} seconds" f", but cloud-init still in {out}" ) out, err = sh.exec_command(f"cat {backup_data['path']}") assert backup_data['content'] in out, ( f"cloud-init writefile failed\n" f"Executed stdout: {out}\n" f"Executed stderr: {err}" ) # teardown: delete restored vm and volumes code, data = api_client.vms.get(restored_vm_name) vm_spec = api_client.vms.Spec.from_dict(data) api_client.vms.delete(restored_vm_name) endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, data = api_client.vms.get(restored_vm_name) if 404 == code: break sleep(3) else: raise AssertionError( f"Failed to Delete VM({restored_vm_name}) with errors:\n" f"Status({code}): {data}" ) for vol in vm_spec.volumes: vol_name = vol['volume']['persistentVolumeClaim']['claimName'] api_client.volumes.delete(vol_name) @pytest.mark.dependency(depends=["TestBackupRestore::tests_backup_vm"], param=True) def test_restore_replace_with_delete_vols( self, api_client, vm_shell_from_host, ssh_keypair, wait_timeout, vm_checker, backup_config, base_vm_with_data ): unique_vm_name, backup_data = base_vm_with_data['name'], base_vm_with_data['data'] pub_key, pri_key = ssh_keypair # mess up the existing data with vm_shell_from_host( base_vm_with_data['host_ip'], base_vm_with_data['vm_ip'], base_vm_with_data['ssh_user'], pkey=pri_key ) as sh: out, err = sh.exec_command(f"echo {pub_key!r} > {base_vm_with_data['data']['path']}") assert not err, (out, err) sh.exec_command('sync') # Stop the VM then restore existing vm_stopped, (code, data) = vm_checker.wait_stopped(unique_vm_name) assert vm_stopped, ( f"Failed to Stop VM({unique_vm_name}) with errors:\n" f"Status({code}): {data}" ) spec = api_client.backups.RestoreSpec.for_existing(delete_volumes=True) code, data = api_client.backups.restore(unique_vm_name, spec) assert 201 == code, f'Failed to restore backup with current VM replaced, {data}' vm_running, (code, data) = vm_checker.wait_status_running(unique_vm_name) assert vm_running, ( f"Failed to restore VM({unique_vm_name}) with errors:\n" f"Status({code}): {data}" ) # Check VM Started then get IPs (vm and host) vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(unique_vm_name, ['default']) assert vm_got_ips, ( f"Failed to Start VM({unique_vm_name}) with errors:\n" f"Status: {data.get('status')}\n" f"API Status({code}): {data}" ) vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces'] if iface['name'] == 'default') code, data = api_client.hosts.get(data['status']['nodeName']) host_ip = next(addr['address'] for addr in data['status']['addresses'] if addr['type'] == 'InternalIP') base_vm_with_data['host_ip'], base_vm_with_data['vm_ip'] = host_ip, vm_ip # Login to the new VM and check data is existing with vm_shell_from_host(host_ip, vm_ip, base_vm_with_data['ssh_user'], pkey=pri_key) as sh: cloud_inited, (out, err) = vm_checker.wait_cloudinit_done(sh) assert cloud_inited, ( f"VM {unique_vm_name} Started {wait_timeout} seconds" f", but cloud-init still in {out}" ) out, err = sh.exec_command(f"cat {backup_data['path']}") assert backup_data['content'] in out, ( f"cloud-init writefile failed\n" f"Executed stdout: {out}\n" f"Executed stderr: {err}" ) @pytest.mark.negative @pytest.mark.dependency(depends=["TestBackupRestore::tests_backup_vm"], param=True) def test_restore_replace_vm_not_stop(self, api_client, backup_config, base_vm_with_data): spec = api_client.backups.RestoreSpec.for_existing(delete_volumes=True) code, data = api_client.backups.restore(base_vm_with_data['name'], spec) assert 422 == code, (code, data) @pytest.mark.negative @pytest.mark.skip_version_if('< v1.1.2', '< v1.2.1') @pytest.mark.dependency(depends=["TestBackupRestore::tests_backup_vm"], param=True) def test_restore_with_invalid_name(self, api_client, backup_config, base_vm_with_data): # RFC1123 DNS Subdomain name rules: # 1. contain no more than 253 characters # 2. contain only lowercase alphanumeric characters, '-' or '.' # 3. start with an alphanumeric character # 4. end with an alphanumeric character unique_vm_name = base_vm_with_data['name'] # Case 1: longer than 253 chars invalid_name = 'a' * 254 spec = api_client.backups.RestoreSpec.for_new(invalid_name) code, data = api_client.backups.restore(unique_vm_name, spec) assert 422 == code, (code, data) # Case 2: having upper case invalid_name = 'the.name.IS.invalid' spec = api_client.backups.RestoreSpec.for_new(invalid_name) code, data = api_client.backups.restore(unique_vm_name, spec) assert 422 == code, (code, data) # Case 3: Not start with an alphanumeric character invalid_name = '-the.name.is.invalid' spec = api_client.backups.RestoreSpec.for_new(invalid_name) code, data = api_client.backups.restore(unique_vm_name, spec) assert 422 == code, (code, data) # Case 4: Not end with an alphanumeric character invalid_name = 'the.name.is.invalid.' spec = api_client.backups.RestoreSpec.for_new(invalid_name) code, data = api_client.backups.restore(unique_vm_name, spec) assert 422 == code, (code, data) @pytest.mark.skip_version_if('< v1.2.2') @pytest.mark.dependency(depends=["TestBackupRestore::tests_backup_vm"], param=True) def test_restore_replace_with_vm_shutdown_command( self, api_client, vm_shell_from_host, ssh_keypair, wait_timeout, vm_checker, backup_config, base_vm_with_data ): ''' ref: https://github.com/harvester/tests/issues/943 1. Create VM and write some data 2. Take backup for the VM 3. Mess up existing data 3. Shutdown the VM by executing `shutdown` command in OS 4. Restore backup to replace existing VM 5. VM should be restored successfully 6. Data in VM should be the same as backed up ''' unique_vm_name, backup_data = base_vm_with_data['name'], base_vm_with_data['data'] pub_key, pri_key = ssh_keypair # mess up the existing data then shutdown it with vm_shell_from_host( base_vm_with_data['host_ip'], base_vm_with_data['vm_ip'], base_vm_with_data['ssh_user'], pkey=pri_key ) as sh: out, err = sh.exec_command(f"echo {pub_key!r} > {base_vm_with_data['data']['path']}") assert not err, (out, err) sh.exec_command('sync') sh.exec_command('sudo shutdown now') endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, data = api_client.vms.get(unique_vm_name) if 200 == code and "Stopped" == data.get('status', {}).get('printableStatus'): break sleep(5) else: raise AssertionError( f"Failed to shut down VM({unique_vm_name}) with errors:\n" f"Status({code}): {data}" ) # restore VM to existing spec = api_client.backups.RestoreSpec.for_existing(delete_volumes=True) code, data = api_client.backups.restore(unique_vm_name, spec) assert 201 == code, f'Failed to restore backup with current VM replaced, {data}' vm_getable, (code, data) = vm_checker.wait_getable(unique_vm_name) assert vm_getable, (code, data) # Check VM Started then get IPs (vm and host) vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(unique_vm_name, ['default']) assert vm_got_ips, ( f"Failed to Start VM({unique_vm_name}) with errors:\n" f"Status: {data.get('status')}\n" f"API Status({code}): {data}" ) vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces'] if iface['name'] == 'default') code, data = api_client.hosts.get(data['status']['nodeName']) host_ip = next(addr['address'] for addr in data['status']['addresses'] if addr['type'] == 'InternalIP') # Login to the new VM and check data is existing with vm_shell_from_host(host_ip, vm_ip, base_vm_with_data['ssh_user'], pkey=pri_key) as sh: cloud_inited, (out, err) = vm_checker.wait_cloudinit_done(sh) assert cloud_inited, ( f"VM {unique_vm_name} Started {wait_timeout} seconds" f", but cloud-init still in {out}" ) out, err = sh.exec_command(f"cat {backup_data['path']}") assert backup_data['content'] in out, ( f"cloud-init writefile failed\n" f"Executed stdout: {out}\n" f"Executed stderr: {err}" )
Class variables
var pytestmark
Methods
def test_connection(self, api_client, backup_config, config_backup_target)
-
Expand source code
@pytest.mark.dependency() def test_connection(self, api_client, backup_config, config_backup_target): code, data = api_client.settings.backup_target_test_connection() assert 200 == code, f'Failed to test backup target connection: {data}'
def test_restore_replace_vm_not_stop(self, api_client, backup_config, base_vm_with_data)
-
Expand source code
@pytest.mark.negative @pytest.mark.dependency(depends=["TestBackupRestore::tests_backup_vm"], param=True) def test_restore_replace_vm_not_stop(self, api_client, backup_config, base_vm_with_data): spec = api_client.backups.RestoreSpec.for_existing(delete_volumes=True) code, data = api_client.backups.restore(base_vm_with_data['name'], spec) assert 422 == code, (code, data)
def test_restore_replace_with_delete_vols(self,
api_client,
vm_shell_from_host,
ssh_keypair,
wait_timeout,
vm_checker,
backup_config,
base_vm_with_data)-
Expand source code
@pytest.mark.dependency(depends=["TestBackupRestore::tests_backup_vm"], param=True) def test_restore_replace_with_delete_vols( self, api_client, vm_shell_from_host, ssh_keypair, wait_timeout, vm_checker, backup_config, base_vm_with_data ): unique_vm_name, backup_data = base_vm_with_data['name'], base_vm_with_data['data'] pub_key, pri_key = ssh_keypair # mess up the existing data with vm_shell_from_host( base_vm_with_data['host_ip'], base_vm_with_data['vm_ip'], base_vm_with_data['ssh_user'], pkey=pri_key ) as sh: out, err = sh.exec_command(f"echo {pub_key!r} > {base_vm_with_data['data']['path']}") assert not err, (out, err) sh.exec_command('sync') # Stop the VM then restore existing vm_stopped, (code, data) = vm_checker.wait_stopped(unique_vm_name) assert vm_stopped, ( f"Failed to Stop VM({unique_vm_name}) with errors:\n" f"Status({code}): {data}" ) spec = api_client.backups.RestoreSpec.for_existing(delete_volumes=True) code, data = api_client.backups.restore(unique_vm_name, spec) assert 201 == code, f'Failed to restore backup with current VM replaced, {data}' vm_running, (code, data) = vm_checker.wait_status_running(unique_vm_name) assert vm_running, ( f"Failed to restore VM({unique_vm_name}) with errors:\n" f"Status({code}): {data}" ) # Check VM Started then get IPs (vm and host) vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(unique_vm_name, ['default']) assert vm_got_ips, ( f"Failed to Start VM({unique_vm_name}) with errors:\n" f"Status: {data.get('status')}\n" f"API Status({code}): {data}" ) vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces'] if iface['name'] == 'default') code, data = api_client.hosts.get(data['status']['nodeName']) host_ip = next(addr['address'] for addr in data['status']['addresses'] if addr['type'] == 'InternalIP') base_vm_with_data['host_ip'], base_vm_with_data['vm_ip'] = host_ip, vm_ip # Login to the new VM and check data is existing with vm_shell_from_host(host_ip, vm_ip, base_vm_with_data['ssh_user'], pkey=pri_key) as sh: cloud_inited, (out, err) = vm_checker.wait_cloudinit_done(sh) assert cloud_inited, ( f"VM {unique_vm_name} Started {wait_timeout} seconds" f", but cloud-init still in {out}" ) out, err = sh.exec_command(f"cat {backup_data['path']}") assert backup_data['content'] in out, ( f"cloud-init writefile failed\n" f"Executed stdout: {out}\n" f"Executed stderr: {err}" )
def test_restore_replace_with_vm_shutdown_command(self,
api_client,
vm_shell_from_host,
ssh_keypair,
wait_timeout,
vm_checker,
backup_config,
base_vm_with_data)-
Expand source code
@pytest.mark.skip_version_if('< v1.2.2') @pytest.mark.dependency(depends=["TestBackupRestore::tests_backup_vm"], param=True) def test_restore_replace_with_vm_shutdown_command( self, api_client, vm_shell_from_host, ssh_keypair, wait_timeout, vm_checker, backup_config, base_vm_with_data ): ''' ref: https://github.com/harvester/tests/issues/943 1. Create VM and write some data 2. Take backup for the VM 3. Mess up existing data 3. Shutdown the VM by executing `shutdown` command in OS 4. Restore backup to replace existing VM 5. VM should be restored successfully 6. Data in VM should be the same as backed up ''' unique_vm_name, backup_data = base_vm_with_data['name'], base_vm_with_data['data'] pub_key, pri_key = ssh_keypair # mess up the existing data then shutdown it with vm_shell_from_host( base_vm_with_data['host_ip'], base_vm_with_data['vm_ip'], base_vm_with_data['ssh_user'], pkey=pri_key ) as sh: out, err = sh.exec_command(f"echo {pub_key!r} > {base_vm_with_data['data']['path']}") assert not err, (out, err) sh.exec_command('sync') sh.exec_command('sudo shutdown now') endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, data = api_client.vms.get(unique_vm_name) if 200 == code and "Stopped" == data.get('status', {}).get('printableStatus'): break sleep(5) else: raise AssertionError( f"Failed to shut down VM({unique_vm_name}) with errors:\n" f"Status({code}): {data}" ) # restore VM to existing spec = api_client.backups.RestoreSpec.for_existing(delete_volumes=True) code, data = api_client.backups.restore(unique_vm_name, spec) assert 201 == code, f'Failed to restore backup with current VM replaced, {data}' vm_getable, (code, data) = vm_checker.wait_getable(unique_vm_name) assert vm_getable, (code, data) # Check VM Started then get IPs (vm and host) vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(unique_vm_name, ['default']) assert vm_got_ips, ( f"Failed to Start VM({unique_vm_name}) with errors:\n" f"Status: {data.get('status')}\n" f"API Status({code}): {data}" ) vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces'] if iface['name'] == 'default') code, data = api_client.hosts.get(data['status']['nodeName']) host_ip = next(addr['address'] for addr in data['status']['addresses'] if addr['type'] == 'InternalIP') # Login to the new VM and check data is existing with vm_shell_from_host(host_ip, vm_ip, base_vm_with_data['ssh_user'], pkey=pri_key) as sh: cloud_inited, (out, err) = vm_checker.wait_cloudinit_done(sh) assert cloud_inited, ( f"VM {unique_vm_name} Started {wait_timeout} seconds" f", but cloud-init still in {out}" ) out, err = sh.exec_command(f"cat {backup_data['path']}") assert backup_data['content'] in out, ( f"cloud-init writefile failed\n" f"Executed stdout: {out}\n" f"Executed stderr: {err}" )
ref: https://github.com/harvester/tests/issues/943 1. Create VM and write some data 2. Take backup for the VM 3. Mess up existing data 3. Shutdown the VM by executing
shutdown
command in OS 4. Restore backup to replace existing VM 5. VM should be restored successfully 6. Data in VM should be the same as backed up def test_restore_with_invalid_name(self, api_client, backup_config, base_vm_with_data)
-
Expand source code
@pytest.mark.negative @pytest.mark.skip_version_if('< v1.1.2', '< v1.2.1') @pytest.mark.dependency(depends=["TestBackupRestore::tests_backup_vm"], param=True) def test_restore_with_invalid_name(self, api_client, backup_config, base_vm_with_data): # RFC1123 DNS Subdomain name rules: # 1. contain no more than 253 characters # 2. contain only lowercase alphanumeric characters, '-' or '.' # 3. start with an alphanumeric character # 4. end with an alphanumeric character unique_vm_name = base_vm_with_data['name'] # Case 1: longer than 253 chars invalid_name = 'a' * 254 spec = api_client.backups.RestoreSpec.for_new(invalid_name) code, data = api_client.backups.restore(unique_vm_name, spec) assert 422 == code, (code, data) # Case 2: having upper case invalid_name = 'the.name.IS.invalid' spec = api_client.backups.RestoreSpec.for_new(invalid_name) code, data = api_client.backups.restore(unique_vm_name, spec) assert 422 == code, (code, data) # Case 3: Not start with an alphanumeric character invalid_name = '-the.name.is.invalid' spec = api_client.backups.RestoreSpec.for_new(invalid_name) code, data = api_client.backups.restore(unique_vm_name, spec) assert 422 == code, (code, data) # Case 4: Not end with an alphanumeric character invalid_name = 'the.name.is.invalid.' spec = api_client.backups.RestoreSpec.for_new(invalid_name) code, data = api_client.backups.restore(unique_vm_name, spec) assert 422 == code, (code, data)
def test_restore_with_new_vm(self,
api_client,
vm_shell_from_host,
vm_checker,
ssh_keypair,
wait_timeout,
backup_config,
base_vm_with_data)-
Expand source code
@pytest.mark.dependency(depends=["TestBackupRestore::tests_backup_vm"], param=True) def test_restore_with_new_vm( self, api_client, vm_shell_from_host, vm_checker, ssh_keypair, wait_timeout, backup_config, base_vm_with_data ): unique_vm_name, backup_data = base_vm_with_data['name'], base_vm_with_data['data'] pub_key, pri_key = ssh_keypair # mess up the existing data with vm_shell_from_host( base_vm_with_data['host_ip'], base_vm_with_data['vm_ip'], base_vm_with_data['ssh_user'], pkey=pri_key ) as sh: out, err = sh.exec_command(f"echo {pub_key!r} > {base_vm_with_data['data']['path']}") assert not err, (out, err) sh.exec_command('sync') # Restore VM into new restored_vm_name = f"{backup_config[0].lower()}-restore-{unique_vm_name}" spec = api_client.backups.RestoreSpec.for_new(restored_vm_name) code, data = api_client.backups.restore(unique_vm_name, spec) assert 201 == code, (code, data) vm_getable, (code, data) = vm_checker.wait_getable(restored_vm_name) assert vm_getable, (code, data) # Check VM Started then get IPs (vm and host) vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(restored_vm_name, ['default']) assert vm_got_ips, ( f"Failed to Start VM({restored_vm_name}) with errors:\n" f"Status: {data.get('status')}\n" f"API Status({code}): {data}" ) vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces'] if iface['name'] == 'default') code, data = api_client.hosts.get(data['status']['nodeName']) host_ip = next(addr['address'] for addr in data['status']['addresses'] if addr['type'] == 'InternalIP') # Login to the new VM and check data is existing with vm_shell_from_host(host_ip, vm_ip, base_vm_with_data['ssh_user'], pkey=pri_key) as sh: endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): out, err = sh.exec_command('cloud-init status') if 'done' in out: break sleep(3) else: raise AssertionError( f"VM {restored_vm_name} Started {wait_timeout} seconds" f", but cloud-init still in {out}" ) out, err = sh.exec_command(f"cat {backup_data['path']}") assert backup_data['content'] in out, ( f"cloud-init writefile failed\n" f"Executed stdout: {out}\n" f"Executed stderr: {err}" ) # teardown: delete restored vm and volumes code, data = api_client.vms.get(restored_vm_name) vm_spec = api_client.vms.Spec.from_dict(data) api_client.vms.delete(restored_vm_name) endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, data = api_client.vms.get(restored_vm_name) if 404 == code: break sleep(3) else: raise AssertionError( f"Failed to Delete VM({restored_vm_name}) with errors:\n" f"Status({code}): {data}" ) for vol in vm_spec.volumes: vol_name = vol['volume']['persistentVolumeClaim']['claimName'] api_client.volumes.delete(vol_name)
def test_update_backup_by_yaml(self, api_client, wait_timeout, backup_config, base_vm_with_data)
-
Expand source code
@pytest.mark.dependency(depends=["TestBackupRestore::tests_backup_vm"], param=True) def test_update_backup_by_yaml( self, api_client, wait_timeout, backup_config, base_vm_with_data ): backup_name = base_vm_with_data['name'] # Get backup as yaml req_yaml = dict(Accept='application/yaml') resp = api_client.backups.get(backup_name, headers=req_yaml, raw=True) assert 200 == resp.status_code, (resp.status_code, resp.text) # update annotation yaml_header = {'Content-Type': 'application/yaml'} customized_annotations = {'test.harvesterhci.io': 'for-test-update'} data = yaml.safe_load(resp.text) data['metadata'].setdefault('annotations', {}).update(customized_annotations) yaml_data = yaml.safe_dump(data) code, data = api_client.backups.update(backup_name, yaml_data, as_json=False, headers=yaml_header) assert 200 == code, (code, data) # Verify annotation updated code, data = api_client.backups.get(backup_name) all_updated = all( True for key, val in data['metadata']['annotations'].items() if customized_annotations.get(key, "") == val ) assert all_updated, f"Failed to update annotations: {customized_annotations!r}"
def tests_backup_vm(self, api_client, wait_timeout, backup_config, base_vm_with_data)
-
Expand source code
@pytest.mark.dependency(depends=["TestBackupRestore::test_connection"], param=True) def tests_backup_vm(self, api_client, wait_timeout, backup_config, base_vm_with_data): unique_vm_name = base_vm_with_data['name'] # Create backup with the name as VM's name code, data = api_client.vms.backup(unique_vm_name, unique_vm_name) assert 204 == code, (code, data) # Check backup is ready endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, backup = api_client.backups.get(unique_vm_name) if 200 == code and backup.get('status', {}).get('readyToUse'): break sleep(3) else: raise AssertionError( f'Timed-out waiting for the backup \'{unique_vm_name}\' to be ready.' )
class TestBackupRestoreOnMigration
-
Expand source code
@pytest.mark.skip("https://github.com/harvester/harvester/issues/1473") @pytest.mark.p0 @pytest.mark.backup_target @pytest.mark.parametrize( "backup_config", [ pytest.param("S3", marks=pytest.mark.S3), pytest.param("NFS", marks=pytest.mark.NFS) ], indirect=True ) class TestBackupRestoreOnMigration: @pytest.mark.dependency(param=True) def test_backup_migrated_vm( self, api_client, wait_timeout, backup_config, config_backup_target, base_vm_migrated, base_vm_with_data ): unique_vm_name = base_vm_with_data['name'] # Create backup with the name as VM's name code, data = api_client.vms.backup(unique_vm_name, unique_vm_name) assert 204 == code, (code, data) # Check backup is ready endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, backup = api_client.backups.get(unique_vm_name) if 200 == code and backup.get('status', {}).get('readyToUse'): break sleep(3) else: raise AssertionError( f'Timed-out waiting for the backup \'{unique_vm_name}\' to be ready.' ) @pytest.mark.dependency( depends=["TestBackupRestoreOnMigration::test_backup_migrated_vm"], param=True ) def test_restore_replace_migrated_vm( self, api_client, wait_timeout, ssh_keypair, vm_shell_from_host, vm_checker, backup_config, base_vm_migrated, base_vm_with_data ): unique_vm_name, backup_data = base_vm_with_data['name'], base_vm_with_data['data'] pub_key, pri_key = ssh_keypair # mess up the existing data with vm_shell_from_host( base_vm_with_data['host_ip'], base_vm_with_data['vm_ip'], base_vm_with_data['ssh_user'], pkey=pri_key ) as sh: out, err = sh.exec_command(f"echo {pub_key!r} > {base_vm_with_data['data']['path']}") assert not err, (out, err) sh.exec_command('sync') # Stop the VM then restore existing vm_stopped, (code, data) = vm_checker.wait_stopped(unique_vm_name) assert vm_stopped, ( f"Failed to Stop VM({unique_vm_name}) with errors:\n" f"Status({code}): {data}" ) spec = api_client.backups.RestoreSpec.for_existing() code, data = api_client.backups.restore(unique_vm_name, spec) assert 201 == code, f'Failed to restore backup with current VM replaced, {data}' vm_getable, (code, data) = vm_checker.wait_getable(unique_vm_name) assert vm_getable, (code, data) # Check VM Started vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(unique_vm_name, ['default']) assert vm_got_ips, ( f"Failed to Start VM({unique_vm_name}) with errors:\n" f"Status: {data.get('status')}\n" f"API Status({code}): {data}" ) # Check VM is not hosting on the migrated node host = data['status']['nodeName'] original_host, migrated_host = base_vm_migrated assert host == migrated_host, ( f"Restored VM is not hosted on {migrated_host} but {host}," f" the VM was initialized hosted on {original_host}" ) # Get IP of VM and host vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces'] if iface['name'] == 'default') code, data = api_client.hosts.get(data['status']['nodeName']) host_ip = next(addr['address'] for addr in data['status']['addresses'] if addr['type'] == 'InternalIP') # Login to the new VM and check data is existing with vm_shell_from_host(host_ip, vm_ip, base_vm_with_data['ssh_user'], pkey=pri_key) as sh: cloud_inited, (out, err) = vm_checker.wait_cloudinit_done(sh) assert cloud_inited, ( f"VM {unique_vm_name} Started {vm_checker.wait_timeout} seconds" f", but cloud-init still in {out}" ) out, err = sh.exec_command(f"cat {backup_data['path']}") assert backup_data['content'] in out, ( f"cloud-init writefile failed\n" f"Executed stdout: {out}\n" f"Executed stderr: {err}" )
Class variables
var pytestmark
Methods
def test_backup_migrated_vm(self,
api_client,
wait_timeout,
backup_config,
config_backup_target,
base_vm_migrated,
base_vm_with_data)-
Expand source code
@pytest.mark.dependency(param=True) def test_backup_migrated_vm( self, api_client, wait_timeout, backup_config, config_backup_target, base_vm_migrated, base_vm_with_data ): unique_vm_name = base_vm_with_data['name'] # Create backup with the name as VM's name code, data = api_client.vms.backup(unique_vm_name, unique_vm_name) assert 204 == code, (code, data) # Check backup is ready endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, backup = api_client.backups.get(unique_vm_name) if 200 == code and backup.get('status', {}).get('readyToUse'): break sleep(3) else: raise AssertionError( f'Timed-out waiting for the backup \'{unique_vm_name}\' to be ready.' )
def test_restore_replace_migrated_vm(self,
api_client,
wait_timeout,
ssh_keypair,
vm_shell_from_host,
vm_checker,
backup_config,
base_vm_migrated,
base_vm_with_data)-
Expand source code
@pytest.mark.dependency( depends=["TestBackupRestoreOnMigration::test_backup_migrated_vm"], param=True ) def test_restore_replace_migrated_vm( self, api_client, wait_timeout, ssh_keypair, vm_shell_from_host, vm_checker, backup_config, base_vm_migrated, base_vm_with_data ): unique_vm_name, backup_data = base_vm_with_data['name'], base_vm_with_data['data'] pub_key, pri_key = ssh_keypair # mess up the existing data with vm_shell_from_host( base_vm_with_data['host_ip'], base_vm_with_data['vm_ip'], base_vm_with_data['ssh_user'], pkey=pri_key ) as sh: out, err = sh.exec_command(f"echo {pub_key!r} > {base_vm_with_data['data']['path']}") assert not err, (out, err) sh.exec_command('sync') # Stop the VM then restore existing vm_stopped, (code, data) = vm_checker.wait_stopped(unique_vm_name) assert vm_stopped, ( f"Failed to Stop VM({unique_vm_name}) with errors:\n" f"Status({code}): {data}" ) spec = api_client.backups.RestoreSpec.for_existing() code, data = api_client.backups.restore(unique_vm_name, spec) assert 201 == code, f'Failed to restore backup with current VM replaced, {data}' vm_getable, (code, data) = vm_checker.wait_getable(unique_vm_name) assert vm_getable, (code, data) # Check VM Started vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(unique_vm_name, ['default']) assert vm_got_ips, ( f"Failed to Start VM({unique_vm_name}) with errors:\n" f"Status: {data.get('status')}\n" f"API Status({code}): {data}" ) # Check VM is not hosting on the migrated node host = data['status']['nodeName'] original_host, migrated_host = base_vm_migrated assert host == migrated_host, ( f"Restored VM is not hosted on {migrated_host} but {host}," f" the VM was initialized hosted on {original_host}" ) # Get IP of VM and host vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces'] if iface['name'] == 'default') code, data = api_client.hosts.get(data['status']['nodeName']) host_ip = next(addr['address'] for addr in data['status']['addresses'] if addr['type'] == 'InternalIP') # Login to the new VM and check data is existing with vm_shell_from_host(host_ip, vm_ip, base_vm_with_data['ssh_user'], pkey=pri_key) as sh: cloud_inited, (out, err) = vm_checker.wait_cloudinit_done(sh) assert cloud_inited, ( f"VM {unique_vm_name} Started {vm_checker.wait_timeout} seconds" f", but cloud-init still in {out}" ) out, err = sh.exec_command(f"cat {backup_data['path']}") assert backup_data['content'] in out, ( f"cloud-init writefile failed\n" f"Executed stdout: {out}\n" f"Executed stderr: {err}" )
class TestMultipleBackupRestore
-
Expand source code
@pytest.mark.p1 @pytest.mark.backup_target @pytest.mark.parametrize( "backup_config", [ pytest.param("S3", marks=pytest.mark.S3), pytest.param("NFS", marks=pytest.mark.NFS) ], indirect=True) class TestMultipleBackupRestore: @pytest.mark.dependency() def test_backup_multiple( self, api_client, wait_timeout, host_shell, vm_shell, vm_checker, ssh_keypair, backup_config, config_backup_target, base_vm_with_data ): def write_data(content): pub_key, pri_key = ssh_keypair # Log into VM to make some data with host_shell.login(host_ip, jumphost=True) as h: vm_sh = vm_shell(base_vm_with_data['ssh_user'], pkey=pri_key) endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): try: vm_sh.connect(vm_ip, jumphost=h.client) except ChannelException as e: login_ex = e sleep(3) else: break else: raise AssertionError(f"Unable to login to VM {unique_vm_name}") from login_ex with vm_sh as sh: endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): out, err = sh.exec_command('cloud-init status') if 'done' in out: break sleep(3) else: raise AssertionError( f"VM {unique_vm_name} Started {wait_timeout} seconds" f", but cloud-init still in {out}" ) out, err = sh.exec_command(f'echo {content!r} >> ~/vmname') assert not err, (out, err) sh.exec_command('sync') def create_backup(vm_name, backup_name): code, data = api_client.vms.backup(vm_name, backup_name) assert 204 == code, (code, data) # Check backup is ready endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, backup = api_client.backups.get(backup_name) if 200 == code and backup.get('status', {}).get('readyToUse'): break sleep(3) else: raise AssertionError( f'Timed-out waiting for the backup \'{backup_name}\' to be ready.' ) unique_vm_name = base_vm_with_data['name'] # Check VM started and get IPs (vm and host) vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(unique_vm_name, ['default']) assert vm_got_ips, ( f"Failed to Start VM({unique_vm_name}) with errors:\n" f"Status: {data.get('status')}\n" f"API Status({code}): {data}" ) vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces'] if iface['name'] == 'default') code, data = api_client.hosts.get(data['status']['nodeName']) host_ip = next(addr['address'] for addr in data['status']['addresses'] if addr['type'] == 'InternalIP') content = "" # Create multiple backups for idx in range(0, 5): backup_name = f"{idx}-{unique_vm_name}" write_data(backup_name) create_backup(unique_vm_name, backup_name) content += f"{backup_name}\n" base_vm_with_data['data'].setdefault('backups', []).append((backup_name, content)) @pytest.mark.dependency( depends=["TestMultipleBackupRestore::test_backup_multiple"], param=True ) def test_delete_first_backup( self, api_client, host_shell, vm_shell, vm_checker, ssh_keypair, wait_timeout, backup_config, config_backup_target, base_vm_with_data ): unique_vm_name, backup_data = base_vm_with_data['name'], base_vm_with_data['data'] pub_key, pri_key = ssh_keypair backups = backup_data['backups'] (first_backup, content), *backup_data['backups'] = backups latest_backup = backups[-1][0] # Delete first backup code, data = api_client.backups.delete(first_backup) endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, data = api_client.backups.get(first_backup) if 404 == code: break sleep(3) else: raise AssertionError( f"Failed to delete backup {first_backup}\n" f"API Status({code}): {data}" ) # Stop the VM code, data = api_client.vms.stop(unique_vm_name) assert 204 == code, "`Stop` return unexpected status code" endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, data = api_client.vms.get_status(unique_vm_name) if 404 == code: break sleep(3) else: raise AssertionError( f"Failed to Stop VM({unique_vm_name}) with errors:\n" f"Status({code}): {data}" ) spec = api_client.backups.RestoreSpec.for_existing(delete_volumes=True) code, data = api_client.backups.restore(latest_backup, spec) assert 201 == code, f'Failed to restore backup with current VM replaced, {data}' vm_getable, (code, data) = vm_checker.wait_getable(unique_vm_name) assert vm_getable, (code, data) # Check VM Started then get IPs (vm and host) vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(unique_vm_name, ['default']) assert vm_got_ips, ( f"Failed to Start VM({unique_vm_name}) with errors:\n" f"Status: {data.get('status')}\n" f"API Status({code}): {data}" ) vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces'] if iface['name'] == 'default') code, data = api_client.hosts.get(data['status']['nodeName']) host_ip = next(addr['address'] for addr in data['status']['addresses'] if addr['type'] == 'InternalIP') # Login to the new VM and check data is existing with host_shell.login(host_ip, jumphost=True) as h: vm_sh = vm_shell(base_vm_with_data['ssh_user'], pkey=pri_key) endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): try: vm_sh.connect(vm_ip, jumphost=h.client) except ChannelException as e: login_ex = e sleep(3) else: break else: raise AssertionError(f"Unable to login to VM {unique_vm_name}") from login_ex with vm_sh as sh: endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): out, err = sh.exec_command('cloud-init status') if 'done' in out: break sleep(3) else: raise AssertionError( f"VM {unique_vm_name} Started {wait_timeout} seconds" f", but cloud-init still in {out}" ) out, err = sh.exec_command(f"cat {backup_data['path']}") assert content in out, ( f"cloud-init writefile failed\n" f"Executed stdout: {out}\n" f"Executed stderr: {err}" ) @pytest.mark.dependency( depends=["TestMultipleBackupRestore::test_backup_multiple"], param=True ) def test_delete_last_backup( self, api_client, host_shell, vm_shell, vm_checker, ssh_keypair, wait_timeout, backup_config, config_backup_target, base_vm_with_data ): unique_vm_name, backup_data = base_vm_with_data['name'], base_vm_with_data['data'] pub_key, pri_key = ssh_keypair *backups, (latest_backup, content), (last_backup, _) = backup_data['backups'] backup_data['backups'] = backup_data['backups'][:-1] # Delete first backup code, data = api_client.backups.delete(last_backup) endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, data = api_client.backups.get(last_backup) if 404 == code: break sleep(3) else: raise AssertionError( f"Failed to delete backup {last_backup}\n" f"API Status({code}): {data}" ) # Stop the VM code, data = api_client.vms.stop(unique_vm_name) assert 204 == code, "`Stop` return unexpected status code" endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, data = api_client.vms.get_status(unique_vm_name) if 404 == code: break sleep(3) else: raise AssertionError( f"Failed to Stop VM({unique_vm_name}) with errors:\n" f"Status({code}): {data}" ) spec = api_client.backups.RestoreSpec.for_existing(delete_volumes=True) code, data = api_client.backups.restore(latest_backup, spec) assert 201 == code, f'Failed to restore backup with current VM replaced, {data}' vm_getable, (code, data) = vm_checker.wait_getable(unique_vm_name) assert vm_getable, (code, data) # Check VM Started then get IPs (vm and host) vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(unique_vm_name, ['default']) assert vm_got_ips, ( f"Failed to Start VM({unique_vm_name}) with errors:\n" f"Status: {data.get('status')}\n" f"API Status({code}): {data}" ) vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces'] if iface['name'] == 'default') code, data = api_client.hosts.get(data['status']['nodeName']) host_ip = next(addr['address'] for addr in data['status']['addresses'] if addr['type'] == 'InternalIP') # Login to the new VM and check data is existing with host_shell.login(host_ip, jumphost=True) as h: vm_sh = vm_shell(base_vm_with_data['ssh_user'], pkey=pri_key) endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): try: vm_sh.connect(vm_ip, jumphost=h.client) except ChannelException as e: login_ex = e sleep(3) else: break else: raise AssertionError(f"Unable to login to VM {unique_vm_name}") from login_ex with vm_sh as sh: endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): out, err = sh.exec_command('cloud-init status') if 'done' in out: break sleep(3) else: raise AssertionError( f"VM {unique_vm_name} Started {wait_timeout} seconds" f", but cloud-init still in {out}" ) out, err = sh.exec_command(f"cat {backup_data['path']}") assert content in out, ( f"cloud-init writefile failed\n" f"Executed stdout: {out}\n" f"Executed stderr: {err}" ) @pytest.mark.dependency( depends=["TestMultipleBackupRestore::test_backup_multiple"], param=True ) def test_delete_middle_backup( self, api_client, host_shell, vm_shell, vm_checker, ssh_keypair, wait_timeout, backup_config, config_backup_target, base_vm_with_data ): unique_vm_name, backup_data = base_vm_with_data['name'], base_vm_with_data['data'] pub_key, pri_key = ssh_keypair *backups, (middle_backup, _), (latest_backup, content) = backup_data['backups'] backup_data['backups'] = backups + [(latest_backup, content)] # Delete second last backup code, data = api_client.backups.delete(middle_backup) endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, data = api_client.backups.get(middle_backup) if 404 == code: break sleep(3) else: raise AssertionError( f"Failed to delete backup {middle_backup}\n" f"API Status({code}): {data}" ) # Stop the VM code, data = api_client.vms.stop(unique_vm_name) assert 204 == code, "`Stop` return unexpected status code" endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, data = api_client.vms.get_status(unique_vm_name) if 404 == code: break sleep(3) else: raise AssertionError( f"Failed to Stop VM({unique_vm_name}) with errors:\n" f"Status({code}): {data}" ) spec = api_client.backups.RestoreSpec.for_existing(delete_volumes=True) code, data = api_client.backups.restore(latest_backup, spec) assert 201 == code, f'Failed to restore backup with current VM replaced, {data}' vm_getable, (code, data) = vm_checker.wait_getable(unique_vm_name) assert vm_getable, (code, data) # Check VM Started then get IPs (vm and host) vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(unique_vm_name, ['default']) assert vm_got_ips, ( f"Failed to Start VM({unique_vm_name}) with errors:\n" f"Status: {data.get('status')}\n" f"API Status({code}): {data}" ) vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces'] if iface['name'] == 'default') code, data = api_client.hosts.get(data['status']['nodeName']) host_ip = next(addr['address'] for addr in data['status']['addresses'] if addr['type'] == 'InternalIP') # Login to the new VM and check data is existing with host_shell.login(host_ip, jumphost=True) as h: vm_sh = vm_shell(base_vm_with_data['ssh_user'], pkey=pri_key) endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): try: vm_sh.connect(vm_ip, jumphost=h.client) except ChannelException as e: login_ex = e sleep(3) else: break else: raise AssertionError(f"Unable to login to VM {unique_vm_name}") from login_ex with vm_sh as sh: endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): out, err = sh.exec_command('cloud-init status') if 'done' in out: break sleep(3) else: raise AssertionError( f"VM {unique_vm_name} Started {wait_timeout} seconds" f", but cloud-init still in {out}" ) out, err = sh.exec_command(f"cat {backup_data['path']}") assert content in out, ( f"cloud-init writefile failed\n" f"Executed stdout: {out}\n" f"Executed stderr: {err}" )
Class variables
var pytestmark
Methods
def test_backup_multiple(self,
api_client,
wait_timeout,
host_shell,
vm_shell,
vm_checker,
ssh_keypair,
backup_config,
config_backup_target,
base_vm_with_data)-
Expand source code
@pytest.mark.dependency() def test_backup_multiple( self, api_client, wait_timeout, host_shell, vm_shell, vm_checker, ssh_keypair, backup_config, config_backup_target, base_vm_with_data ): def write_data(content): pub_key, pri_key = ssh_keypair # Log into VM to make some data with host_shell.login(host_ip, jumphost=True) as h: vm_sh = vm_shell(base_vm_with_data['ssh_user'], pkey=pri_key) endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): try: vm_sh.connect(vm_ip, jumphost=h.client) except ChannelException as e: login_ex = e sleep(3) else: break else: raise AssertionError(f"Unable to login to VM {unique_vm_name}") from login_ex with vm_sh as sh: endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): out, err = sh.exec_command('cloud-init status') if 'done' in out: break sleep(3) else: raise AssertionError( f"VM {unique_vm_name} Started {wait_timeout} seconds" f", but cloud-init still in {out}" ) out, err = sh.exec_command(f'echo {content!r} >> ~/vmname') assert not err, (out, err) sh.exec_command('sync') def create_backup(vm_name, backup_name): code, data = api_client.vms.backup(vm_name, backup_name) assert 204 == code, (code, data) # Check backup is ready endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, backup = api_client.backups.get(backup_name) if 200 == code and backup.get('status', {}).get('readyToUse'): break sleep(3) else: raise AssertionError( f'Timed-out waiting for the backup \'{backup_name}\' to be ready.' ) unique_vm_name = base_vm_with_data['name'] # Check VM started and get IPs (vm and host) vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(unique_vm_name, ['default']) assert vm_got_ips, ( f"Failed to Start VM({unique_vm_name}) with errors:\n" f"Status: {data.get('status')}\n" f"API Status({code}): {data}" ) vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces'] if iface['name'] == 'default') code, data = api_client.hosts.get(data['status']['nodeName']) host_ip = next(addr['address'] for addr in data['status']['addresses'] if addr['type'] == 'InternalIP') content = "" # Create multiple backups for idx in range(0, 5): backup_name = f"{idx}-{unique_vm_name}" write_data(backup_name) create_backup(unique_vm_name, backup_name) content += f"{backup_name}\n" base_vm_with_data['data'].setdefault('backups', []).append((backup_name, content))
def test_delete_first_backup(self,
api_client,
host_shell,
vm_shell,
vm_checker,
ssh_keypair,
wait_timeout,
backup_config,
config_backup_target,
base_vm_with_data)-
Expand source code
@pytest.mark.dependency( depends=["TestMultipleBackupRestore::test_backup_multiple"], param=True ) def test_delete_first_backup( self, api_client, host_shell, vm_shell, vm_checker, ssh_keypair, wait_timeout, backup_config, config_backup_target, base_vm_with_data ): unique_vm_name, backup_data = base_vm_with_data['name'], base_vm_with_data['data'] pub_key, pri_key = ssh_keypair backups = backup_data['backups'] (first_backup, content), *backup_data['backups'] = backups latest_backup = backups[-1][0] # Delete first backup code, data = api_client.backups.delete(first_backup) endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, data = api_client.backups.get(first_backup) if 404 == code: break sleep(3) else: raise AssertionError( f"Failed to delete backup {first_backup}\n" f"API Status({code}): {data}" ) # Stop the VM code, data = api_client.vms.stop(unique_vm_name) assert 204 == code, "`Stop` return unexpected status code" endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, data = api_client.vms.get_status(unique_vm_name) if 404 == code: break sleep(3) else: raise AssertionError( f"Failed to Stop VM({unique_vm_name}) with errors:\n" f"Status({code}): {data}" ) spec = api_client.backups.RestoreSpec.for_existing(delete_volumes=True) code, data = api_client.backups.restore(latest_backup, spec) assert 201 == code, f'Failed to restore backup with current VM replaced, {data}' vm_getable, (code, data) = vm_checker.wait_getable(unique_vm_name) assert vm_getable, (code, data) # Check VM Started then get IPs (vm and host) vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(unique_vm_name, ['default']) assert vm_got_ips, ( f"Failed to Start VM({unique_vm_name}) with errors:\n" f"Status: {data.get('status')}\n" f"API Status({code}): {data}" ) vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces'] if iface['name'] == 'default') code, data = api_client.hosts.get(data['status']['nodeName']) host_ip = next(addr['address'] for addr in data['status']['addresses'] if addr['type'] == 'InternalIP') # Login to the new VM and check data is existing with host_shell.login(host_ip, jumphost=True) as h: vm_sh = vm_shell(base_vm_with_data['ssh_user'], pkey=pri_key) endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): try: vm_sh.connect(vm_ip, jumphost=h.client) except ChannelException as e: login_ex = e sleep(3) else: break else: raise AssertionError(f"Unable to login to VM {unique_vm_name}") from login_ex with vm_sh as sh: endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): out, err = sh.exec_command('cloud-init status') if 'done' in out: break sleep(3) else: raise AssertionError( f"VM {unique_vm_name} Started {wait_timeout} seconds" f", but cloud-init still in {out}" ) out, err = sh.exec_command(f"cat {backup_data['path']}") assert content in out, ( f"cloud-init writefile failed\n" f"Executed stdout: {out}\n" f"Executed stderr: {err}" )
def test_delete_last_backup(self,
api_client,
host_shell,
vm_shell,
vm_checker,
ssh_keypair,
wait_timeout,
backup_config,
config_backup_target,
base_vm_with_data)-
Expand source code
@pytest.mark.dependency( depends=["TestMultipleBackupRestore::test_backup_multiple"], param=True ) def test_delete_last_backup( self, api_client, host_shell, vm_shell, vm_checker, ssh_keypair, wait_timeout, backup_config, config_backup_target, base_vm_with_data ): unique_vm_name, backup_data = base_vm_with_data['name'], base_vm_with_data['data'] pub_key, pri_key = ssh_keypair *backups, (latest_backup, content), (last_backup, _) = backup_data['backups'] backup_data['backups'] = backup_data['backups'][:-1] # Delete first backup code, data = api_client.backups.delete(last_backup) endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, data = api_client.backups.get(last_backup) if 404 == code: break sleep(3) else: raise AssertionError( f"Failed to delete backup {last_backup}\n" f"API Status({code}): {data}" ) # Stop the VM code, data = api_client.vms.stop(unique_vm_name) assert 204 == code, "`Stop` return unexpected status code" endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, data = api_client.vms.get_status(unique_vm_name) if 404 == code: break sleep(3) else: raise AssertionError( f"Failed to Stop VM({unique_vm_name}) with errors:\n" f"Status({code}): {data}" ) spec = api_client.backups.RestoreSpec.for_existing(delete_volumes=True) code, data = api_client.backups.restore(latest_backup, spec) assert 201 == code, f'Failed to restore backup with current VM replaced, {data}' vm_getable, (code, data) = vm_checker.wait_getable(unique_vm_name) assert vm_getable, (code, data) # Check VM Started then get IPs (vm and host) vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(unique_vm_name, ['default']) assert vm_got_ips, ( f"Failed to Start VM({unique_vm_name}) with errors:\n" f"Status: {data.get('status')}\n" f"API Status({code}): {data}" ) vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces'] if iface['name'] == 'default') code, data = api_client.hosts.get(data['status']['nodeName']) host_ip = next(addr['address'] for addr in data['status']['addresses'] if addr['type'] == 'InternalIP') # Login to the new VM and check data is existing with host_shell.login(host_ip, jumphost=True) as h: vm_sh = vm_shell(base_vm_with_data['ssh_user'], pkey=pri_key) endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): try: vm_sh.connect(vm_ip, jumphost=h.client) except ChannelException as e: login_ex = e sleep(3) else: break else: raise AssertionError(f"Unable to login to VM {unique_vm_name}") from login_ex with vm_sh as sh: endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): out, err = sh.exec_command('cloud-init status') if 'done' in out: break sleep(3) else: raise AssertionError( f"VM {unique_vm_name} Started {wait_timeout} seconds" f", but cloud-init still in {out}" ) out, err = sh.exec_command(f"cat {backup_data['path']}") assert content in out, ( f"cloud-init writefile failed\n" f"Executed stdout: {out}\n" f"Executed stderr: {err}" )
def test_delete_middle_backup(self,
api_client,
host_shell,
vm_shell,
vm_checker,
ssh_keypair,
wait_timeout,
backup_config,
config_backup_target,
base_vm_with_data)-
Expand source code
@pytest.mark.dependency( depends=["TestMultipleBackupRestore::test_backup_multiple"], param=True ) def test_delete_middle_backup( self, api_client, host_shell, vm_shell, vm_checker, ssh_keypair, wait_timeout, backup_config, config_backup_target, base_vm_with_data ): unique_vm_name, backup_data = base_vm_with_data['name'], base_vm_with_data['data'] pub_key, pri_key = ssh_keypair *backups, (middle_backup, _), (latest_backup, content) = backup_data['backups'] backup_data['backups'] = backups + [(latest_backup, content)] # Delete second last backup code, data = api_client.backups.delete(middle_backup) endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, data = api_client.backups.get(middle_backup) if 404 == code: break sleep(3) else: raise AssertionError( f"Failed to delete backup {middle_backup}\n" f"API Status({code}): {data}" ) # Stop the VM code, data = api_client.vms.stop(unique_vm_name) assert 204 == code, "`Stop` return unexpected status code" endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, data = api_client.vms.get_status(unique_vm_name) if 404 == code: break sleep(3) else: raise AssertionError( f"Failed to Stop VM({unique_vm_name}) with errors:\n" f"Status({code}): {data}" ) spec = api_client.backups.RestoreSpec.for_existing(delete_volumes=True) code, data = api_client.backups.restore(latest_backup, spec) assert 201 == code, f'Failed to restore backup with current VM replaced, {data}' vm_getable, (code, data) = vm_checker.wait_getable(unique_vm_name) assert vm_getable, (code, data) # Check VM Started then get IPs (vm and host) vm_got_ips, (code, data) = vm_checker.wait_ip_addresses(unique_vm_name, ['default']) assert vm_got_ips, ( f"Failed to Start VM({unique_vm_name}) with errors:\n" f"Status: {data.get('status')}\n" f"API Status({code}): {data}" ) vm_ip = next(iface['ipAddress'] for iface in data['status']['interfaces'] if iface['name'] == 'default') code, data = api_client.hosts.get(data['status']['nodeName']) host_ip = next(addr['address'] for addr in data['status']['addresses'] if addr['type'] == 'InternalIP') # Login to the new VM and check data is existing with host_shell.login(host_ip, jumphost=True) as h: vm_sh = vm_shell(base_vm_with_data['ssh_user'], pkey=pri_key) endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): try: vm_sh.connect(vm_ip, jumphost=h.client) except ChannelException as e: login_ex = e sleep(3) else: break else: raise AssertionError(f"Unable to login to VM {unique_vm_name}") from login_ex with vm_sh as sh: endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): out, err = sh.exec_command('cloud-init status') if 'done' in out: break sleep(3) else: raise AssertionError( f"VM {unique_vm_name} Started {wait_timeout} seconds" f", but cloud-init still in {out}" ) out, err = sh.exec_command(f"cat {backup_data['path']}") assert content in out, ( f"cloud-init writefile failed\n" f"Executed stdout: {out}\n" f"Executed stderr: {err}" )