# Source code of the vm_checker fixture (expanded view)
@pytest.fixture(scope="session")
def vm_checker(api_client, wait_timeout, sleep_timeout, vm_shell):
from dataclasses import dataclass, field
@dataclass
class ResponseContext:
    ''' Result of one API call as handed to the checker callbacks.

    Iterating an instance yields ``code`` followed by ``data``, so it can be
    unpacked like ``code, data = ctx``.
    '''
    # label naming the API call that produced this response (e.g. 'vm.get')
    callee: str
    # status code of the response; checkers compare it against 200 / 404
    code: int
    # payload returned together with the status code
    data: dict
    # free-form extras carried along with the response; excluded from ``==``
    options: dict = field(default_factory=dict, compare=False)

    def __iter__(self):
        ''' Allow unpacking the context into ``(code, data)``. '''
        return iter((self.code, self.data))
@dataclass
class ShellContext:
    ''' Result of one shell command as handed to the checker callbacks.

    Iterating an instance yields ``stdout`` followed by ``stderr``, so it can
    be unpacked like ``out, err = ctx``.
    '''
    # the command line that was executed
    command: str
    # captured standard output of the command
    stdout: str
    # captured standard error of the command
    stderr: str
    # free-form extras carried along with the result; excluded from ``==``
    options: dict = field(default_factory=dict, compare=False)

    def __iter__(self):
        ''' Allow unpacking the context into ``(stdout, stderr)``. '''
        return iter((self.stdout, self.stderr))
def default_cb(ctx):
    ''' Default additional-check callback: accepts every context.

    Used as the ``callback`` default of the wait helpers so that, unless the
    caller supplies its own predicate, only the built-in condition decides.

    :param ctx: the context object being checked (ignored here)
    :rtype: boolean
    :return: always True
    '''
    return True
class VMChecker:
def __init__(self, vm_api, wait_timeout, snooze=3):
    ''' Remember the VM API handle and the polling parameters.

    :param vm_api: object whose methods (``get``, ``stop``, ...) the wait
                   helpers call
    :param wait_timeout: seconds added to "now" to build the default deadline
    :param snooze: seconds slept between two polling attempts
    '''
    # polling cadence and overall budget first, then the API handle
    self.snooze = snooze
    self.wait_timeout = wait_timeout
    self.vms = vm_api
def _endtime(self):
    ''' Build the default polling deadline.

    :rtype: datetime
    :return: the current time plus ``self.wait_timeout`` seconds
    '''
    budget = timedelta(seconds=self.wait_timeout)
    return datetime.now() + budget
@contextmanager
def configure(self, snooze=None, wait_timeout=None):
    ''' Context manager to temporarily change snooze and/or wait_timeout.

    The previous values are restored when the ``with`` block exits, also when
    it exits through an exception.

    :param snooze: temporary seconds between polls; ``None`` keeps the
                   current value
    :param wait_timeout: temporary timeout in seconds; ``None`` keeps the
                         current value
    :return: yields the checker itself
    '''
    saved = self.snooze, self.wait_timeout
    # Compare against None instead of relying on truthiness, so an explicit
    # 0 (no pause between polls / no waiting) is honored rather than ignored.
    if snooze is not None:
        self.snooze = snooze
    if wait_timeout is not None:
        self.wait_timeout = wait_timeout
    try:
        yield self
    finally:
        self.snooze, self.wait_timeout = saved
def wait_getable(self, vm_name, endtime=None, callback=default_cb, **kws):
    ''' Poll ``vms.get`` until it answers 200 and the callback agrees.

    The VM is always queried at least once, even when ``endtime`` has
    already passed, so the returned context is always a real response.

    :param vm_name: name handed to ``vms.get``
    :param endtime: ``datetime`` deadline; defaults to now + ``wait_timeout``
    :param callback: additional check receiving the ``ResponseContext``;
                     polling only stops successfully when it returns truthy
    :param kws: extra keyword arguments forwarded to ``vms.get``
    :rtype: tuple
    :return: ``(True, ctx)`` on success, ``(False, ctx)`` with the last
             response once the deadline has passed
    '''
    endtime = endtime or self._endtime()
    while True:
        ctx = ResponseContext('vm.get', *self.vms.get(vm_name, **kws))
        if 200 == ctx.code and callback(ctx):
            return True, ctx
        # Check the deadline only after a probe: this guarantees ``ctx`` is
        # bound (the former while/else raised UnboundLocalError when the
        # deadline was already over before the first iteration).
        if datetime.now() >= endtime:
            return False, ctx
        sleep(self.snooze)
def wait_stopped(self, vm_name, endtime=None, callback=default_cb, **kws):
    ''' Request a stop, then poll ``vms.get_status`` until it answers 404.

    :param vm_name: name handed to ``vms.stop`` and ``vms.get_status``
    :param endtime: ``datetime`` deadline; defaults to now + ``wait_timeout``
    :param callback: additional check receiving each ``ResponseContext``
    :param kws: extra keyword arguments forwarded to both API calls
    :rtype: tuple
    :return: ``(False, ctx)`` when the stop request itself answers 404 (and
             the callback agrees) or when the deadline passes;
             ``(True, ctx)`` once ``get_status`` answers 404 and the callback
             agrees
    '''
    ctx = ResponseContext('vm.stop', *self.vms.stop(vm_name, **kws))
    stop_rejected = 404 == ctx.code and callback(ctx)
    if stop_rejected:
        return False, ctx

    deadline = endtime or self._endtime()
    while datetime.now() < deadline:
        ctx = ResponseContext('get_status', *self.vms.get_status(vm_name, **kws))
        if 404 == ctx.code and callback(ctx):
            return True, ctx
        sleep(self.snooze)

    # deadline reached: hand back whatever response was seen last
    return False, ctx
def wait_status_stopped(self, vm_name, endtime=None, callback=default_cb, **kws):
    ''' Like ``wait_stopped``, additionally requiring ``printableStatus``
    of the VM to be "Stopped".

    :param vm_name: name of the VM
    :param endtime: ``datetime`` deadline, passed through to ``wait_stopped``
    :param callback: additional check receiving the ``ResponseContext``
    :param kws: extra keyword arguments forwarded to the API calls
    :rtype: tuple
    :return: whatever ``wait_stopped`` returns for the wrapped callback
    '''
    def stopped_cb(ctx):
        # The answer of the stop request itself is judged by the caller's
        # callback alone.
        if 'vm.stop' == ctx.callee:
            return callback(ctx)

        # For polled responses, re-query the VM and overwrite the context in
        # place so the caller ends up with the ``vm.get`` result.
        ctx.code, ctx.data = self.vms.get(vm_name, **kws)
        ctx.callee = 'vm.get'

        if ctx.code != 200:
            return False
        if ctx.data.get('status', {}).get('printableStatus') != "Stopped":
            return False
        return callback(ctx)

    return self.wait_stopped(vm_name, endtime, stopped_cb, **kws)
def wait_status_running(self, vm_name, endtime=None, callback=default_cb, **kws):
    ''' Poll ``vms.get`` until ``printableStatus`` of the VM is "Running".

    The VM is always queried at least once, even when ``endtime`` has
    already passed, so the returned context is always a real response.

    :param vm_name: name handed to ``vms.get``
    :param endtime: ``datetime`` deadline; defaults to now + ``wait_timeout``
    :param callback: additional check receiving the ``ResponseContext``
    :param kws: extra keyword arguments forwarded to ``vms.get``
    :rtype: tuple
    :return: ``(True, ctx)`` on success, ``(False, ctx)`` with the last
             response once the deadline has passed
    '''
    endtime = endtime or self._endtime()
    while True:
        ctx = ResponseContext('vm.get', *self.vms.get(vm_name, **kws))
        # Look into the payload only after the 200 check, so a non-200
        # answer is never inspected for a status.
        if (
            200 == ctx.code
            and "Running" == ctx.data.get('status', {}).get('printableStatus')
            and callback(ctx)
        ):
            return True, ctx
        # Deadline is checked after the probe so ``ctx`` is always bound
        # (the former while/else raised UnboundLocalError on an expired
        # deadline).
        if datetime.now() >= endtime:
            return False, ctx
        sleep(self.snooze)
def wait_deleted(self, vm_name, endtime=None, callback=default_cb, **kws):
    ''' Request deletion, then poll ``vms.get_status`` until it answers 404.

    :param vm_name: name handed to ``vms.delete`` and ``vms.get_status``
    :param endtime: ``datetime`` deadline; defaults to now + ``wait_timeout``
    :param callback: additional check receiving each ``ResponseContext``
    :param kws: extra keyword arguments forwarded to both API calls
    :rtype: tuple
    :return: ``(False, ctx)`` when the delete request itself answers 404 (and
             the callback agrees) or when the deadline passes;
             ``(True, ctx)`` once ``get_status`` answers 404 and the callback
             agrees
    '''
    ctx = ResponseContext('vm.delete', *self.vms.delete(vm_name, **kws))
    delete_rejected = 404 == ctx.code and callback(ctx)
    if delete_rejected:
        return False, ctx

    deadline = endtime or self._endtime()
    while datetime.now() < deadline:
        ctx = ResponseContext('vm.get_status', *self.vms.get_status(vm_name, **kws))
        if 404 == ctx.code and callback(ctx):
            return True, ctx
        sleep(self.snooze)

    # deadline reached: hand back whatever response was seen last
    return False, ctx
def wait_restarted(self, vm_name, endtime=None, callback=default_cb, **kws):
    ''' Restart the VM, wait until its ``activePods`` changed, then delegate
    to ``wait_started``.

    The ``activePods`` entries seen before the restart are stored in
    ``ctx.options['old_pods']`` and carried through every polled context.

    :param vm_name: name of the VM
    :param endtime: ``datetime`` deadline; defaults to now + ``wait_timeout``
    :param callback: additional check receiving each ``ResponseContext``
    :param kws: extra keyword arguments forwarded to the API calls
    :rtype: tuple
    :return: ``(False, ctx)`` when the initial status query or the restart
             request answers 404 (and the callback agrees) or when the
             deadline passes; otherwise the result of ``wait_started``
    '''
    ctx = ResponseContext('vm.get_status', *self.vms.get_status(vm_name, **kws))
    if 404 == ctx.code and callback(ctx):
        return False, ctx

    # Snapshot of the pods that were active before the restart was requested.
    pods_before = set(ctx.data['status']['activePods'].items())
    options = {'old_pods': pods_before}

    ctx = ResponseContext('vm.restart', *self.vms.restart(vm_name, **kws), options)
    if 404 == ctx.code and callback(ctx):
        return False, ctx

    endtime = endtime or self._endtime()
    while datetime.now() < endtime:
        ctx = ResponseContext(
            'vm.get_status', *self.vms.get_status(vm_name, **kws), ctx.options
        )
        if ctx.code != 404:
            old_pods = ctx.options['old_pods']
            cur_pods = ctx.data['status'].get('activePods', {}).items()
            # With no current pods the set is compared with itself, which
            # yields an empty difference and keeps the loop waiting.
            if old_pods.difference(cur_pods or old_pods) and callback(ctx):
                return self.wait_started(vm_name, endtime, callback, **kws)
        sleep(self.snooze)

    return False, ctx
def wait_started(self, vm_name, endtime=None, callback=default_cb, **kws):
    ''' Request a start, then poll ``vms.get_status`` until ``status.phase``
    is "Running".

    :param vm_name: name handed to ``vms.start`` and ``vms.get_status``
    :param endtime: ``datetime`` deadline; defaults to now + ``wait_timeout``
    :param callback: additional check receiving each ``ResponseContext``
    :param kws: extra keyword arguments forwarded to both API calls
    :rtype: tuple
    :return: ``(False, ctx)`` when the start request answers 404 (and the
             callback agrees) or when the deadline passes; ``(True, ctx)``
             once the status answers 200 with phase "Running" and the
             callback agrees
    '''
    ctx = ResponseContext('vm.start', *self.vms.start(vm_name, **kws))
    if 404 == ctx.code and callback(ctx):
        return False, ctx

    deadline = endtime or self._endtime()
    while datetime.now() < deadline:
        ctx = ResponseContext('vm.get_status', *self.vms.get_status(vm_name, **kws))
        is_running = (
            200 == ctx.code
            and "Running" == ctx.data.get('status', {}).get('phase')
        )
        if is_running and callback(ctx):
            return True, ctx
        sleep(self.snooze)

    # deadline reached: hand back whatever response was seen last
    return False, ctx
def wait_agent_connected(self, vm_name, endtime=None, callback=default_cb, **kws):
    ''' Like ``wait_started``, additionally requiring the last entry of
    ``status.conditions`` to be of type "AgentConnected".

    :param vm_name: name of the VM
    :param endtime: ``datetime`` deadline, passed through to ``wait_started``
    :param callback: additional check receiving the ``ResponseContext``
    :param kws: extra keyword arguments forwarded to the API calls
    :rtype: tuple
    :return: whatever ``wait_started`` returns for the wrapped callback
    '''
    def cb(ctx):
        # The answer of the start request itself is judged by the caller's
        # callback alone.
        if ctx.callee == 'vm.start':
            return callback(ctx)
        # ``or [{}]`` also covers a present-but-empty (or null) conditions
        # list, which would otherwise make ``conds[-1]`` raise IndexError.
        conds = ctx.data.get('status', {}).get('conditions') or [{}]
        # NOTE(review): only the LAST condition is inspected, as before;
        # confirm "AgentConnected" is always reported last.
        return (
            "AgentConnected" == conds[-1].get('type')
            and callback(ctx)
        )
    return self.wait_started(vm_name, endtime, cb, **kws)
def wait_interfaces(self, vm_name, endtime=None, callback=default_cb, **kws):
    ''' Like ``wait_agent_connected``, additionally requiring a non-empty
    ``status.interfaces`` in the polled response.

    :param vm_name: name of the VM
    :param endtime: ``datetime`` deadline, passed through unchanged
    :param callback: additional check receiving the ``ResponseContext``
    :param kws: extra keyword arguments forwarded to the API calls
    :rtype: tuple
    :return: whatever ``wait_agent_connected`` returns for the wrapped
             callback
    '''
    def has_interfaces(ctx):
        # The answer of the start request itself is judged by the caller's
        # callback alone.
        if 'vm.start' == ctx.callee:
            return callback(ctx)
        interfaces = ctx.data.get('status', {}).get('interfaces')
        return interfaces and callback(ctx)

    return self.wait_agent_connected(vm_name, endtime, has_interfaces, **kws)
def wait_ip_addresses(self, vm_name, ifnames, endtime=None, callback=default_cb, **kws):
    ''' Like ``wait_interfaces``, additionally requiring every interface
    named in ``ifnames`` to report an ``ipAddress``.

    :param vm_name: name of the VM
    :param ifnames: interface name(s) to wait for; an iterable of names, or
                    a single name given as a plain string
    :param endtime: ``datetime`` deadline, passed through unchanged
    :param callback: additional check receiving the ``ResponseContext``
    :param kws: extra keyword arguments forwarded to the API calls
    :rtype: tuple
    :return: whatever ``wait_interfaces`` returns for the wrapped callback
    '''
    # A bare string is one interface name, not an iterable of characters.
    # Materialize once so a one-shot iterator survives repeated polling.
    ifnames = [ifnames] if isinstance(ifnames, str) else list(ifnames)

    def cb(ctx):
        # The answer of the start request itself is judged by the caller's
        # callback alone.
        if ctx.callee == 'vm.start':
            return callback(ctx)
        reported = ctx.data.get('status', {}).get('interfaces') or []
        # Skip entries that carry no 'name' key instead of failing with
        # KeyError; such entries cannot match a requested name anyway.
        ifaces = {d['name']: d for d in reported if 'name' in d}
        return (
            all(ifaces.get(name, {}).get('ipAddress') for name in ifnames)
            and callback(ctx)
        )

    return self.wait_interfaces(vm_name, endtime, cb, **kws)
def wait_cloudinit_done(self, shell, endtime=None, callback=default_cb, **kws):
    ''' Run ``cloud-init status`` through ``shell`` until its output
    contains "done".

    The command is always executed at least once, even when ``endtime`` has
    already passed, so the returned output is always a real result.

    :param shell: object providing ``exec_command(cmd)`` whose result
                  unpacks into stdout and stderr
    :param endtime: ``datetime`` deadline; defaults to now + ``wait_timeout``
    :param callback: additional check receiving the ``ShellContext``
    :param kws: accepted for signature parity with the other wait helpers;
                not used
    :rtype: tuple
    :return: ``(True, (stdout, stderr))`` on success,
             ``(False, (stdout, stderr))`` of the last run once the deadline
             has passed
    '''
    cmd = 'cloud-init status'
    endtime = endtime or self._endtime()
    while True:
        ctx = ShellContext(cmd, *shell.exec_command(cmd))
        if 'done' in ctx.stdout and callback(ctx):
            return True, (ctx.stdout, ctx.stderr)
        # Deadline is checked after the probe so ``ctx`` is always bound
        # (the former while/else raised UnboundLocalError on an expired
        # deadline).
        if datetime.now() >= endtime:
            return False, (ctx.stdout, ctx.stderr)
        sleep(self.snooze)
def wait_migrated(self, vm_name, new_host, endtime=None, callback=default_cb, **kws):
    ''' Request a migration, then poll ``vms.get_status`` until the VM is
    reported on ``new_host`` with no migration state annotation left.

    :param vm_name: name of the VM
    :param new_host: target node name handed to ``vms.migrate`` and compared
                     with ``status.nodeName``
    :param endtime: ``datetime`` deadline; defaults to now + ``wait_timeout``
    :param callback: additional check receiving each ``ResponseContext``
    :param kws: extra keyword arguments forwarded to both API calls
    :rtype: tuple
    :return: ``(False, ctx)`` when the migrate request answers 404 (and the
             callback agrees) or when the deadline passes; ``(True, ctx)``
             once the migration is observed as finished
    '''
    ctx = ResponseContext('vm.migrate', *self.vms.migrate(vm_name, new_host, **kws))
    if 404 == ctx.code and callback(ctx):
        return False, ctx

    endtime = endtime or self._endtime()
    while endtime > datetime.now():
        ctx = ResponseContext('vm.get_status', *self.vms.get_status(vm_name, **kws))
        # Tolerant lookups: a response lacking metadata/annotations/status
        # is treated as "not migrated yet" and retried, instead of aborting
        # the wait with KeyError. Success still requires nodeName to match.
        annotations = (ctx.data.get('metadata') or {}).get('annotations') or {}
        node_name = (ctx.data.get('status') or {}).get('nodeName')
        if (
            not annotations.get("harvesterhci.io/migrationState")
            and new_host == node_name
            and callback(ctx)
        ):
            return True, ctx
        sleep(self.snooze)

    return False, ctx
def wait_ssh_connected(
    self, vm_ip, username, password=None, pkey=None, endtime=None, **kws
):
    ''' Keep trying to open an SSH session to ``vm_ip`` until it succeeds.

    A connection is always attempted at least once, even when ``endtime``
    has already passed.

    :param vm_ip: address handed to the shell's ``connect``
    :param username: login name handed to ``vm_shell``
    :param password: optional password handed to ``vm_shell``
    :param pkey: optional private key handed to ``vm_shell``
    :param endtime: ``datetime`` deadline; defaults to now + ``wait_timeout``
    :param kws: extra keyword arguments forwarded to ``connect``
    :return: the connected shell object
    :raises AssertionError: when no attempt succeeded before the deadline;
                            chained from the last connection error
    '''
    vm_sh = vm_shell(username, password, pkey)
    endtime = endtime or self._endtime()
    while True:
        try:
            vm_sh.connect(vm_ip, **kws)
        except (ChannelException, NoValidConnectionsError) as e:
            # Raising from inside the handler guarantees a real exception to
            # chain from (the former ``login_ex`` variable was unbound when
            # the loop body never ran, turning the failure into an
            # UnboundLocalError).
            if datetime.now() >= endtime:
                raise AssertionError(f"Unable to login to VM {vm_ip}") from e
            sleep(self.snooze)
        else:
            return vm_sh
return VMChecker(api_client.vms, wait_timeout, sleep_timeout)