Module harvester_e2e_tests.fixtures.settings

Functions

def setting_checker(api_client, wait_timeout, sleep_timeout)
Expand source code
@pytest.fixture(scope="session")
def setting_checker(api_client, wait_timeout, sleep_timeout):
    class SettingChecker:
        def __init__(self):
            self.settings = api_client.settings
            self.nets_annotation = 'k8s.v1.cni.cncf.io/networks'
            self.net_status_annotation = 'k8s.v1.cni.cncf.io/network-status'

        def _storage_net_configured(self):
            code, data = self.settings.get('storage-network')

            if (cs := data.get('status', {}).get('conditions')):
                if 'True' == cs[-1].get('status') and 'Completed' == cs[-1].get('reason'):
                    return True, (code, data)
            return False, (code, data)

        @wait_until(wait_timeout, sleep_timeout)
        def wait_storage_net_enabled_on_harvester(self):
            snet_configured, (code, data) = self._storage_net_configured()
            if snet_configured and data.get('value'):
                return True, (code, data)
            return False, (code, data)

        @wait_until(wait_timeout, sleep_timeout)
        def wait_storage_net_disabled_on_harvester(self):
            snet_configured, (code, data) = self._storage_net_configured()
            if snet_configured and not data.get('value'):
                return True, (code, data)
            return False, (code, data)

        def _lh_instance_mgrs_running(self):
            code, data = api_client.get_pods(namespace='longhorn-system')
            if not (code == 200):
                return False, (code, data)

            lh_instance_mgrs = [pod for pod in data['data'] if 'instance-manager' in pod['id']]
            if not lh_instance_mgrs:
                return False, ("No instance-manager pods", data)

            for imgr in lh_instance_mgrs:
                if 'Running' != imgr['status']['phase']:
                    return False, (f"Pod {imgr['id']} is not Running", imgr)

            return True, (None, lh_instance_mgrs)

        @wait_until(wait_timeout, sleep_timeout)
        def wait_storage_net_enabled_on_longhorn(self, snet_cidr):
            imgrs_running, (code, data) = self._lh_instance_mgrs_running()
            if not imgrs_running:
                return False, (code, data)

            for imgr in data:
                annotations = imgr['metadata']['annotations']

                for na in [self.nets_annotation, self.net_status_annotation]:
                    if na not in annotations:
                        return False, (f"Pod has no annotation {na}", imgr)

                # Check k8s.v1.cni.cncf.io/networks
                try:
                    nets = json.loads(annotations[self.nets_annotation])
                    snet = next(n for n in nets if 'lhnet1' == n.get('interface'))
                except StopIteration:
                    msg = f"Annotation {self.nets_annotation} has no interface 'lhnet1'"
                    return False, (msg, imgr)

                # Check k8s.v1.cni.cncf.io/network-status
                try:
                    net_statuses = json.loads(annotations[self.net_status_annotation])
                    snet_status = next(s for s in net_statuses if 'lhnet1' == s.get('interface'))
                except StopIteration:
                    msg = f"Annotation {self.net_status_annotation} has no interface 'lhnet1'"
                    return False, (msg, imgr)

                snet_ips = snet_status.get('ips', ['::1'])
                if not all(ip_address(sip) in ip_network(snet_cidr) for sip in snet_ips):
                    return False, (f"Dedicated IPs {snet_ips} does NOT fits {snet_cidr}", imgr)

                # Check network name identical in both annotations
                if f"{snet.get('namespace')}/{snet.get('name')}" != snet_status.get('name'):
                    msg = "Network name is not identical between annotations {} and {}".format(
                        self.nets_annotation, self.net_status_annotation)
                    return False, (msg, imgr)

            return True, (None, None)

        @wait_until(wait_timeout, sleep_timeout)
        def wait_storage_net_disabled_on_longhorn(self):
            imgrs_running, (code, data) = self._lh_instance_mgrs_running()
            if not imgrs_running:
                return False, (code, data)

            for imgr in data:
                if self.nets_annotation in imgr['metadata']['annotations']:
                    return False, (f"Pod should not has annotation {self.nets_annotation}", imgr)

            return True, (None, None)

    return SettingChecker()