Source code for flip.nvflare.components.cleanup

# Copyright (c) 2026 Guy's and St Thomas' NHS Foundation Trust & King's College London
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#     http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import shutil
from pathlib import Path

from nvflare.apis.executor import Executor
from nvflare.apis.fl_constant import ReturnCode
from nvflare.apis.fl_context import FLContext
from nvflare.apis.shareable import Shareable, make_reply
from nvflare.apis.signal import Signal
from nvflare.security.logging import secure_format_traceback

from flip.constants import FlipConstants, FlipTasks


[docs] class CleanupImages(Executor): def __init__(self): """CleanupImages takes place at the start and end of the run. All the images used for the training are deleted to prevent the build-up of unnecessary files on the storage space. Executing at the start of a run ensures that any training code is executed with a clean slate. Args: Raises: """ super().__init__()
[docs] def execute(self, task_name: str, shareable: Shareable, fl_ctx: FLContext, abort_signal: Signal): try: if task_name in (FlipTasks.POST_VALIDATION, FlipTasks.POST_TASK): cwd = os.getcwd() job_dir = os.path.join(cwd, fl_ctx.get_job_id()) if os.path.isdir(job_dir): if not FlipConstants.LOCAL_DEV: self.log_info(fl_ctx, f"Deleting job directory {job_dir}") shutil.rmtree(job_dir) else: self.log_info(fl_ctx, f"[DEV] Running in local dev mode, skipping deletion of {job_dir}") if task_name in ( FlipTasks.INIT_TRAINING, FlipTasks.POST_VALIDATION, FlipTasks.INIT_TASK, FlipTasks.POST_TASK, ): if not FlipConstants.LOCAL_DEV: net_directory = os.path.join(FlipConstants.IMAGES_DIR, FlipConstants.NET_ID) size_in_bytes = sum(f.stat().st_size for f in Path(net_directory).glob("**/*") if f.is_file()) size_in_gb = round(size_in_bytes / pow(1024, 3), 2) self.log_info( fl_ctx, f"Attempting to delete the images stored under: {net_directory} - Total {size_in_gb}gb", ) if not os.path.exists(net_directory): self.log_info( fl_ctx, f"Directory {net_directory} does not exist, nothing to clean up.", ) return make_reply(ReturnCode.OK) # Delete all files and directories in the net_directory for filename in os.listdir(net_directory): file_path = os.path.join(net_directory, filename) if os.path.isfile(file_path) or os.path.islink(file_path): self.log_info(fl_ctx, f"Deleting file {file_path}") os.unlink(file_path) elif os.path.isdir(file_path): self.log_info(fl_ctx, f"Deleting directory {file_path}") shutil.rmtree(file_path) self.log_info( fl_ctx, "Cleanup executed successfully, images and job folder have been deleted.", ) return make_reply(ReturnCode.OK) else: self.log_info(fl_ctx, "[DEV] Running in local dev mode, skipping cleanup of images.") return make_reply(ReturnCode.OK) return make_reply(ReturnCode.TASK_UNKNOWN) except Exception: self.log_info(fl_ctx, "An exception has been caught during cleanup") formatted_exception = secure_format_traceback() self.log_error(fl_ctx, formatted_exception) return make_reply(ReturnCode.EXECUTION_EXCEPTION, headers={"exception": formatted_exception})