Module harvester_e2e_tests.integrations.test_1_images
Functions
def cluster_network(api_client, vlan_nic)-
Expand source code
@pytest.fixture(scope="class")
def cluster_network(api_client, vlan_nic):
    """Yield the name of a cluster network configured on ``vlan_nic``.

    The cluster network and its config are looked up first and only created
    when the lookup does not answer 200. On teardown the config is deleted
    first, then the cluster network itself.
    """
    # We should change this at some point: it fails if the total name is over
    # 12 chars, hence the slice.  ???: RFC1123 and length limits
    net_name = f"cnet-{vlan_nic.lower()}"[:12]

    status, body = api_client.clusternetworks.get(net_name)
    if status != 200:
        status, body = api_client.clusternetworks.create(net_name)
        assert 201 == status, (status, body)

    status, body = api_client.clusternetworks.get_config(net_name)
    if status != 200:
        status, body = api_client.clusternetworks.create_config(net_name, net_name, vlan_nic)
        assert 201 == status, (status, body)

    yield net_name

    # Teardown
    status, body = api_client.clusternetworks.delete_config(net_name)
    assert 200 == status, (status, body)
    status, body = api_client.clusternetworks.delete(net_name)
    assert 200 == status, (status, body)
# Next listing: def create_image_url(api_client, name, image_url, image_checksum, wait_timeout)
Expand source code
def create_image_url(api_client, name, image_url, image_checksum, wait_timeout):
    """Create an image from a URL and wait until its download reports 100%.

    Args:
        api_client: client exposing ``images.create_by_url`` and ``images.get``,
            each returning a ``(code, data)`` pair.
        name: image name; also expected back as the spec's ``displayName``.
        image_url: URL the image is downloaded from.
        image_checksum: checksum handed to ``create_by_url`` as-is.
        wait_timeout: seconds to wait for ``status.progress`` to reach 100.

    Raises:
        AssertionError: when creation does not answer 201, the returned spec
            does not match, a status poll does not answer 200, or the
            progress does not reach 100 within ``wait_timeout``.
    """
    code, data = api_client.images.create_by_url(name, image_url, image_checksum)
    assert 201 == code, (code, data)

    # Fall back to an empty spec so a malformed payload fails the assertions
    # below (with the payload attached) instead of raising AttributeError.
    image_spec = data.get("spec") or {}
    assert name == image_spec.get("displayName"), (code, data)
    assert "download" == image_spec.get("sourceType"), (code, data)

    endtime = datetime.now() + timedelta(seconds=wait_timeout)
    while endtime > datetime.now():
        code, data = api_client.images.get(name)
        # Validate the status code before touching the payload: an error
        # response is not guaranteed to be a dict.
        assert 200 == code, (code, data)
        if data.get("status", {}).get("progress") == 100:
            break
        sleep(5)
    else:
        raise AssertionError(
            f"Failed to download image {name} with {wait_timeout} timed out\n"
            f"Still got {code} with {data}"
        )
# Next listing: def delete_image(api_client, image_name, wait_timeout)
Expand source code
def delete_image(api_client, image_name, wait_timeout):
    """Delete an image and wait until the API answers 404 for it.

    Raises:
        AssertionError: when the delete call does not answer 200, or the
            image is still returned after ``wait_timeout`` seconds.
    """
    status, payload = api_client.images.delete(image_name)
    assert 200 == status, f"Failed to delete image with error: {status}, {payload}"

    deadline = datetime.now() + timedelta(seconds=wait_timeout)
    while deadline > datetime.now():
        status, payload = api_client.images.get(image_name)
        if status == 404:
            # The image is gone; nothing more to wait for.
            return
        sleep(5)

    raise AssertionError(
        f"Failed to delete image {image_name} with {wait_timeout} timed out\n"
        f"Still got {status} with {payload}"
    )
# Next listing: def delete_volume(api_client, volume_name, wait_timeout)
Expand source code
def delete_volume(api_client, volume_name, wait_timeout):
    """Delete a volume and wait until the API answers 404 for it.

    Raises:
        AssertionError: when the delete call does not answer 200, or the
            volume is still returned after ``wait_timeout`` seconds.
    """
    rc, body = api_client.volumes.delete(volume_name)
    assert 200 == rc, f"Failed to delete volume with error: {rc}, {body}"

    give_up_at = datetime.now() + timedelta(seconds=wait_timeout)
    removed = False
    while not removed and give_up_at > datetime.now():
        rc, body = api_client.volumes.get(volume_name)
        removed = rc == 404
        if not removed:
            sleep(5)

    if not removed:
        raise AssertionError(
            f"Failed to delete volume {volume_name} with {wait_timeout} timed out\n"
            f"Still got {rc} with {body}"
        )
# Next listing: def export_storage_class()
Expand source code
@pytest.fixture(scope="session")
def export_storage_class():
    """Return the storage class name used by tests that export a volume to an image."""
    return "harvester-longhorn"
# Next listing: def fake_invalid_image_file()
Expand source code
@pytest.fixture(scope="session")
def fake_invalid_image_file():
    """Yield the path of a temporary 6-byte file.

    The file is produced by seeking to offset 5 and writing a single zero
    byte, so its size is not a multiple of 512 bytes. It only exists while
    the fixture is active (it lives inside the ``NamedTemporaryFile``
    context).
    """
    with NamedTemporaryFile("wb") as tmp:
        tmp.seek(5)
        tmp.write(b"\0")
        tmp.seek(0)
        invalid_path = Path(tmp.name)
        yield invalid_path
# Next listing: def get_image(api_client, image_name)
Expand source code
def get_image(api_client, image_name):
    """Assert that the image list is non-empty and ``image_name`` is retrievable.

    Args:
        api_client: client whose ``images.get()`` lists images and whose
            ``images.get(name)`` fetches one, each returning ``(code, data)``.
        image_name: name expected in the fetched image's ``metadata.name``.

    Raises:
        AssertionError: when either call does not answer 200, the list has
            no items, or the fetched image carries a different name.
    """
    code, data = api_client.images.get()
    # Check the status first so a failed listing reports the response rather
    # than a KeyError on the missing "items" key.
    assert 200 == code, (code, data)
    assert data.get("items"), (code, data)

    code, data = api_client.images.get(image_name)
    assert 200 == code, (code, data)
    assert image_name == data["metadata"]["name"], (code, data)
# Next listing: def image_info(request)
Expand source code
@pytest.fixture(params=["image_opensuse", "image_ubuntu"])
def image_info(request):
    """Resolve the fixture named by the current parameter and return its value.

    Parametrized over the fixture names ``image_opensuse`` and ``image_ubuntu``.
    """
    fixture_name = request.param
    return request.getfixturevalue(fixture_name)
# Next listing: def storage_network(api_client, cluster_network, vlan_id, vlan_cidr, setting_checker)
Expand source code
@pytest.fixture(scope="class")
def storage_network(api_client, cluster_network, vlan_id, vlan_cidr, setting_checker):
    """Enable the ``storage-network`` setting for the test class, disable it afterwards.

    Ref. https://docs.harvesterhci.io/v1.3/advanced/storagenetwork/#configuration-example

    After each settings update the fixture waits, via ``setting_checker``,
    for the change to be reported on Harvester and then on Longhorn.
    """
    spec_enabled = api_client.settings.StorageNetworkSpec.enable_with(
        vlan_id, cluster_network, vlan_cidr
    )
    status, body = api_client.settings.update('storage-network', spec_enabled)
    assert 200 == status, (status, body)

    is_enabled, (status, body) = setting_checker.wait_storage_net_enabled_on_harvester()
    assert is_enabled, (status, body)
    is_enabled, (status, body) = setting_checker.wait_storage_net_enabled_on_longhorn(vlan_cidr)
    assert is_enabled, (status, body)

    yield

    # Teardown
    spec_disabled = api_client.settings.StorageNetworkSpec.disable()
    status, body = api_client.settings.update('storage-network', spec_disabled)
    assert 200 == status, (status, body)

    is_disabled, (status, body) = setting_checker.wait_storage_net_disabled_on_harvester()
    assert is_disabled, (status, body)
    is_disabled, (status, body) = setting_checker.wait_storage_net_disabled_on_longhorn()
    assert is_disabled, (status, body)
# Next listing: def vlan_cidr(api_client, cluster_network, vlan_id, wait_timeout, sleep_timeout)
Expand source code
@pytest.fixture(scope="class")
def vlan_cidr(api_client, cluster_network, vlan_id, wait_timeout, sleep_timeout):
    """Yield the CIDR found in the route annotation of a VLAN VM network.

    The network ``<cluster_network>-vlan<vlan_id>`` is created when a lookup
    does not answer 200. The fixture then polls it every ``sleep_timeout``
    seconds, for up to ``wait_timeout`` seconds, until the
    ``network.harvesterhci.io/route`` annotation holds a non-empty ``cidr``.
    The network is deleted on teardown.

    Raises:
        AssertionError: when creation does not answer 201, no CIDR shows up
            in time, or the teardown delete does not answer 200.
    """
    route_key = 'network.harvesterhci.io/route'
    vnet = f'{cluster_network}-vlan{vlan_id}'

    code, data = api_client.networks.get(vnet)
    if code != 200:
        code, data = api_client.networks.create(vnet, vlan_id, cluster_network=cluster_network)
        assert 201 == code, (code, data)

    cidr = None
    endtime = datetime.now() + timedelta(seconds=wait_timeout)
    while endtime > datetime.now():
        code, data = api_client.networks.get(vnet)
        # Only inspect the payload of a successful response; an error payload
        # is not guaranteed to carry "metadata".
        if 200 == code:
            annotations = data['metadata'].get('annotations') or {}
            if annotations.get(route_key):
                # A route without a "cidr" key just means "not ready yet".
                cidr = json.loads(annotations[route_key]).get('cidr')
                if cidr:
                    break
        sleep(sleep_timeout)
    else:
        raise AssertionError(
            f"Fail to get route info of VM network {vnet} with error: {code}, {data}"
        )

    yield cidr

    # Teardown
    code, data = api_client.networks.delete(vnet)
    assert 200 == code, (code, data)
Classes
class TestBackendImages-
Expand source code
@pytest.mark.p0
@pytest.mark.images
class TestBackendImages:
    """API-level tests covering image creation, re-creation, editing and deletion."""

    @pytest.mark.smoke
    @pytest.mark.dependency(name="create_image_from_volume")
    def test_create_image_from_volume(
        self, api_client, unique_name, export_storage_class, wait_timeout
    ):
        """
        Test create image from volume

        Steps:
            1. Create a volume "test-volume" in Volumes page
            2. Export the volume to image "export-image"
            3. Check the image "export-image" exists
            4. Cleanup image "export-image" on Images page
            5. Cleanup volume "test-volume" on Volumes page
        """
        volume_name = f"volume-{unique_name}"
        image_name = f"image-{unique_name}"

        spec = api_client.volumes.Spec(1)
        code, data = api_client.volumes.create(volume_name, spec)
        assert 201 == code, (code, data)

        # Check volume ready
        endtime = datetime.now() + timedelta(seconds=wait_timeout)
        while endtime > datetime.now():
            code, data = api_client.volumes.get(volume_name)
            # Tolerate a response without status/phase: keep polling instead
            # of failing with KeyError.
            if 200 == code and data.get("status", {}).get("phase") == "Bound":
                break
            sleep(5)
        else:
            raise AssertionError(
                f"Failed to wait for volume {volume_name} to be bound,"
                f" {wait_timeout} timed out\n"
                f"Still got {code} with {data}"
            )

        # The other export call in this module asserts a 200 answer; do the
        # same here so a rejected export is reported at its source.
        code, data = api_client.volumes.export(volume_name, image_name, export_storage_class)
        assert 200 == code, (code, data)

        endtime = datetime.now() + timedelta(seconds=wait_timeout)
        image_id = ""
        while endtime > datetime.now():
            code, data = api_client.images.get()
            assert 200 == code, (code, data)

            # NOTE(review): the flattened listing does not show indentation;
            # this assumes the loop stops at the first image whose display
            # name matches, and fails when no image matches at all.
            for image in data["items"]:
                if image["spec"]["displayName"] == image_name:
                    if 100 == image.get("status", {}).get("progress", 0):
                        image_id = image["metadata"]["name"]
                    break
            else:
                raise AssertionError(f"Failed to find image {image_name}")

            if image_id != "":
                break
            sleep(3)  # snooze
        else:
            # Without this branch an empty image_id would reach delete_image.
            raise AssertionError(
                f"Failed to export volume {volume_name} to image {image_name},"
                f" {wait_timeout} timed out\n"
                f"Still got {code} with {data}"
            )

        delete_volume(api_client, volume_name, wait_timeout)
        delete_image(api_client, image_id, wait_timeout)

    @pytest.mark.smoke
    @pytest.mark.dependency(name="create_image_url")
    def test_create_image_url(self, image_info, unique_name, api_client, wait_timeout):
        """
        Test create raw and iso type image from url

        Steps:
            1. Open image page and select default URL
            2. Input qcow2 image file download URL, wait for download complete
            3. Check the qcow2 image exists
            4. Input iso image file download URL, wait for download complete
            5. Check the iso image exists
        """
        image_name = f"{image_info.name}-{unique_name}"
        image_url = image_info.url
        create_image_url(api_client, image_name, image_url,
                         image_info.image_checksum, wait_timeout)

    @pytest.mark.sanity
    @pytest.mark.skip_version_if("> v1.2.0", "<= v1.4.0", reason="Issue#4293 fix after `v1.4.0`")
    @pytest.mark.dependency(name="delete_image_recreate", depends=["create_image_url"])
    def test_delete_image_recreate(
        self,
        api_client,
        image_info,
        unique_name,
        fake_image_file,
        wait_timeout,
    ):
        """
        Test create raw and iso type image from file

        Steps:
            1. Check the image created by URL exists
            2. Delete the newly created image
            3. Create an iso file type image from URL
            4. Check the iso image exists
            5. Upload a qcow2 file type image
            6. Delete the newly uploaded file
            7. Upload a new qcow2 file type image
        """
        image_name = f"{image_info.name}-{unique_name}"
        image_url = image_info.url
        image_checksum = image_info.image_checksum

        get_image(api_client, image_name)
        delete_image(api_client, image_name, wait_timeout)

        create_image_url(api_client, image_name, image_url, image_checksum, wait_timeout)
        get_image(api_client, image_name)

        # Upload, verify and delete twice under the same name to prove the
        # name can be reused after deletion.
        for _ in range(2):
            resp = api_client.images.create_by_file(unique_name, fake_image_file)
            assert (
                200 == resp.status_code
            ), f"Failed to upload fake image with error:{resp.status_code}, {resp.content}"
            get_image(api_client, unique_name)
            delete_image(api_client, unique_name, wait_timeout)

    @pytest.mark.sanity
    @pytest.mark.negative
    def test_create_invalid_file(
        self, api_client, gen_unique_name, fake_invalid_image_file, wait_timeout
    ):
        """
        Test create upload image from invalid file type

        Steps:
            1. Prepare an invalid file that is not in a multiple of 512 bytes
            2. Try to upload the invalid image file on the images page
            3. Check should get an error
        """
        unique_name = gen_unique_name()
        resp = api_client.images.create_by_file(unique_name, fake_invalid_image_file)

        assert (
            500 == resp.status_code
        ), f"File size correct, it's a multiple of 512 bytes:{resp.status_code}, {resp.content}"
        delete_image(api_client, unique_name, wait_timeout)

    @pytest.mark.sanity
    @pytest.mark.negative
    @pytest.mark.dependency(name="edit_image_in_use", depends=["create_image_url"])
    def test_edit_image_in_use(self, api_client, unique_name, image_info, wait_timeout):
        """
        Test can edit image which already in use

        Steps:
            1. Check the image created from URL exists
            2. Create a volume from existing image
            3. Update the image labels and description
            4. Check can change the image content
        """
        image_name = f"{image_info.name}-{unique_name}"
        volume_name = f"volume-{image_info.name}-{unique_name}"

        code, data = api_client.images.get(name=image_name)
        assert 200 == code, (code, data)

        # Round the virtual size up to whole GiB so the volume can hold the image.
        image_size_gb = data["status"]["virtualSize"] // 1024**3 + 1
        image_id = f"{data['metadata']['namespace']}/{image_name}"

        # Create volume from image_id
        spec = api_client.volumes.Spec(image_size_gb)
        code, data = api_client.volumes.create(volume_name, spec, image_id=image_id)
        assert 201 == code, (code, data)

        # Check volume ready
        endtime = datetime.now() + timedelta(seconds=wait_timeout)
        while endtime > datetime.now():
            code, data = api_client.volumes.get(volume_name)
            if 200 == code and data.get("status", {}).get("phase") == "Bound":
                break
            sleep(5)
        else:
            raise AssertionError(
                f"Failed to wait for volume {volume_name} to be bound,"
                f" {wait_timeout} timed out\n"
                f"Still got {code} with {data}"
            )

        # Update image content
        updates = {
            "labels": {"usage-label": "yes"},
            "annotations": {"field.cattle.io/description": "edit image in use"},
        }

        # Update image by input
        code, data = api_client.images.update(image_name, dict(metadata=updates))
        assert 200 == code, f"Failed to update image with error: {code}, {data}"

        # Collect every key whose returned value differs from what was sent.
        unexpected = []
        for field, pairs in updates.items():
            returned = data["metadata"].get(field) or {}
            for k, val in pairs.items():
                if returned.get(k) != val:
                    unexpected.append((field, k, val, returned.get(k)))

        assert not unexpected, "\n".join(
            f"Update {f} failed, set key {k} as {v} but got {n}" for f, k, v, n in unexpected
        )

        delete_volume(api_client, volume_name, wait_timeout)
        delete_image(api_client, image_name, wait_timeout)
# Next section: Class variables
var pytestmark
Methods
def test_create_image_from_volume(self, api_client, unique_name, export_storage_class, wait_timeout)-
Expand source code
@pytest.mark.smoke
@pytest.mark.dependency(name="create_image_from_volume")
def test_create_image_from_volume(
    self, api_client, unique_name, export_storage_class, wait_timeout
):
    """
    Test create image from volume

    Steps:
        1. Create a volume "test-volume" in Volumes page
        2. Export the volume to image "export-image"
        3. Check the image "export-image" exists
        4. Cleanup image "export-image" on Images page
        5. Cleanup volume "test-volume" on Volumes page
    """
    volume_name = f"volume-{unique_name}"
    image_name = f"image-{unique_name}"

    spec = api_client.volumes.Spec(1)
    code, data = api_client.volumes.create(volume_name, spec)
    assert 201 == code, (code, data)

    # Check volume ready
    endtime = datetime.now() + timedelta(seconds=wait_timeout)
    while endtime > datetime.now():
        code, data = api_client.volumes.get(volume_name)
        # Tolerate a response without status/phase: keep polling instead of
        # failing with KeyError.
        if 200 == code and data.get("status", {}).get("phase") == "Bound":
            break
        sleep(5)
    else:
        raise AssertionError(
            f"Failed to wait for volume {volume_name} to be bound,"
            f" {wait_timeout} timed out\n"
            f"Still got {code} with {data}"
        )

    # The other export call in this module asserts a 200 answer; do the same
    # here so a rejected export is reported at its source.
    code, data = api_client.volumes.export(volume_name, image_name, export_storage_class)
    assert 200 == code, (code, data)

    endtime = datetime.now() + timedelta(seconds=wait_timeout)
    image_id = ""
    while endtime > datetime.now():
        code, data = api_client.images.get()
        assert 200 == code, (code, data)

        # NOTE(review): the flattened listing does not show indentation; this
        # assumes the loop stops at the first image whose display name
        # matches, and fails when no image matches at all.
        for image in data["items"]:
            if image["spec"]["displayName"] == image_name:
                if 100 == image.get("status", {}).get("progress", 0):
                    image_id = image["metadata"]["name"]
                break
        else:
            raise AssertionError(f"Failed to find image {image_name}")

        if image_id != "":
            break
        sleep(3)  # snooze
    else:
        # Without this branch an empty image_id would reach delete_image.
        raise AssertionError(
            f"Failed to export volume {volume_name} to image {image_name},"
            f" {wait_timeout} timed out\n"
            f"Still got {code} with {data}"
        )

    delete_volume(api_client, volume_name, wait_timeout)
    delete_image(api_client, image_id, wait_timeout)
# Rendered docstring summary: Test create image from volume
Steps
- Create a volume "test-volume" in Volumes page
- Export the volume to image "export-image"
- Check the image "export-image" exists
- Cleanup image "export-image" on Images page
- Cleanup volume "test-volume" on Volumes page
def test_create_image_url(self, image_info, unique_name, api_client, wait_timeout)-
Expand source code
@pytest.mark.smoke
@pytest.mark.dependency(name="create_image_url")
def test_create_image_url(self, image_info, unique_name, api_client, wait_timeout):
    """
    Test create raw and iso type image from url

    Steps:
        1. Open image page and select default URL
        2. Input qcow2 image file download URL, wait for download complete
        3. Check the qcow2 image exists
        4. Input iso image file download URL, wait for download complete
        5. Check the iso image exists
    """
    # The image is named after the parametrized image plus the unique suffix;
    # the dependent tests rebuild the same name to find it.
    create_image_url(
        api_client,
        f"{image_info.name}-{unique_name}",
        image_info.url,
        image_info.image_checksum,
        wait_timeout,
    )
# Rendered docstring summary: Test create raw and iso type image from url
Steps: 1. Open image page and select default URL 2. Input qcow2 image file download URL, wait for download complete 3. Check the qcow2 image exists 4. Input iso image file download URL, wait for download complete 5. Check the iso image exists
def test_create_invalid_file(self, api_client, gen_unique_name, fake_invalid_image_file, wait_timeout)-
Expand source code
@pytest.mark.sanity
@pytest.mark.negative
def test_create_invalid_file(
    self, api_client, gen_unique_name, fake_invalid_image_file, wait_timeout
):
    """
    Test create upload image from invalid file type

    Steps:
        1. Prepare an invalid file that is not in a multiple of 512 bytes
        2. Try to upload the invalid image file on the images page
        3. Check should get an error
    """
    image_name = gen_unique_name()
    response = api_client.images.create_by_file(image_name, fake_invalid_image_file)
    upload_status = response.status_code

    # The upload is expected to be rejected with a 500.
    assert 500 == upload_status, (
        f"File size correct, it's a multiple of 512 bytes:{response.status_code}, "
        f"{response.content}"
    )

    # The delete helper asserts a 200 answer for this name.
    delete_image(api_client, image_name, wait_timeout)
# Rendered docstring summary: Test create upload image from invalid file type
Steps: 1. Prepare an invalid file whose size is not a multiple of 512 bytes 2. Try to upload the invalid image file on the Images page 3. Check that an error is returned
def test_delete_image_recreate(self, api_client, image_info, unique_name, fake_image_file, wait_timeout)-
Expand source code
@pytest.mark.sanity
@pytest.mark.skip_version_if("> v1.2.0", "<= v1.4.0", reason="Issue#4293 fix after `v1.4.0`")
@pytest.mark.dependency(name="delete_image_recreate", depends=["create_image_url"])
def test_delete_image_recreate(
    self,
    api_client,
    image_info,
    unique_name,
    fake_image_file,
    wait_timeout,
):
    """
    Test create raw and iso type image from file

    Steps:
        1. Check the image created by URL exists
        2. Delete the newly created image
        3. Create an iso file type image from URL
        4. Check the iso image exists
        5. Upload a qcow2 file type image
        6. Delete the newly uploaded file
        7. Upload a new qcow2 file type image
    """
    url_image = f"{image_info.name}-{unique_name}"
    source_url = image_info.url
    checksum = image_info.image_checksum

    # Remove the URL-created image, then create it again under the same name.
    get_image(api_client, url_image)
    delete_image(api_client, url_image, wait_timeout)
    create_image_url(api_client, url_image, source_url, checksum, wait_timeout)
    get_image(api_client, url_image)

    # Upload, verify and delete the file-based image twice under one name.
    for _ in range(2):
        upload = api_client.images.create_by_file(unique_name, fake_image_file)
        assert (
            200 == upload.status_code
        ), f"Failed to upload fake image with error:{upload.status_code}, {upload.content}"
        get_image(api_client, unique_name)
        delete_image(api_client, unique_name, wait_timeout)
# Rendered docstring summary: Test create raw and iso type image from file
Steps: 1. Check the image created by URL exists 2. Delete the newly created image 3. Create an iso file type image from URL 4. Check the iso image exists 5. Upload a qcow2 file type image 6. Delete the newly uploaded file 7. Upload a new qcow2 file type image
def test_edit_image_in_use(self, api_client, unique_name, image_info, wait_timeout)-
Expand source code
@pytest.mark.sanity
@pytest.mark.negative
@pytest.mark.dependency(name="edit_image_in_use", depends=["create_image_url"])
def test_edit_image_in_use(self, api_client, unique_name, image_info, wait_timeout):
    """
    Test can edit image which already in use

    Steps:
        1. Check the image created from URL exists
        2. Create a volume from existing image
        3. Update the image labels and description
        4. Check can change the image content
    """
    image_name = f"{image_info.name}-{unique_name}"
    volume_name = f"volume-{image_info.name}-{unique_name}"

    code, data = api_client.images.get(name=image_name)
    assert 200 == code, (code, data)

    # Round the virtual size up to whole GiB so the volume can hold the image.
    image_size_gb = data["status"]["virtualSize"] // 1024**3 + 1
    image_id = f"{data['metadata']['namespace']}/{image_name}"

    # Create volume from image_id
    spec = api_client.volumes.Spec(image_size_gb)
    code, data = api_client.volumes.create(volume_name, spec, image_id=image_id)
    assert 201 == code, (code, data)

    # Check volume ready
    endtime = datetime.now() + timedelta(seconds=wait_timeout)
    while endtime > datetime.now():
        code, data = api_client.volumes.get(volume_name)
        # Tolerate a response without status/phase: keep polling instead of
        # failing with KeyError.
        if 200 == code and data.get("status", {}).get("phase") == "Bound":
            break
        sleep(5)
    else:
        # Report the volume actually being waited for (the message used to
        # say "delete" and printed the bare unique name).
        raise AssertionError(
            f"Failed to wait for volume {volume_name} to be bound,"
            f" {wait_timeout} timed out\n"
            f"Still got {code} with {data}"
        )

    # Update image content
    updates = {
        "labels": {"usage-label": "yes"},
        "annotations": {"field.cattle.io/description": "edit image in use"},
    }

    # Update image by input
    code, data = api_client.images.update(image_name, dict(metadata=updates))
    assert 200 == code, f"Failed to update image with error: {code}, {data}"

    # Collect every key whose returned value differs from what was sent.
    unexpected = []
    for field, pairs in updates.items():
        returned = data["metadata"].get(field) or {}
        for k, val in pairs.items():
            if returned.get(k) != val:
                unexpected.append((field, k, val, returned.get(k)))

    assert not unexpected, "\n".join(
        f"Update {f} failed, set key {k} as {v} but got {n}" for f, k, v, n in unexpected
    )

    delete_volume(api_client, volume_name, wait_timeout)
    delete_image(api_client, image_name, wait_timeout)
# Rendered docstring summary: Test can edit image which already in use
Steps: 1. Check the image created from URL exists 2. Create a volume from existing image 3. Update the image labels and description 4. Check can change the image content
class TestImageEnhancements-
Expand source code
class TestImageEnhancements: @pytest.mark.p2 @pytest.mark.images @pytest.mark.performance @pytest.mark.parametrize("image_size", ["10Mi", "50Mi", "200Mi"]) def test_image_processing_performance(self, api_client, unique_name, image_size, wait_timeout): """ Test Harvester image processing performance (excluding network upload time) Steps: 1. Upload image files of different sizes 2. Measure processing time from upload completion to ready state 3. Verify Harvester processing meets performance expectations """ # Processing thresholds (seconds) - time from upload complete to ready processing_thresholds = {"10Mi": 10, "50Mi": 20, "200Mi": 40} size_bytes = int(image_size[:-2]) * 1024 * 1024 if size_bytes % 512 != 0: size_bytes = ((size_bytes // 512) + 1) * 512 with NamedTemporaryFile("wb", suffix=".raw") as f: f.seek(size_bytes - 1) f.write(b"\x00") f.seek(0) image_name = f"proc-perf-{unique_name}" # Step 1: Upload (don't measure this time - it's network dependent) resp = api_client.images.create_by_file(image_name, Path(f.name)) assert resp.ok, f"Failed to upload {image_size} image: {resp.status_code}, {resp.text}" # Step 2: Wait for upload to be acknowledged by Harvester initial_timeout = datetime.now() + timedelta(seconds=30) while initial_timeout > datetime.now(): code, data = api_client.images.get(image_name) if code == 200: status = data.get("status", {}) if "progress" in status: # Upload acknowledged, start measuring processing time processing_start_time = time() break sleep(1) else: raise AssertionError(f"Image {image_name} not acknowledged by Harvester") # Step 3: Measure time from processing start to completion endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, data = api_client.images.get(image_name) if code == 200 and data.get("status", {}).get("progress") == 100: processing_end_time = time() break sleep(2) else: raise AssertionError(f"Image processing did not complete within {wait_timeout}s") # Step 4: Validate 
Harvester processing performance processing_time = processing_end_time - processing_start_time threshold = processing_thresholds[image_size] assert processing_time < threshold, f"Harvester processing took" \ f" {processing_time:.2f}s, expected < {threshold}s" print(f"Harvester Processing Performance:" f" {image_size} processed in {processing_time:.2f}s") delete_image(api_client, image_name, wait_timeout) @pytest.mark.p1 @pytest.mark.images def test_image_checksum_validation(self, api_client, image_info, unique_name, wait_timeout): """ Test image checksum validation during URL-based creation Steps: 1. Create image with correct checksum - should succeed 2. Create image with incorrect checksum - should fail 3. Create image with no checksum - should succeed 4. Verify checksum validation behavior """ # Extract OS name from image_info for naming os_name = image_info.name.lower() # e.g., "opensuse" or "ubuntu" # Test with correct checksum correct_name = f"correct-checksum-{os_name}-{unique_name}" create_image_url(api_client, correct_name, image_info.url, image_info.image_checksum, wait_timeout) # Test with incorrect checksum incorrect_name = f"incorrect-checksum-{os_name}-{unique_name}" fake_checksum = hashlib.sha512(b'fake_checksum').hexdigest() code, data = api_client.images.create_by_url(incorrect_name, image_info.url, fake_checksum) assert 201 == code, "Image creation should initially succeed" # Should eventually fail due to checksum mismatch endtime = datetime.now() + timedelta(seconds=wait_timeout) checksum_error = False while endtime > datetime.now(): code, data = api_client.images.get(incorrect_name) if code == 200: status = data.get("status", {}) if "checksum" in str(status).lower() and status.get("failed", 0) > 0: checksum_error = True break sleep(5) assert checksum_error, "Image should fail due to checksum mismatch" # Test with no checksum no_checksum_name = f"no-checksum-{os_name}-{unique_name}" create_image_url(api_client, no_checksum_name, image_info.url, 
None, wait_timeout) # Cleanup delete_image(api_client, correct_name, wait_timeout) # Cleanup incorrect checksum image (may not exist if creation failed) try: delete_image(api_client, incorrect_name, wait_timeout) except AssertionError as e: print(f"Cleanup failed/skipped for incorrect checksum image {incorrect_name}: {e}") # Cleanup no-checksum image (only if it was created) try: delete_image(api_client, no_checksum_name, wait_timeout) except AssertionError as e: print(f"Cleanup failed/skipped for no-checksum image {no_checksum_name}: {e}") @pytest.mark.p1 @pytest.mark.images def test_image_concurrent_operations(self, api_client, fake_image_file, gen_unique_name, wait_timeout): """ Test comprehensive concurrent image operations Steps: 1. Test concurrent image uploads (multiple different images) 2. Test concurrent operations on same image (get, update, status checks) 3. Verify system handles all concurrency scenarios gracefully 4. Ensure data integrity and proper resource management """ # Part 1: Concurrent uploads of different images concurrent_upload_count = 3 upload_image_names = [f"upload-concurrent-{i}-{gen_unique_name()}" for i in range(concurrent_upload_count)] successful_uploads = [] upload_errors = [] def upload_single_image(image_name): resp = api_client.images.create_by_file(image_name, fake_image_file) if resp.ok: return {"name": image_name, "success": True} else: return {"name": image_name, "success": False, "error": f"{resp.status_code}: {resp.text}"} # Execute concurrent uploads with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_upload_count) as executor: # NOQA upload_futures = [executor.submit(upload_single_image, name) for name in upload_image_names] for future in concurrent.futures.as_completed(upload_futures): result = future.result() if result["success"]: successful_uploads.append(result["name"]) else: upload_errors.append(result) assert len(upload_errors) == 0, f"Concurrent upload errors: {upload_errors}" assert 
len(successful_uploads) == concurrent_upload_count, \ f"Expected {concurrent_upload_count} uploads, got {len(successful_uploads)}" # Verify all uploaded images are accessible for image_name in successful_uploads: code, data = api_client.images.get(image_name) assert code == 200, f"Image {image_name} not accessible: {code}" # Part 2: Concurrent operations on the same image # Use the first uploaded image for operations testing target_image_name = successful_uploads[0] concurrent_results = [] def concurrent_operation(operation_type, operation_id): if operation_type == "get": code, data = api_client.images.get(target_image_name) return {"id": operation_id, "op": "get", "success": code == 200, "code": code} elif operation_type == "update": update_data = {"labels": {f"concurrent-op-{operation_id}": f"timestamp-{time.time()}"}} # NOQA code, data = api_client.images.update(target_image_name, dict(metadata=update_data)) return {"id": operation_id, "op": "update", "success": code in [200, 409], "code": code} elif operation_type == "status_check": code, data = api_client.images.get(target_image_name) if code == 200: progress = data.get("status", {}).get("progress", 0) return {"id": operation_id, "op": "status_check", "success": True, "code": code, "progress": progress} return {"id": operation_id, "op": "status_check", "success": False, "code": code} # Launch concurrent operations on the same image threads = [] operations = [ ("get", 1), ("update", 2), ("status_check", 3), ("get", 4), ("update", 5), ("get", 6), ("status_check", 7) ] for op_type, op_id in operations: thread = threading.Thread( target=lambda ot=op_type, oid=op_id: concurrent_results.append(concurrent_operation(ot, oid)) # NOQA ) threads.append(thread) thread.start() # Wait for all operations to complete for thread in threads: thread.join(timeout=15) assert not thread.is_alive(), "Thread didn't complete in time" # Analyze concurrent operation results successful_ops = [r for r in concurrent_results if r and 
r.get("success")] assert len(successful_ops) > 0, "No concurrent operations succeeded on same image" # Target image should still be responsive after concurrent operations final_code, final_data = api_client.images.get(target_image_name) assert final_code == 200, f"Target image not accessible after " \ f"concurrent operations: {final_code}" # Part 3: Concurrent cleanup of all images cleanup_successful = 0 def cleanup_single_image(image_name): delete_image(api_client, image_name, wait_timeout) return image_name with concurrent.futures.ThreadPoolExecutor(max_workers=len(successful_uploads)) as executor: # NOQA cleanup_futures = [executor.submit(cleanup_single_image, name) for name in successful_uploads] for _ in concurrent.futures.as_completed(cleanup_futures): cleanup_successful += 1 assert cleanup_successful == len(successful_uploads), \ f"Cleanup failed: {cleanup_successful}/{len(successful_uploads)}" @pytest.mark.p1 @pytest.mark.images def test_image_metadata_operations(self, api_client, fake_image_file, unique_name, wait_timeout): """ Test comprehensive image metadata operations Steps: 1. Create image and verify initial metadata 2. Update labels, annotations, and description 3. Test metadata persistence across operations 4. 
Verify metadata validation and limits """ image_name = f"metadata-test-{unique_name}" # Create image resp = api_client.images.create_by_file(image_name, fake_image_file) assert resp.ok, f"Failed to create image: {resp.status_code}, {resp.text}" # Wait for initial processing endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, data = api_client.images.get(image_name) if code == 200 and data.get("status", {}).get("progress") == 100: break sleep(2) # Test comprehensive metadata updates metadata_updates = { "labels": { "environment": "test", "team": "qa", "version": "1.0.0", "critical": "false" }, "annotations": { "field.cattle.io/description": "Test image for metadata operations", "harvesterhci.io/imageId": image_name, "custom.annotation/usage": "automation-testing", "created.by": "harvester-e2e-tests" } } code, data = api_client.images.update(image_name, dict(metadata=metadata_updates)) assert 200 == code, f"Failed to update metadata: {code}, {data}" # Verify metadata was applied correctly code, data = api_client.images.get(image_name) assert 200 == code, (code, data) metadata = data["metadata"] for field, expected_pairs in metadata_updates.items(): for key, expected_value in expected_pairs.items(): actual_value = metadata.get(field, {}).get(key) assert actual_value == expected_value, f"Metadata {field}.{key}: expected"\ f" {expected_value}, got {actual_value}" # Test incremental metadata updates incremental_updates = { "labels": {"priority": "high"}, "annotations": {"last.modified": "2025-09-30"} } code, data = api_client.images.update(image_name, dict(metadata=incremental_updates)) assert 200 == code, f"Failed incremental update: {code}, {data}" # Verify both old and new metadata exist code, data = api_client.images.get(image_name) assert 200 == code, (code, data) metadata = data["metadata"] assert metadata.get("labels", {}).get("environment") == "test" # Old label assert metadata.get("labels", {}).get("priority") == 
"high" # New label delete_image(api_client, image_name, wait_timeout) @pytest.mark.p2 @pytest.mark.images def test_image_complete_lifecycle(self, api_client, image_info, unique_name, wait_timeout): """ Test complete image lifecycle from creation to deletion Steps: 1. Create image from URL with metadata 2. Update image properties multiple times 3. Create volume from image 4. Export volume back to new image 5. Download and verify image content 6. Perform cleanup and verify deletion """ original_image = f"lifecycle-original-{unique_name}" exported_image = f"lifecycle-exported-{unique_name}" test_volume = f"lifecycle-volume-{unique_name}" # Step 1: Create original image with metadata create_image_url(api_client, original_image, image_info.url, image_info.image_checksum, wait_timeout) initial_metadata = { "labels": {"lifecycle": "test", "stage": "original"}, "annotations": {"description": "Lifecycle test original image"} } code, data = api_client.images.update(original_image, dict(metadata=initial_metadata)) assert 200 == code, f"Failed to set initial metadata: {code}, {data}" # Step 2: Multiple metadata updates for i in range(3): update_metadata = { "labels": {"update.count": str(i + 1)}, "annotations": {f"update.{i}": f"value-{i}"} } code, data = api_client.images.update(original_image, dict(metadata=update_metadata)) assert 200 == code, f"Failed update {i}: {code}, {data}" # Step 3: Create volume from image code, data = api_client.images.get(original_image) assert 200 == code, (code, data) image_size_gb = data["status"]["virtualSize"] // 1024**3 + 1 image_id = f"{data['metadata']['namespace']}/{original_image}" spec = api_client.volumes.Spec(image_size_gb) code, data = api_client.volumes.create(test_volume, spec, image_id=image_id) assert 201 == code, (code, data) # Wait for volume to be bound endtime = datetime.now() + timedelta(seconds=wait_timeout) while endtime > datetime.now(): code, data = api_client.volumes.get(test_volume) if data["status"]["phase"] == 
"Bound": break sleep(5) else: raise AssertionError(f"Volume binding timeout: {code}, {data}") # Step 4: Export volume back to image code, data = api_client.volumes.export(test_volume, exported_image, "harvester-longhorn") assert 200 == code, (code, data) # Wait for export completion endtime = datetime.now() + timedelta(seconds=wait_timeout) exported_image_id = "" while endtime > datetime.now(): code, data = api_client.images.get() assert 200 == code, (code, data) for image in data["items"]: if image["spec"]["displayName"] == exported_image: if image.get("status", {}).get("progress") == 100: exported_image_id = image["metadata"]["name"] break if exported_image_id: break sleep(5) assert exported_image_id, f"Failed to find exported image {exported_image}" # Step 5: Verify exported image properties code, data = api_client.images.get(exported_image_id) assert 200 == code, (code, data) assert data["spec"]["displayName"] == exported_image # Step 6: Cleanup in proper order delete_volume(api_client, test_volume, wait_timeout) delete_image(api_client, original_image, wait_timeout) delete_image(api_client, exported_image_id, wait_timeout) @pytest.mark.p1 @pytest.mark.images @pytest.mark.negative def test_image_name_length_limits(self, api_client, unique_name, wait_timeout): """ Test image creation with various name lengths Steps: 1. Test failure with image name > 63 characters (should fail) 2. 
Test success with image name ≤ 63 characters (should pass) """ aligned_size = 1024 * 1024 # 1MB exactly with NamedTemporaryFile("wb", suffix=".raw") as f: f.write(b"\x00" * aligned_size) f.flush() # Test 1: Invalid long name (should FAIL) long_base = "very-long-image-name-for-testing-kubernetes-name-length-limits-that-exceeds-maximum" # NOQA invalid_long_name = f"{long_base}-{unique_name}" resp = api_client.images.create_by_file(invalid_long_name, Path(f.name)) assert not resp.ok, f"Expected failure with long name but" \ f" got success: {resp.status_code}" assert resp.status_code in [400, 422], f"Expected 400/422 for " \ f"invalid name, got: {resp.status_code}" # Test 2: Valid name (should PASS) valid_base = "valid-image-name" valid_name = f"{valid_base}-{unique_name}" if len(valid_name) > 63: valid_name = f"{valid_base[:30]}-{unique_name}" valid_name = valid_name.rstrip('-') resp = api_client.images.create_by_file(valid_name, Path(f.name)) assert resp.ok, f"Failed with valid name: {resp.status_code}, {resp.text}" # Cleanup delete_image(api_client, valid_name, wait_timeout) @pytest.mark.p1 @pytest.mark.images @pytest.mark.negative def test_image_metadata_limits(self, api_client, unique_name, wait_timeout): """ Test image metadata handling with excessive data Steps: 1. Create a valid image 2. Test behavior with excessive metadata (many labels/annotations) 3. 
Verify proper handling of metadata limits """ aligned_size = 1024 * 1024 # 1MB exactly with NamedTemporaryFile("wb", suffix=".raw") as f: f.write(b"\x00" * aligned_size) f.flush() # Create base image for metadata testing image_name = f"metadata-test-{unique_name}" resp = api_client.images.create_by_file(image_name, Path(f.name)) assert resp.ok, f"Failed to create base image: {resp.status_code}, {resp.text}" # Test excessive metadata excessive_metadata = { "labels": {f"test-label-key-{i:03d}": f"test-label-value" f"-{i:03d}" for i in range(20)}, "annotations": { f"test.annotation.key/{i:03d}": "x" * 200 for i in range(10) } } code, data = api_client.images.update(image_name, dict(metadata=excessive_metadata)) if code == 200: # Verify metadata was actually stored code, updated_data = api_client.images.get(image_name) assert code == 200, f"Failed to retrieve updated image: {code}" elif code in [400, 422, 413]: # 413 = Request Entity Too Large print(f"✓ Excessive metadata properly rejected with status: {code}") else: assert False, f"Unexpected metadata response: {code}, {data}" # Cleanup delete_image(api_client, image_name, wait_timeout)Methods
def test_image_checksum_validation(self, api_client, image_info, unique_name, wait_timeout)-
Expand source code
@pytest.mark.p1
@pytest.mark.images
def test_image_checksum_validation(self, api_client, image_info, unique_name, wait_timeout):
    """
    Test image checksum validation during URL-based creation

    Steps:
    1. Create image with correct checksum - should succeed
    2. Create image with incorrect checksum - should fail
    3. Create image with no checksum - should succeed
    4. Delete every image that was requested, even when an earlier step fails
    """
    # Extract OS name from image_info for naming
    os_name = image_info.name.lower()  # e.g., "opensuse" or "ubuntu"
    correct_name = f"correct-checksum-{os_name}-{unique_name}"
    incorrect_name = f"incorrect-checksum-{os_name}-{unique_name}"
    no_checksum_name = f"no-checksum-{os_name}-{unique_name}"

    # (label, name) of every image whose creation has been requested so far.
    # Recorded *before* each request so a failure halfway through still gets cleaned up.
    requested = []
    passed = False
    try:
        # Test with correct checksum
        requested.append(("correct checksum", correct_name))
        create_image_url(api_client, correct_name, image_info.url,
                         image_info.image_checksum, wait_timeout)

        # Test with incorrect checksum
        requested.append(("incorrect checksum", incorrect_name))
        fake_checksum = hashlib.sha512(b'fake_checksum').hexdigest()
        code, data = api_client.images.create_by_url(incorrect_name, image_info.url,
                                                     fake_checksum)
        assert 201 == code, "Image creation should initially succeed"

        # Should eventually fail due to checksum mismatch
        endtime = datetime.now() + timedelta(seconds=wait_timeout)
        checksum_error = False
        while endtime > datetime.now():
            code, data = api_client.images.get(incorrect_name)
            if code == 200:
                status = data.get("status", {})
                if "checksum" in str(status).lower() and status.get("failed", 0) > 0:
                    checksum_error = True
                    break
            sleep(5)
        assert checksum_error, "Image should fail due to checksum mismatch"

        # Test with no checksum
        requested.append(("no-checksum", no_checksum_name))
        create_image_url(api_client, no_checksum_name, image_info.url, None, wait_timeout)
        passed = True
    finally:
        # Reverse order puts the correct-checksum image last, so a strict failure on it
        # cannot prevent the best-effort deletions from running.
        for label, name in reversed(requested):
            try:
                delete_image(api_client, name, wait_timeout)
            except AssertionError as e:
                # Only the correct-checksum image is guaranteed to exist; failing to
                # delete it fails the test, but never masks an earlier failure.
                if passed and name == correct_name:
                    raise
                print(f"Cleanup failed/skipped for {label} image {name}: {e}")
def test_image_complete_lifecycle(self, api_client, image_info, unique_name, wait_timeout)-
Expand source code
@pytest.mark.p2
@pytest.mark.images
def test_image_complete_lifecycle(self, api_client, image_info, unique_name, wait_timeout):
    """
    Test complete image lifecycle from creation to deletion

    Steps:
    1. Create image from URL, then attach metadata
    2. Update image metadata multiple times
    3. Create volume from image and wait until it is bound
    4. Export volume back to new image
    5. Verify the exported image can be fetched and carries the expected display name
    6. Delete the volume and both images
    """
    original_image = f"lifecycle-original-{unique_name}"
    exported_image = f"lifecycle-exported-{unique_name}"
    test_volume = f"lifecycle-volume-{unique_name}"

    # Step 1: Create original image with metadata
    create_image_url(api_client, original_image, image_info.url,
                     image_info.image_checksum, wait_timeout)
    initial_metadata = {
        "labels": {"lifecycle": "test", "stage": "original"},
        "annotations": {"description": "Lifecycle test original image"}
    }
    code, data = api_client.images.update(original_image, dict(metadata=initial_metadata))
    assert 200 == code, f"Failed to set initial metadata: {code}, {data}"

    # Step 2: Multiple metadata updates
    for i in range(3):
        update_metadata = {
            "labels": {"update.count": str(i + 1)},
            "annotations": {f"update.{i}": f"value-{i}"}
        }
        code, data = api_client.images.update(original_image, dict(metadata=update_metadata))
        assert 200 == code, f"Failed update {i}: {code}, {data}"

    # Step 3: Create volume from image
    code, data = api_client.images.get(original_image)
    assert 200 == code, (code, data)
    # Round the virtual size up to the next whole GiB for the volume spec
    image_size_gb = data["status"]["virtualSize"] // 1024**3 + 1
    image_id = f"{data['metadata']['namespace']}/{original_image}"
    spec = api_client.volumes.Spec(image_size_gb)
    code, data = api_client.volumes.create(test_volume, spec, image_id=image_id)
    assert 201 == code, (code, data)

    # Wait for volume to be bound
    endtime = datetime.now() + timedelta(seconds=wait_timeout)
    while endtime > datetime.now():
        code, data = api_client.volumes.get(test_volume)
        # A non-200 reply or one without `status` means "not bound yet": keep polling
        # instead of raising KeyError on the missing key.
        if code == 200 and data.get("status", {}).get("phase") == "Bound":
            break
        sleep(5)
    else:
        raise AssertionError(f"Volume binding timeout: {code}, {data}")

    # Step 4: Export volume back to image
    code, data = api_client.volumes.export(test_volume, exported_image, "harvester-longhorn")
    assert 200 == code, (code, data)

    # Wait for export completion. The exported image is located by display name
    # because its resource name is only read back from the listing.
    endtime = datetime.now() + timedelta(seconds=wait_timeout)
    exported_image_id = ""
    while endtime > datetime.now():
        code, data = api_client.images.get()
        assert 200 == code, (code, data)
        for image in data["items"]:
            if image["spec"]["displayName"] != exported_image:
                continue
            if image.get("status", {}).get("progress") == 100:
                exported_image_id = image["metadata"]["name"]
                break
        if exported_image_id:
            break
        sleep(5)
    assert exported_image_id, f"Failed to find exported image {exported_image}"

    # Step 5: Verify exported image properties
    code, data = api_client.images.get(exported_image_id)
    assert 200 == code, (code, data)
    assert data["spec"]["displayName"] == exported_image

    # Step 6: Cleanup in proper order
    delete_volume(api_client, test_volume, wait_timeout)
    delete_image(api_client, original_image, wait_timeout)
    delete_image(api_client, exported_image_id, wait_timeout)
def test_image_concurrent_operations(self, api_client, fake_image_file, gen_unique_name, wait_timeout)-
Expand source code
@pytest.mark.p1
@pytest.mark.images
def test_image_concurrent_operations(self, api_client, fake_image_file, gen_unique_name,
                                     wait_timeout):
    """
    Test comprehensive concurrent image operations

    Steps:
    1. Upload several different images concurrently
    2. Run concurrent get / update / status-check operations against one image
    3. Check the target image is still retrievable afterwards
    4. Delete all uploaded images concurrently and fail if any deletion fails
    """
    # Imported locally so `time.time()` below always resolves to the module,
    # however the file-level imports bind the name `time`.
    import time

    # Part 1: Concurrent uploads of different images
    concurrent_upload_count = 3
    upload_image_names = [f"upload-concurrent-{i}-{gen_unique_name()}"
                          for i in range(concurrent_upload_count)]
    successful_uploads = []
    upload_errors = []

    def upload_single_image(image_name):
        """Upload one image and report the outcome as a plain dict."""
        resp = api_client.images.create_by_file(image_name, fake_image_file)
        if resp.ok:
            return {"name": image_name, "success": True}
        return {"name": image_name, "success": False,
                "error": f"{resp.status_code}: {resp.text}"}

    # Execute concurrent uploads
    with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_upload_count) as executor:  # NOQA
        upload_futures = [executor.submit(upload_single_image, name)
                          for name in upload_image_names]
        for future in concurrent.futures.as_completed(upload_futures):
            result = future.result()
            if result["success"]:
                successful_uploads.append(result["name"])
            else:
                upload_errors.append(result)

    try:
        assert len(upload_errors) == 0, f"Concurrent upload errors: {upload_errors}"
        assert len(successful_uploads) == concurrent_upload_count, \
            f"Expected {concurrent_upload_count} uploads, got {len(successful_uploads)}"

        # Verify all uploaded images are accessible
        for image_name in successful_uploads:
            code, data = api_client.images.get(image_name)
            assert code == 200, f"Image {image_name} not accessible: {code}"

        # Part 2: Concurrent operations on the same image
        # Use the first uploaded image for operations testing
        target_image_name = successful_uploads[0]
        concurrent_results = []

        def concurrent_operation(operation_type, operation_id):
            """Run one operation on the target image; return a result dict."""
            if operation_type == "get":
                code, data = api_client.images.get(target_image_name)
                return {"id": operation_id, "op": "get", "success": code == 200, "code": code}
            elif operation_type == "update":
                update_data = {"labels": {f"concurrent-op-{operation_id}": f"timestamp-{time.time()}"}}  # NOQA
                code, data = api_client.images.update(target_image_name,
                                                      dict(metadata=update_data))
                # 409 is counted as success: a conflict is tolerated when updates race
                return {"id": operation_id, "op": "update",
                        "success": code in [200, 409], "code": code}
            elif operation_type == "status_check":
                code, data = api_client.images.get(target_image_name)
                if code == 200:
                    progress = data.get("status", {}).get("progress", 0)
                    return {"id": operation_id, "op": "status_check", "success": True,
                            "code": code, "progress": progress}
                return {"id": operation_id, "op": "status_check", "success": False,
                        "code": code}
            # Unknown operation types yield None; filtered out when results are analysed
            return None

        # Launch concurrent operations on the same image
        threads = []
        operations = [
            ("get", 1), ("update", 2), ("status_check", 3),
            ("get", 4), ("update", 5), ("get", 6), ("status_check", 7)
        ]
        for op_type, op_id in operations:
            # Default arguments bind the current loop values (avoids late binding)
            thread = threading.Thread(
                target=lambda ot=op_type, oid=op_id: concurrent_results.append(concurrent_operation(ot, oid))  # NOQA
            )
            threads.append(thread)
            thread.start()

        # Wait for all operations to complete
        for thread in threads:
            thread.join(timeout=15)
            assert not thread.is_alive(), "Thread didn't complete in time"

        # Analyze concurrent operation results
        successful_ops = [r for r in concurrent_results if r and r.get("success")]
        assert len(successful_ops) > 0, "No concurrent operations succeeded on same image"

        # Target image should still be responsive after concurrent operations
        final_code, final_data = api_client.images.get(target_image_name)
        assert final_code == 200, f"Target image not accessible after " \
                                  f"concurrent operations: {final_code}"
    finally:
        # Part 3: Concurrent cleanup of all images (runs even if a check above failed)
        cleanup_successful = 0
        cleanup_errors = {}
        if successful_uploads:  # ThreadPoolExecutor rejects max_workers=0
            with concurrent.futures.ThreadPoolExecutor(max_workers=len(successful_uploads)) as executor:  # NOQA
                cleanup_futures = {
                    executor.submit(delete_image, api_client, name, wait_timeout): name
                    for name in successful_uploads
                }
                for future in concurrent.futures.as_completed(cleanup_futures):
                    try:
                        # result() re-raises whatever delete_image raised in the worker;
                        # without this call a failed delete would count as a success.
                        future.result()
                    except AssertionError as e:
                        cleanup_errors[cleanup_futures[future]] = str(e)
                    else:
                        cleanup_successful += 1
        assert cleanup_successful == len(successful_uploads), \
            f"Cleanup failed: {cleanup_successful}/{len(successful_uploads)}, {cleanup_errors}"
def test_image_metadata_limits(self, api_client, unique_name, wait_timeout)-
Expand source code
@pytest.mark.p1
@pytest.mark.images
@pytest.mark.negative
def test_image_metadata_limits(self, api_client, unique_name, wait_timeout):
    """
    Test image metadata handling with excessive data

    Steps:
    1. Create a valid image from a 1 MiB zero-filled file
    2. Update it with many labels/annotations in one request
    3. Accept either outcome: the update is stored (200, then verified by reading it
       back) or it is rejected (400/413/422); any other status fails the test
    4. Delete the image, even when a check above fails
    """
    aligned_size = 1024 * 1024  # 1MB exactly

    # Name is deliberately different from the one used by test_image_metadata_operations
    # so a leftover image from that test cannot collide with this one.
    image_name = f"metadata-limits-{unique_name}"

    with NamedTemporaryFile("wb", suffix=".raw") as f:
        f.write(b"\x00" * aligned_size)
        f.flush()

        # Create base image for metadata testing
        resp = api_client.images.create_by_file(image_name, Path(f.name))
    assert resp.ok, f"Failed to create base image: {resp.status_code}, {resp.text}"

    try:
        # Test excessive metadata
        excessive_metadata = {
            "labels": {f"test-label-key-{i:03d}": f"test-label-value-{i:03d}"
                       for i in range(20)},
            "annotations": {
                f"test.annotation.key/{i:03d}": "x" * 200 for i in range(10)
            }
        }
        code, data = api_client.images.update(image_name, dict(metadata=excessive_metadata))

        if code == 200:
            # Verify metadata was actually stored
            code, updated_data = api_client.images.get(image_name)
            assert code == 200, f"Failed to retrieve updated image: {code}"
            stored = updated_data.get("metadata", {})
            for field, expected_pairs in excessive_metadata.items():
                missing = sorted(
                    key for key, value in expected_pairs.items()
                    if stored.get(field, {}).get(key) != value
                )
                assert not missing, f"Metadata {field} accepted but not stored: {missing}"
        elif code in [400, 422, 413]:  # 413 = Request Entity Too Large
            print(f"✓ Excessive metadata properly rejected with status: {code}")
        else:
            # Explicit raise: `assert False` would be stripped when running with -O
            raise AssertionError(f"Unexpected metadata response: {code}, {data}")
    finally:
        # Cleanup
        delete_image(api_client, image_name, wait_timeout)
def test_image_metadata_operations(self, api_client, fake_image_file, unique_name, wait_timeout)-
Expand source code
@pytest.mark.p1
@pytest.mark.images
def test_image_metadata_operations(self, api_client, fake_image_file, unique_name,
                                   wait_timeout):
    """
    Test comprehensive image metadata operations

    Steps:
    1. Create image from file and wait until its progress reaches 100
    2. Update labels and annotations and verify each value reads back
    3. Apply a second, incremental update
    4. Verify an earlier label survives alongside the new one
    5. Delete the image, even when a check above fails
    """
    image_name = f"metadata-test-{unique_name}"

    # Create image
    resp = api_client.images.create_by_file(image_name, fake_image_file)
    assert resp.ok, f"Failed to create image: {resp.status_code}, {resp.text}"

    try:
        # Wait for initial processing
        code, data = None, None
        endtime = datetime.now() + timedelta(seconds=wait_timeout)
        while endtime > datetime.now():
            code, data = api_client.images.get(image_name)
            if code == 200 and data.get("status", {}).get("progress") == 100:
                break
            sleep(2)
        else:
            # Fail here rather than letting later updates hit a half-processed image
            raise AssertionError(
                f"Image {image_name} not ready within {wait_timeout}s: {code}, {data}"
            )

        # Test comprehensive metadata updates
        metadata_updates = {
            "labels": {
                "environment": "test",
                "team": "qa",
                "version": "1.0.0",
                "critical": "false"
            },
            "annotations": {
                "field.cattle.io/description": "Test image for metadata operations",
                "harvesterhci.io/imageId": image_name,
                "custom.annotation/usage": "automation-testing",
                "created.by": "harvester-e2e-tests"
            }
        }
        code, data = api_client.images.update(image_name, dict(metadata=metadata_updates))
        assert 200 == code, f"Failed to update metadata: {code}, {data}"

        # Verify metadata was applied correctly
        code, data = api_client.images.get(image_name)
        assert 200 == code, (code, data)
        metadata = data["metadata"]
        for field, expected_pairs in metadata_updates.items():
            for key, expected_value in expected_pairs.items():
                actual_value = metadata.get(field, {}).get(key)
                assert actual_value == expected_value, f"Metadata {field}.{key}: expected"\
                                                       f" {expected_value}, got {actual_value}"

        # Test incremental metadata updates
        incremental_updates = {
            "labels": {"priority": "high"},
            "annotations": {"last.modified": "2025-09-30"}
        }
        code, data = api_client.images.update(image_name, dict(metadata=incremental_updates))
        assert 200 == code, f"Failed incremental update: {code}, {data}"

        # Verify both old and new metadata exist
        code, data = api_client.images.get(image_name)
        assert 200 == code, (code, data)
        metadata = data["metadata"]
        assert metadata.get("labels", {}).get("environment") == "test"  # Old label
        assert metadata.get("labels", {}).get("priority") == "high"  # New label
    finally:
        delete_image(api_client, image_name, wait_timeout)
def test_image_name_length_limits(self, api_client, unique_name, wait_timeout)-
Expand source code
@pytest.mark.p1
@pytest.mark.images
@pytest.mark.negative
def test_image_name_length_limits(self, api_client, unique_name, wait_timeout):
    """
    Test image creation with various name lengths

    Steps:
    1. Test failure with image name > 63 characters (should fail)
    2. Test success with image name ≤ 63 characters (should pass)
    """
    max_name_length = 63
    aligned_size = 1024 * 1024  # 1MB exactly

    with NamedTemporaryFile("wb", suffix=".raw") as f:
        f.write(b"\x00" * aligned_size)
        f.flush()
        image_path = Path(f.name)

        # Test 1: Invalid long name (should FAIL)
        long_base = "very-long-image-name-for-testing-kubernetes-name-length-limits-that-exceeds-maximum"  # NOQA
        invalid_long_name = f"{long_base}-{unique_name}"

        resp = api_client.images.create_by_file(invalid_long_name, image_path)
        assert not resp.ok, f"Expected failure with long name but" \
                            f" got success: {resp.status_code}"
        assert resp.status_code in [400, 422], f"Expected 400/422 for " \
                                               f"invalid name, got: {resp.status_code}"

        # Test 2: Valid name (should PASS)
        valid_base = "valid-image-name"
        valid_name = f"{valid_base}-{unique_name}"
        if len(valid_name) > max_name_length:
            # Keep the tail: the unique suffix is what prevents collisions, so the
            # fixed prefix is the part that gets cut. (Slicing the 16-character base
            # to 30 characters, as before, never shortened anything.)
            valid_name = valid_name[-max_name_length:]
        # A cut may leave a hyphen at either edge; drop it
        valid_name = valid_name.strip('-')

        resp = api_client.images.create_by_file(valid_name, image_path)
        assert resp.ok, f"Failed with valid name: {resp.status_code}, {resp.text}"

        # Cleanup
        delete_image(api_client, valid_name, wait_timeout)
def test_image_processing_performance(self, api_client, unique_name, image_size, wait_timeout)-
Expand source code
@pytest.mark.p2
@pytest.mark.images
@pytest.mark.performance
@pytest.mark.parametrize("image_size", ["10Mi", "50Mi", "200Mi"])
def test_image_processing_performance(self, api_client, unique_name, image_size,
                                      wait_timeout):
    """
    Test Harvester image processing performance (excluding network upload time)

    Steps:
    1. Upload a sparse image file of the parametrized size
    2. Wait until the image reports a `progress` field, then start the clock
    3. Stop the clock when `progress` reaches 100
    4. Verify the elapsed time is below the per-size threshold
    5. Delete the image, even when a check above fails
    """
    # Local import: a monotonic clock suits interval measurement, and this avoids
    # depending on how the file-level imports bind the name `time`.
    from time import monotonic

    # Processing thresholds (seconds) - time from upload complete to ready
    processing_thresholds = {"10Mi": 10, "50Mi": 20, "200Mi": 40}

    # "<N>Mi" -> bytes; whole MiB are always multiples of 512, so no alignment is needed
    size_bytes = int(image_size[:-2]) * 1024 * 1024
    image_name = f"proc-perf-{unique_name}"

    with NamedTemporaryFile("wb", suffix=".raw") as f:
        # Writing a single byte at the last offset makes the file the requested size
        f.seek(size_bytes - 1)
        f.write(b"\x00")
        f.flush()

        # Step 1: Upload (don't measure this time - it's network dependent)
        resp = api_client.images.create_by_file(image_name, Path(f.name))
    assert resp.ok, f"Failed to upload {image_size} image: {resp.status_code}, {resp.text}"

    try:
        # Step 2: Wait for upload to be acknowledged by Harvester
        initial_timeout = datetime.now() + timedelta(seconds=30)
        while initial_timeout > datetime.now():
            code, data = api_client.images.get(image_name)
            if code == 200 and "progress" in data.get("status", {}):
                # Upload acknowledged, start measuring processing time
                processing_start_time = monotonic()
                break
            sleep(1)
        else:
            raise AssertionError(f"Image {image_name} not acknowledged by Harvester")

        # Step 3: Measure time from processing start to completion
        endtime = datetime.now() + timedelta(seconds=wait_timeout)
        while endtime > datetime.now():
            code, data = api_client.images.get(image_name)
            if code == 200 and data.get("status", {}).get("progress") == 100:
                processing_end_time = monotonic()
                break
            sleep(2)
        else:
            raise AssertionError(f"Image processing did not complete within {wait_timeout}s")

        # Step 4: Validate Harvester processing performance
        processing_time = processing_end_time - processing_start_time
        threshold = processing_thresholds[image_size]
        assert processing_time < threshold, f"Harvester processing took" \
                                            f" {processing_time:.2f}s, expected < {threshold}s"
        print(f"Harvester Processing Performance:"
              f" {image_size} processed in {processing_time:.2f}s")
    finally:
        # Always remove the image: the name is built from `unique_name` only, so a
        # leftover from one parametrized run could collide with the next.
        delete_image(api_client, image_name, wait_timeout)
class TestImageWithStorageNetwork-
Expand source code
@pytest.mark.p0
@pytest.mark.smoke
@pytest.mark.images
@pytest.mark.settings
@pytest.mark.networks
@pytest.mark.skip_version_if("< v1.0.3")
@pytest.mark.usefixtures("storage_network")
class TestImageWithStorageNetwork:
    """Upload, download and delete one image while the `storage_network` fixture is active.

    The download and delete tests are marked as depending on the upload test and
    address the image through the same `unique_name` value.
    """

    @pytest.mark.dependency(name="create_image_by_file")
    def test_create_image_by_file(self, api_client, fake_image_file, unique_name):
        """Upload the fake image file and check it can be fetched by name."""
        resp = api_client.images.create_by_file(unique_name, fake_image_file)

        assert resp.ok, f"Fail to upload fake image with error: {resp.status_code}, {resp.text}"

        code, data = api_client.images.get(unique_name)
        assert 200 == code, (code, data)
        assert unique_name == data["metadata"]["name"], (code, data)

    @pytest.mark.dependency(depends=["create_image_by_file"])
    def test_download_image(self, api_client, fake_image_file, tmp_path, unique_name):
        """Download the image and compare its content with the uploaded file."""
        resp = api_client.images.download(unique_name)
        assert resp.ok, f"Fail to download fake image with error: {resp.status_code}, {resp.text}"

        # A missing header becomes "" so the assertion below reports it, instead of
        # re.search raising TypeError on None.
        disposition = resp.headers.get("Content-Disposition") or ""
        match = re.search(r'filename=(\S+)', disposition)
        assert match, f"No filename info in the response header: {resp.headers}"
        filename = match.groups()[0]

        tmp_image_file = tmp_path / filename
        tmp_image_file.write_bytes(
            # wbits=32+15 lets zlib auto-detect a gzip or zlib header
            zlib.decompress(resp.content, 32+15) if ".gz" in filename else resp.content
        )
        assert filecmp.cmp(fake_image_file, tmp_image_file), (
            "Contents of downloaded image is NOT identical to the fake image"
        )

    @pytest.mark.dependency(depends=["create_image_by_file"])
    def test_delete_image(self, api_client, unique_name, wait_timeout, sleep_timeout):
        """Delete the image and poll until fetching it returns 404."""
        code, data = api_client.images.delete(unique_name)
        assert 200 == code, (code, data)

        endtime = datetime.now() + timedelta(seconds=wait_timeout)
        while endtime > datetime.now():
            code, data = api_client.images.get(unique_name)
            if code == 404:
                break
            sleep(sleep_timeout)
        else:
            raise AssertionError(
                f"Fail to delete image {unique_name} with error: {code}, {data}"
            )
var pytestmark
Methods
def test_create_image_by_file(self, api_client, fake_image_file, unique_name)-
Expand source code
@pytest.mark.dependency(name="create_image_by_file")
def test_create_image_by_file(self, api_client, fake_image_file, unique_name):
    """Upload the fake image file, then confirm it is retrievable under `unique_name`."""
    upload = api_client.images.create_by_file(unique_name, fake_image_file)
    assert upload.ok, (
        f"Fail to upload fake image with error: {upload.status_code}, {upload.text}"
    )

    # Read the image back and make sure the stored name matches what was requested
    status, body = api_client.images.get(unique_name)
    assert 200 == status, (status, body)
    stored_name = body["metadata"]["name"]
    assert unique_name == stored_name, (status, body)
Expand source code
@pytest.mark.dependency(depends=["create_image_by_file"])
def test_delete_image(self, api_client, unique_name, wait_timeout, sleep_timeout):
    """Request deletion of the image and poll until a lookup returns 404.

    Raises AssertionError when the delete call is not answered with 200 or the
    image is still found after `wait_timeout` seconds.
    """
    code, data = api_client.images.delete(unique_name)
    assert 200 == code, (code, data)

    deadline = datetime.now() + timedelta(seconds=wait_timeout)
    while True:
        if not deadline > datetime.now():
            # Out of time: report the most recent response seen
            raise AssertionError(
                f"Fail to delete image {unique_name} with error: {code}, {data}"
            )
        code, data = api_client.images.get(unique_name)
        if code == 404:
            return
        sleep(sleep_timeout)
Expand source code
@pytest.mark.dependency(depends=["create_image_by_file"])
def test_download_image(self, api_client, fake_image_file, tmp_path, unique_name):
    """Download the image and compare its content with the uploaded file.

    The file name is taken from the `Content-Disposition` response header; when it
    contains ".gz" the payload is decompressed before the comparison.
    """
    resp = api_client.images.download(unique_name)
    assert resp.ok, f"Fail to download fake image with error: {resp.status_code}, {resp.text}"

    # A missing header becomes "" so the assertion below reports it, instead of
    # re.search raising TypeError on None.
    disposition = resp.headers.get("Content-Disposition") or ""
    match = re.search(r'filename=(\S+)', disposition)
    assert match, f"No filename info in the response header: {resp.headers}"
    filename = match.groups()[0]

    tmp_image_file = tmp_path / filename
    tmp_image_file.write_bytes(
        # wbits=32+15 lets zlib auto-detect a gzip or zlib header
        zlib.decompress(resp.content, 32+15) if ".gz" in filename else resp.content
    )
    assert filecmp.cmp(fake_image_file, tmp_image_file), (
        "Contents of downloaded image is NOT identical to the fake image"
    )