Module harvester_e2e_tests.fixtures.upgrades

Functions

def upgrade_checker(api_client, wait_timeout, sleep_timeout)
Expand source code
@pytest.fixture(scope="session")
def upgrade_checker(api_client, wait_timeout, sleep_timeout):
    """Provide a session-scoped helper for polling upgrade-related resources.

    The returned ``UpgradeChecker`` instance reads ``api_client.versions`` and
    ``api_client.upgrades``.  Each ``wait_*`` method is wrapped with
    ``wait_until(wait_timeout, sleep_timeout)`` and returns a
    ``(done, (code, data))`` tuple, where ``code`` and ``data`` are whatever the
    underlying ``get`` call returned.
    NOTE(review): presumably ``wait_until`` re-invokes the method until
    ``done`` is true or the timeout elapses -- confirm against its definition.
    """

    def _image_ready_failed(data, *markers):
        """Tell whether *data* describes an upgrade that failed at ImageReady.

        True only when the ``Completed`` and ``ImageReady`` conditions both
        have status ``"False"`` and the ``ImageReady`` message contains at
        least one of *markers*.
        """
        # The ``or`` fallbacks also cover keys that are present but null, so a
        # half-populated resource counts as "not failed yet" instead of
        # raising in the middle of polling.
        status = data.get('status') or {}
        conds = {c['type']: c for c in status.get('conditions') or []}
        image_ready = conds.get('ImageReady', {})
        message = image_ready.get('message') or ""
        return (
            "False" == conds.get('Completed', {}).get('status')
            and "False" == image_ready.get('status')
            and any(marker in message for marker in markers)
        )

    class UpgradeChecker:
        """Polling helpers built on the ``versions`` and ``upgrades`` managers."""

        def __init__(self):
            self.versions = api_client.versions
            self.upgrades = api_client.upgrades

        @wait_until(wait_timeout, sleep_timeout)
        def wait_version_created(self, version):
            """Report success once fetching *version* answers with code 200."""
            code, data = self.versions.get(version)
            return code == 200, (code, data)

        @wait_until(wait_timeout, sleep_timeout)
        def wait_upgrade_fail_by_invalid_iso_url(self, upgrade_name):
            """Report success once *upgrade_name* failed on an invalid ISO URL."""
            code, data = self.upgrades.get(upgrade_name)
            failed = _image_ready_failed(
                data,
                "no such host",  # >= v1.4.0
                "retry limit",   # < v1.4.0
            )
            return failed, (code, data)

        @wait_until(wait_timeout, sleep_timeout)
        def wait_upgrade_fail_by_invalid_checksum(self, upgrade_name):
            """Report success once *upgrade_name* failed on a bad checksum."""
            code, data = self.upgrades.get(upgrade_name)
            failed = _image_ready_failed(
                data,
                "actual check",  # >= v1.4.0
                "retry limit",   # < v1.4.0
            )
            return failed, (code, data)

    return UpgradeChecker()