Source code for dgenerate.imageprocessors.imageprocessor

# Copyright (c) 2023, Teriks
#
# dgenerate is distributed under the following BSD 3-Clause License
#
# Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in
#    the documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived
#    from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import abc
import gc
import itertools
import os
import typing

import PIL.Image
import torch

import dgenerate.devicecache as _devicecache
import dgenerate.exceptions as _d_exceptions
import dgenerate.filelock as _filelock
import dgenerate.image as _image
import dgenerate.imageprocessors.constants as _constants
import dgenerate.imageprocessors.exceptions as _exceptions
import dgenerate.memoize as _memoize
import dgenerate.memory as _memory
import dgenerate.messages as _messages
import dgenerate.plugin as _plugin
import dgenerate.types
import dgenerate.torchutil as _torchutil

_image_processor_cache = _memoize.create_object_cache(
    'image_processor',
    cache_type=_memory.SizedConstrainedObjectCache
)


def _cache_debug_hit(key, hit):
    _memoize.simple_cache_hit_debug("Image Processor Model", key, hit)


def _cache_debug_miss(key, new):
    _memoize.simple_cache_miss_debug("Image Processor Model", key, new)


_in_filetypes = None
_out_filetypes = None

[docs] class ImageProcessor(_plugin.Plugin, abc.ABC): """ Abstract base class for image processor implementations. """
[docs] @staticmethod def image_out_filetypes(): """ Utility for derived classes to get a list of supported image output file types for use with ``FILE_ARGS``. :return: List of supported image output file types, for example ``['*.png', '*.jpg']``. """ import dgenerate.mediaoutput as _mediaoutput global _out_filetypes if _out_filetypes is None: _out_filetypes = ['*.' + i for i in _mediaoutput.get_supported_static_image_formats()] return list(_out_filetypes) else: return list(_out_filetypes)
[docs] @staticmethod def image_in_filetypes(): """ Utility for derived classes to get a list of supported image input file types for use with ``FILE_ARGS``. :return: List of supported image input file types, for example ``['*.png', '*.jpg']``. """ import dgenerate.mediainput as _mediainput global _in_filetypes if _in_filetypes is None: _in_filetypes = ['*.' + i for i in _mediainput.get_supported_image_formats()] return list(_in_filetypes) else: return list(_in_filetypes)
# you cannot specify these via a URI HIDE_ARGS = ['local-files-only'] FILE_ARGS = {'output-file': {'mode': 'out', 'filetypes': [('Images', image_out_filetypes())]}}
[docs] @classmethod def inheritable_help(cls, loaded_by_name): help_messages = { 'device': ( 'The "device" argument can be used to set the device ' 'the processor will run on, for example: cpu, cuda, cuda:1. ' 'If you are using this image processor as a preprocess or ' 'postprocess step for dgenerate, or with the image-process ' 'subcommand, or \\image_process directive, this argument will ' 'default to the value of --device.' ), 'output-file': ( 'The "output-file" argument can be used to set the output ' 'path for a processor debug image, this will save the ' 'processed image to a path of your choosing.' ), 'output-overwrite': ( 'The "output-overwrite" argument can be used to enable ' 'overwrite for a processor debug image. If this is not enabled, ' 'new images written by the processor while it is being used ' 'will be written with a numbered suffix instead of being overwritten.' ), 'model-offload': ( 'The "model-offload" argument can be used to enable ' 'cpu model offloading for a processor. If this is disabled, ' 'any torch tensors or modules placed on the GPU will remain there until ' 'the processor is done being used, instead of them being moved back to the CPU ' 'after each image. Enabling this may help save VRAM when using an image processor ' 'as a preprocessor or postprocessor for diffusion with dgenerate but will impact ' 'rendering speed when generating many images.' ) } return help_messages
[docs] def __init__(self, loaded_by_name: str, device: str | None = None, output_file: dgenerate.types.OptionalPath = None, output_overwrite: bool = False, model_offload: bool = False, local_files_only: bool = False, **kwargs): """ :param loaded_by_name: The name the processor was loaded by :param device: the device the processor will run on, for example: cpu, cuda, cuda:1. Specifying ``None`` causes the device to default to cpu. :param output_file: output a debug image to this path :param output_overwrite: can the debug image output path be overwritten? :param model_offload: if ``True``, any torch modules that the processor has registered are offloaded to the CPU immediately after processing an image :param local_files_only: if ``True``, the plugin should never try to download models from the internet automatically, and instead only look for them in cache / on disk. :param kwargs: child class forwarded arguments """ super().__init__(loaded_by_name=loaded_by_name, argument_error_type=_exceptions.ImageProcessorArgumentError, **kwargs) if device is not None: if not _torchutil.is_valid_device_string(device): raise _exceptions.ImageProcessorArgumentError( f'Invalid device argument, {_torchutil.invalid_device_message(device, cap=False)}') self.__output_file = output_file self.__output_overwrite = output_overwrite self.__device = device if device else 'cpu' self.__modules = [] self.__modules_device = torch.device('cpu') self.__model_offload = model_offload self.__size_estimate = 0 self.__local_files_only = local_files_only
# noinspection PyMethodMayBeStatic @property def image_modes(self) -> list[str]: """ Returns a list of PIL image modes that this processor can handle. This may be overridden by implementers :return: ``['RGB']`` """ return ['RGB']
[docs] def set_size_estimate(self, size_bytes: int): """ Set the estimated size of this plugin in bytes for memory management heuristics, this is intended to be used by implementors of the :py:class:`ImageProcessor` plugin class. For the best memory optimization, this value should be set very shortly before any associated model even enters CPU side ram, IE: before it is loaded at all. :raise ValueError: if ``size_bytes`` is less than zero. :param size_bytes: the size in bytes """ if size_bytes < 0: raise ValueError( 'image processor size estimate cannot be less than zero.') self.__size_estimate = int(size_bytes)
[docs] def memory_guard_device(self, device: str | torch.device, memory_required: int): """ Check a specific device against an amount of memory in bytes. If the device is a gpu device and any of the memory constraints specified by :py:attr:`dgenerate.imageprocessors.constants.IMAGE_PROCESSOR_GPU_MEMORY_CONSTRAINTS` are met on that device, attempt to remove cached objects off a gpu device to free space. If the device is a cpu and any of the memory constraints specified by :py:attr:`dgenerate.imageprocessors.constants.IMAGE_PROCESSOR_CACHE_GC_CONSTRAINTS` are met, attempt to remove cached image processor objects off the device to free space. Then, enforce :py:attr:`dgenerate.imageprocessors.constants.IMAGE_PROCESSOR_CACHE_MEMORY_CONSTRAINTS`. :param device: the device :param memory_required: the amount of memory required on the device in bytes :return: ``True`` if an attempt was made to free memory, ``False`` otherwise. """ device = torch.device(device) cleared = False if _memory.is_supported_gpu_device(device): if _memory.gpu_memory_constraints( _constants.IMAGE_PROCESSOR_GPU_MEMORY_CONSTRAINTS, extra_vars={'memory_required': memory_required}, device=device): _messages.debug_log( f'Image Processor "{self.__class__.__name__}" is clearing the GPU side object ' f'cache due to GPU side memory constraint evaluating to to True.') _devicecache.clear_device_cache(device) cleared = True elif device.type == 'cpu': if (_memory.memory_constraints( _constants.IMAGE_PROCESSOR_CACHE_GC_CONSTRAINTS, extra_vars={'memory_required': memory_required})): _messages.debug_log( f'Image Processor "{self.__class__.__name__}" is clearing the CPU side object ' f'cache due to CPU side memory constraint evaluating to to True.') _memoize.clear_object_caches() cleared = True cleared = cleared or _image_processor_cache.enforce_cpu_mem_constraints( _constants.IMAGE_PROCESSOR_CACHE_MEMORY_CONSTRAINTS, size_var='memory_required', new_object_size=memory_required ) return cleared
[docs] def load_object_cached(self, tag: str, estimated_size: int, method: typing.Callable, memory_guard_device: str | torch.device | None = 'cpu' ): """ Load a potentially large object into the CPU side ``image_processor`` object cache. :param tag: A unique string within the context of the image processor implementation constructor. :param estimated_size: Estimated size in bytes of the object in RAM. :param method: A method which loads and returns the object. :param memory_guard_device: call :py:meth:`ImageProcessor.memory_guard_device` on the specified device before the object is loaded (on cache miss) :return: The loaded object """ @_memoize.memoize( _image_processor_cache, on_hit=_cache_debug_hit, on_create=_cache_debug_miss) def load_cached(loaded_by_name=self.loaded_by_name, tag=tag): if memory_guard_device is not None: self.memory_guard_device(memory_guard_device, estimated_size) return method(), _memoize.CachedObjectMetadata(size=estimated_size) return load_cached()
@property def size_estimate(self) -> int: """ Estimated size of the models / objects used by this image processor. :return: size in bytes """ return self.__size_estimate @property def device(self) -> str: """ The rendering device requested for this processor. Torch modules associated with the processor will not be on this device until the processor is used. :return: device string, for example "cuda", "cuda:N", or "cpu" """ return self.__device @property def model_offload(self) -> bool: """ Model offload status. :return: ``True`` or ``False`` """ return self.__model_offload @property def local_files_only(self) -> bool: """ Is this image processor only going to look for resources such as models in cache / on disk? """ return self.__local_files_only @property def modules_device(self) -> torch.device: """ The rendering device that this processors modules currently exist on. This will change with calls to :py:meth:`.ImageProcessor.to` and possibly when the processor is used. :return: :py:class:`torch.device`, using ``str()`` on this object will yield a device string such as "cuda", "cuda:N", or "cpu" """ return self.__modules_device def __gen_filename(self): return _filelock.touch_avoid_duplicate(os.path.dirname(self.__output_file), _filelock.suffix_path_maker(self.__output_file, '_')) def __save_debug_image(self, image, debug_header): if self.__output_file is not None: if not self.__output_overwrite: filename = self.__gen_filename() else: filename = self.__output_file image.save(filename) _messages.debug_log(f'{debug_header}: "{filename}"') def __to_cpu_ignore_error(self): try: self.to('cpu') except: pass @staticmethod def __flush_mem_ignore_error(): try: _memory.torch_gc() gc.collect() except: pass def __with_memory_safety(self, func, args: dict, oom_attempt=0): raise_exc = None try: try_again = False try: return func(**args) except _d_exceptions.TORCH_CUDA_OOM_EXCEPTIONS as e: _d_exceptions.raise_if_not_cuda_oom(e) if oom_attempt == 0: self.__flush_diffusion_pipeline_after_oom() try_again = True else: _messages.debug_log( f'ImageProcessor "{self.__class__.__name__}" failed attempt at ' f'OOM recovery in {dgenerate.types.fullname(func)}()') self.__to_cpu_ignore_error() self.__flush_mem_ignore_error() raise_exc = _d_exceptions.OutOfMemoryError(e) if try_again: return self.__with_memory_safety(func, args, oom_attempt=1) except MemoryError as e: gc.collect() raise _d_exceptions.OutOfMemoryError('cpu (system memory)') from e except Exception as e: if not isinstance(e, _d_exceptions.OutOfMemoryError): self.__to_cpu_ignore_error() self.__flush_mem_ignore_error() raise if raise_exc is not None: raise raise_exc
[docs] def pre_resize(self, image: PIL.Image.Image, resize_resolution: dgenerate.types.OptionalSize = None) -> PIL.Image.Image: """ Invoke a processors :py:meth:`.ImageProcessor.impl_pre_resize` method. Implements important behaviors depending on if the image was modified. This is the only appropriate way to invoke a processor manually. The original image will be closed if the implementation returns a new image instead of modifying it in place, you should not count on the original image being open and usable once this function completes though it is safe to use the input image in a ``with`` context, if you need to retain a copy, pass a copy. :raise dgenerate.OutOfMemoryError: if the execution device runs out of memory :raise dgenerate.ImageProcessorImageModeError: if a passed image has an invalid format :param self: :py:class:`.ImageProcessor` implementation instance :param image: the image to pass :param resize_resolution: the size that the image is going to be resized to after this step, or None if it is not being resized. :return: processed image, may be the same image or a copy. """ if image.mode not in self.image_modes: raise _exceptions.ImageProcessorImageModeError( f'Invalid image mode: {image.mode}') self.to(self.device) img_copy = image.copy() processed = self.__with_memory_safety( self.impl_pre_resize, {'image': image, 'resize_resolution': resize_resolution}) if processed is not image: image.close() self.__save_debug_image( processed, 'Wrote Processor Debug Image (because copied)') processed.filename = _image.get_filename(image) return processed # Not copied but may be modified identical = all(a == b for a, b in itertools.zip_longest(processed.getdata(), img_copy.getdata(), fillvalue=None)) if not identical: # Write the debug output if it was modified in place self.__save_debug_image( processed, 'Wrote Processor Debug Image (because modified)') return processed
[docs] def post_resize(self, image: PIL.Image.Image) -> PIL.Image.Image: """ Invoke a processors :py:meth:`.ImageProcessor.impl_post_resize` method. Implements important behaviors depending on if the image was modified. This is the only appropriate way to invoke a processor manually. The original image will be closed if the implementation returns a new image instead of modifying it in place, you should not count on the original image being open and usable once this function completes though it is safe to use the input image in a ``with`` context, if you need to retain a copy, pass a copy. :raise dgenerate.OutOfMemoryError: if the execution device runs out of memory :raise dgenerate.ImageProcessorImageModeError: if a passed image has an invalid format :param self: :py:class:`.ImageProcessor` implementation instance :param image: the image to pass :return: processed image, may be the same image or a copy. """ if image.mode not in self.image_modes: raise _exceptions.ImageProcessorImageModeError( f'Invalid image mode: {image.mode}') img_copy = image.copy() processed = self.__with_memory_safety( self.impl_post_resize, {'image': image}) if self.__model_offload: self.to('cpu') self.__flush_mem_ignore_error() if processed is not image: image.close() self.__save_debug_image( processed, 'Wrote Processor Debug Image (because copied)') processed.filename = _image.get_filename(image) return processed # Not copied but may be modified identical = all(a == b for a, b in itertools.zip_longest(processed.getdata(), img_copy.getdata(), fillvalue=None)) if not identical: # Write the debug output if it was modified in place self.__save_debug_image( processed, 'Wrote Processor Debug Image (because modified)') return processed
def _process_pre_resize(self, image: PIL.Image.Image, resize_resolution: dgenerate.types.OptionalSize): filename = _image.get_filename(image) _messages.debug_log('Starting Image Process - ' f'{self}.pre_resize(' f'image="{filename}", resize_resolution={resize_resolution})') processed = self.pre_resize(image, resize_resolution) _messages.debug_log(f'Finished Image Process - {self}.pre_resize') return processed def _process_post_resize(self, image: PIL.Image.Image): filename = _image.get_filename(image) _messages.debug_log('Starting Image Process - ' f'{self}.post_resize(' f'image="{filename}")') processed = self.post_resize(image) _messages.debug_log(f'Finished Image Process - {self}.post_resize') return processed
[docs] def get_alignment(self) -> int | None: """ Overridable method. Get required input image alignment, which will be forcefully applied. If this function returns ``None``, specific alignment is not required and will never be forced. :return: integer or ``None`` """ return None
[docs] def process(self, image: PIL.Image.Image, resize_resolution: dgenerate.types.OptionalSize = None, aspect_correct: bool = True, align: int | None = None): """ Perform image processing on an image, including the requested resizing step. Invokes the image processor pre and post resizing with appropriate arguments and correct resource management. The original image will be closed if the implementation returns a new image instead of modifying it in place, you should not count on the original image being open and usable once this function completes though it is safe to use the input image in a ``with`` context, if you need to retain a copy, pass a copy. :raise dgenerate.OutOfMemoryError: if the execution device runs out of memory :raise dgenerate.ImageProcessorImageModeError: if a passed image has an invalid format :param image: image to process :param resize_resolution: image will be resized to this dimension by this method. :param aspect_correct: Should the resize operation be aspect correct? :param align: Align by this amount of pixels, if the input image is not aligned to this amount of pixels, it will be aligned by resizing. Passing ``None`` or ``1`` disables alignment. :return: the processed image """ forced_alignment = self.get_alignment() if forced_alignment is not None: if (not _image.is_aligned(image.size, align=forced_alignment)) and align != forced_alignment: align = forced_alignment _messages.warning( f'"{self.loaded_by_name}" image processor requires an image alignment of {align}, ' f'this alignment has been forced to prevent an error.' ) # This is the actual size it will end # up being resized to by resize_image calculate_new_size = _image.resize_image_calc(old_size=image.size, new_size=resize_resolution, aspect_correct=aspect_correct, align=align) pre_processed = self._process_pre_resize(image, calculate_new_size) if resize_resolution is None: image = pre_processed else: image = _image.resize_image(img=pre_processed, size=resize_resolution, aspect_correct=aspect_correct, align=align) if image is not pre_processed: pre_processed.close() return self._process_post_resize(image)
[docs] @abc.abstractmethod def impl_pre_resize(self, image: PIL.Image.Image, resize_resolution: dgenerate.types.OptionalSize) -> PIL.Image.Image: """ Inheritor must implement. This method should not be invoked directly, use the class method :py:meth:`.ImageProcessor.call_pre_resize` to invoke it. :param image: image to process :param resize_resolution: image will be resized to this resolution after this process is complete. If None is passed no resize is going to occur. It is not the duty of the inheritor to resize the image, in fact it should NEVER be resized. :return: the processed image """ return image
[docs] @abc.abstractmethod def impl_post_resize(self, image: PIL.Image.Image) -> PIL.Image.Image: """ Inheritor must implement. This method should not be invoked directly, use the class method :py:meth:`.ImageProcessor.call_post_resize` to invoke it. :param image: image to process :return: the processed image """ return image
def __repr__(self): return str(self)
[docs] def register_module(self, module): """ Register :py:class:`torch.nn.Module` objects. These will be brought on to the cpu during finalization. All of these modules can be cast to a specific device with :py:attr:`.ImageProcessor.to` :param module: the module """ self.__modules.append(module)
def __flush_diffusion_pipeline_after_oom(self): _messages.debug_log( f'Image processor "{self.__class__.__name__}" is clearing the GPU side object ' f'cache for device {self.device} due to VRAM out of memory condition.') _devicecache.clear_device_cache(self.device) def __to(self, device: torch.device | str, attempt=0): device = torch.device(device) if device.type != 'cpu': _image_processor_cache.size -= self.__size_estimate else: _image_processor_cache.size += self.__size_estimate self.__modules_device = device try_again = False for m in self.__modules: if not hasattr(m, '_DGENERATE_IMAGE_PROCESSOR_DEVICE') or \ not _torchutil.devices_equal(m._DGENERATE_IMAGE_PROCESSOR_DEVICE, device): self.memory_guard_device(device, self.size_estimate) m._DGENERATE_IMAGE_PROCESSOR_DEVICE = device _messages.debug_log( f'Moving ImageProcessor registered module: {dgenerate.types.fullname(m)}.to("{device}")') try: m.to(device) except _d_exceptions.TORCH_CUDA_OOM_EXCEPTIONS as e: _d_exceptions.raise_if_not_cuda_oom(e) if attempt == 0: # hail marry self.__flush_diffusion_pipeline_after_oom() try_again = True break else: _messages.debug_log( f'ImageProcessor "{self.__class__.__name__}" failed attempt ' f'at OOM recovery in to({device})') m._DGENERATE_IMAGE_PROCESSOR_DEVICE = torch.device('cpu') self.__to_cpu_ignore_error() self.__flush_mem_ignore_error() raise _d_exceptions.OutOfMemoryError(e) from e except MemoryError as e: # out of cpu side memory self.__flush_mem_ignore_error() raise _d_exceptions.OutOfMemoryError('cpu (system memory)') from e if try_again: self.__to(device, attempt=1) return self
[docs] def to(self, device: torch.device | str) -> "ImageProcessor": """ Move all :py:class:`torch.nn.Module` modules registered to this image processor to a specific device. :raise dgenerate.OutOfMemoryError: if there is not enough memory on the specified device :param device: The device string, or torch device object :return: the image processor itself """ return self.__to(device)
__all__ = dgenerate.types.module_all()