Module harvester_e2e_tests.fixtures.api_client
Functions
def api_client(request, harvester_metadata)
-
Expand source code
@pytest.fixture(scope="session") def api_client(request, harvester_metadata): endpoint = request.config.getoption("--endpoint") username = request.config.getoption("--username") password = request.config.getoption("--password") ssl_verify = request.config.getoption("--ssl_verify", False) api = HarvesterAPI(endpoint) api.authenticate(username, password, verify=ssl_verify) api.session.verify = ssl_verify harvester_metadata['Cluster Endpoint'] = endpoint harvester_metadata['Cluster Version'] = api.cluster_version.raw return api
def expected_settings()
-
Expand source code
@pytest.fixture(scope="session") def expected_settings(): return { "1.1.0": {'storage-network', 'containerd-registry', 'ui-plugin-index'}, "default": { 'additional-ca', 'auto-disk-provision-paths', 'backup-target', 'cluster-registration-url', 'http-proxy', 'log-level', 'overcommit-config', 'release-download-url', 'server-version', 'ssl-certificates', 'ssl-parameters', 'support-bundle-image', 'support-bundle-namespaces', 'support-bundle-timeout', 'ui-index', 'ui-source', 'upgrade-checker-enabled', 'upgrade-checker-url', 'vip-pools', 'vm-force-reset-policy', } }
def fake_image_file()
-
Expand source code
@pytest.fixture(scope="session") def fake_image_file(): with NamedTemporaryFile("wb") as f: f.seek(10 * 1024 ** 2 - 1) # 10MB f.write(b"\0") f.seek(0) yield Path(f.name)
def gen_unique_name()
-
Expand source code
@pytest.fixture(scope='module') def gen_unique_name(): """Generate unique name on-demand""" return lambda: datetime.now().strftime("%Hh%Mm%Ss%f-%m-%d")
Generate unique name on-demand
def harvester_metadata(pytestconfig)
-
Expand source code
@pytest.fixture(scope="session") def harvester_metadata(pytestconfig): ''' be used to store harvester's metadata and expose into html report. ''' # ref: https://github.com/pytest-dev/pytest-html/blob/4.1.1/src/pytest_html/basereport.py#L71 try: from pytest_metadata.plugin import metadata_key metadata = pytestconfig.stash[metadata_key] except ImportError: metadata = pytestconfig._metadata harv = dict() metadata["Harvester"] = harv return harv
be used to store harvester's metadata and expose into html report.
def host_shell(request)
-
Expand source code
@pytest.fixture(scope="session") def host_shell(request): password = request.config.getoption("--host-password") or None pkey = request.config.getoption('--host-private-key') or None if pkey: pkey = RSAKey.from_private_key(StringIO(pkey)) class HostShell: _client = _jump = None def __init__(self, username, password=None, pkey=None): self.username = username self.password = password self.pkey = pkey def __enter__(self): return self def __exit__(self, exc_type, exc_value, exc_tb): self.logout() @property def client(self): return self._client def reconnect(self, ipaddr, port=22, **kwargs): if self.client: self.client.close() self._client = cli = SSHClient() cli.set_missing_host_key_policy(MissingHostKeyPolicy()) kws = dict(username=self.username, password=self.password, pkey=self.pkey) kws.update(kwargs) cli.connect(ipaddr, port, **kws) def login(self, ipaddr, port=22, jumphost=False, allow_agent=False, look_for_keys=False, **kwargs): if not self.client: cli = SSHClient() cli.set_missing_host_key_policy(MissingHostKeyPolicy()) kws = dict(username=self.username, password=self.password, pkey=self.pkey) kws.update(kwargs) # in case we're using a password to log into the host, this # prevents paramiko from getting confused by ssh keys in the ssh # agent: if self.password and not self.pkey: kws.update(dict(allow_agent=allow_agent, look_for_keys=look_for_keys)) cli.connect(ipaddr, port, **kws) self._client = cli if jumphost: self.jumphost_policy() self._jump = True self.reconnect(ipaddr, port, **kws) return self def logout(self): if self.client and self.client.get_transport(): if self._jump: self.jumphost_policy(False) self._jump = None self.client.close() self._client = None def exec_command(self, command, bufsize=-1, timeout=None, get_pty=False, env=None, splitlines=False): _, out, err = self.client.exec_command(command, bufsize, timeout, get_pty, env) out, err = out.read().decode(), err.read().decode() if splitlines: out = out.splitlines() return out, err def jumphost_policy(self, allow=True): ctx, err = self.exec_command("sudo cat /etc/ssh/sshd_config") if allow: renew = re.sub(r'\n(Allow(?:Tcp|Agent)Forwarding no)', lambda m: f"\n#{m.group(1)}", ctx, re.I | re.M) else: renew = re.sub(r'#(Allow(?:Tcp|Agent)Forwarding no)', lambda m: m.group(1), ctx, re.I | re.M) self.exec_command(f'sudo cat<<"EOF">_config\n{renew}EOF') self.exec_command('sudo mv _config /etc/ssh/sshd_config' ' && sudo systemctl restart sshd') return HostShell('rancher', password, pkey)
def host_state(request)
-
Expand source code
@pytest.fixture(scope="session") def host_state(request): class HostState: files = ("power_off.sh", "power_on.sh", "reboot.sh") # [False, True, -1] def __init__(self, script_path, delay=120): self.path = Path(script_path) self.delay = delay def __repr__(self): return f"HostState({self.path}, {self.delay})" def power(self, name, ip, on=True): proc = run([self.path / self.files[on], name, ip], stdout=PIPE, stderr=PIPE) return proc.returncode, proc.stdout, proc.stderr def reboot(self, name, ip): return self.power(name, ip, -1) return HostState(request.config.getoption("--node-scripts-location"))
def opensuse_checksum(request)
-
Expand source code
@pytest.fixture(scope="session") def opensuse_checksum(request): """Returns openSUSE checksum from config""" return request.config.getoption("--opensuse-checksum")
Returns openSUSE checksum from config
def polling_for(wait_timeout, sleep_timeout)
-
Expand source code
@pytest.fixture(scope="session") def polling_for(wait_timeout, sleep_timeout): # TODO: Try to redesign refer to multiprocessing package (e.g. apply_async and map_async) def _polling_for(subject: str, checker: Callable[..., bool], poller: Callable, *args, timeout=wait_timeout): """ Polling expected confition for `timeout`s every `sleep_timeout`s Arguments: subject: str, what is waiting for checker: Callable, check `poller` output and returns bool args: list, [*poller_args, testee] poller: Callable, poller(*poller_args, testee) for each testee Returns: Any: `poller` output if qualified by `checker` Raises: AssertionError: if still NOT qualified within `timeout`s """ *poller_args, testee = args testees = testee if isinstance(testee, list) else [testee] checker_args_len = len(getfullargspec(checker).args) endtime = datetime.now() + timedelta(seconds=timeout) while endtime > datetime.now(): for testee in testees[:]: output = poller(*poller_args, testee) # unpack poller output according to checker signature qualified = checker(*output) if checker_args_len > 1 else checker(output) if qualified: testees.remove(testee) if not testees: return output sleep(sleep_timeout) else: raise AssertionError( f'Timeout {timeout}s waiting for {subject}\n' f'Got error: {output}' ) return _polling_for
def rancher_wait_timeout(request)
-
Expand source code
@pytest.fixture(scope="session") def rancher_wait_timeout(request): return request.config.getoption("--rancher-cluster-wait-timeout", 1800)
def skip_version_after(request, api_client)
-
Expand source code
@pytest.fixture(autouse=True) def skip_version_after(request, api_client): mark = request.node.get_closest_marker("skip_version_after") if mark: cluster_ver = api_client.cluster_version for target_ver in mark.args: if not hasattr(cluster_ver, 'major') or parse_version(target_ver) <= cluster_ver: return pytest.skip( f"Cluster Version `{api_client.cluster_version}` is not included" f" in the supported version (most < `{target_ver}`)" )
def skip_version_before(request, api_client)
-
Expand source code
@pytest.fixture(autouse=True) def skip_version_before(request, api_client): mark = request.node.get_closest_marker("skip_version_before") if mark: cluster_ver = api_client.cluster_version for target_ver in mark.args: if '-head' not in cluster_ver.public and parse_version(target_ver) > cluster_ver: return pytest.skip( f"Cluster Version `{api_client.cluster_version}` is not included" f" in the supported version (most >= `{target_ver}`)" )
def skip_version_if(request, api_client)
-
Expand source code
@pytest.fixture(autouse=True) def skip_version_if(request, api_client): ''' To mark test case should be skip when hit the condition string. Args: *args: Version string prefixing with one of operators: `!=`, `==`, `>=`, `<=`, `>`, `<` Keyword Args: reason: The reason string for `pytest.skip`, default is: "Cluster Version `{cluster_version}` is not included in versions: {versions}" condition: Condition callable function to check compare result(bool), default is `all` ''' default_reason = ( "Cluster Version `{cluster_version}` is not included in versions: {versions}" ) mark = request.node.get_closest_marker("skip_version_if") if mark: cluster_ver = api_client.cluster_version checks = [version_check(vstr, cluster_ver) for vstr in mark.args] reason = mark.kwargs.get('reason', default_reason) if mark.kwargs.get('condition', all)(r for *_, r in checks): versions = [f"{op} {v}" for op, v, _ in checks] return pytest.skip( reason.format(cluster_version=cluster_ver, versions=versions) )
To mark test case should be skip when hit the condition string.
Args
*args
- Version string prefixing with one of operators:
!=
,==
,>=
,<=
,>
,<
Keyword Args: reason: The reason string for
pytest.skip
, default is: "Cluster Version{cluster_version}
is not included in versions: {versions}" condition: Condition callable function to check compare result(bool), default isall
def sleep_timeout(request)
-
Expand source code
@pytest.fixture(scope="session") def sleep_timeout(request): return request.config.getoption("--sleep-timeout", 4)
def ssh_keypair()
-
Expand source code
@pytest.fixture(scope="module") def ssh_keypair(): private_key = asymmetric.rsa.generate_private_key( public_exponent=65537, key_size=1024, backend=backends.default_backend() ) private_key_pem = private_key.private_bytes( serialization.Encoding.PEM, serialization.PrivateFormat.OpenSSH, serialization.NoEncryption() ) public_key = private_key.public_key() public_key_ssh = public_key.public_bytes( serialization.Encoding.OpenSSH, serialization.PublicFormat.OpenSSH ) return public_key_ssh.decode('utf-8'), private_key_pem.decode('utf-8')
def support_bundle_state()
-
Expand source code
@pytest.fixture(scope="session") def support_bundle_state(): class SupportBundle: def __init__(self, fio): self.uid = "" self.files = list() # for checking file name self.fio = fio # for checking file content with NamedTemporaryFile() as f: yield SupportBundle(f)
def ubuntu_checksum(request)
-
Expand source code
@pytest.fixture(scope="session") def ubuntu_checksum(request): """Returns Ubuntu checksum from config""" return request.config.getoption("--ubuntu-checksum")
Returns Ubuntu checksum from config
def unique_name()
-
Expand source code
@pytest.fixture(scope='module') def unique_name(): """Default unique name""" return datetime.now().strftime("%Hh%Mm%Ss%f-%m-%d")
Default unique name
def upgrade_timeout(request)
-
Expand source code
@pytest.fixture(scope="session") def upgrade_timeout(request): return request.config.getoption('--upgrade-wait-timeout') or 7200
def version_check(vstring, version)
-
Expand source code
def version_check(vstring, version): from operator import le, lt, ge, gt, ne, eq ops = {"<=": le, "<": lt, ">=": ge, ">": gt, "!=": ne, "==": eq} try: op = target_ver = None op, target_ver = re.search(r"([<>=!]+)\s?(.+)", vstring).groups() return op, target_ver, ops[op](version, parse_version(target_ver)) except Exception: return op, target_ver, False
def wait_timeout(request)
-
Expand source code
@pytest.fixture(scope="session") def wait_timeout(request): return request.config.getoption("--wait-timeout", 300)