Source code for dgenerate.image_process.renderloop

# Copyright (c) 2023, Teriks
#
# dgenerate is distributed under the following BSD 3-Clause License
#
# Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in
#    the documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived
#    from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import collections.abc
import datetime
import os.path
import pathlib
import tempfile
import time
import typing

import PIL.Image

import dgenerate.filelock as _filelock
import dgenerate.image_process.renderloopconfig as _renderloopconfig
import dgenerate.imageprocessors as _imageprocessors
import dgenerate.mediainput as _mediainput
import dgenerate.mediaoutput as _mediaoutput
import dgenerate.messages as _messages
import dgenerate.types as _types
import dgenerate.files as _files
from dgenerate.events import \
    Event, \
    AnimationFinishedEvent, \
    StartingGenerationStepEvent, \
    AnimationETAEvent, \
    StartingAnimationEvent, \
    StartingAnimationFileEvent


[docs] class AnimationFileFinishedEvent(Event): """ Generated in the event stream of :py:meth:`.ImageProcessRenderLoop.events` Occurs when an animation (video or animated image) has finished being written to disk. """ path: str """ Path on disk where the video/animated image was saved. """ starting_event: StartingAnimationFileEvent """ Animation :py:class:`.StartingAnimationFileEvent` related to this file finished event. """
[docs] def __init__(self, origin: 'ImageProcessRenderLoop', path: str, starting_event: StartingAnimationFileEvent): super().__init__(origin) self.starting_event = starting_event self.path = path
[docs] class ImageGeneratedEvent(Event): """ Generated in the event stream of :py:meth:`.ImageProcessRenderLoop.events` Occurs when an image is generated (but not saved yet). """ image: PIL.Image.Image """ The generated image. """ generation_step: int """ The current generation step. (zero indexed) """ suggested_directory: str """ A suggested directory path for saving this image in. A value of ``'.'`` may be present, this indicates the current working directory. """ suggested_filename: str """ A suggested filename for saving this image as. This filename will be unique to the render loop run / configuration. This is just the filename, it will not contain a directory name. """ is_animation_frame: bool """ Is this image a frame in an animation? """ frame_index: _types.OptionalInteger """ The frame index if this is an animation frame. """
[docs] def __init__(self, origin: 'ImageProcessRenderLoop', image: PIL.Image.Image, generation_step: int, suggested_directory: str, suggested_filename: str, is_animation_frame=False, frame_index: _types.OptionalInteger = None): super().__init__(origin) self.image = image self.generation_step = generation_step self.suggested_directory = suggested_directory if suggested_directory.strip() else '.' self.suggested_filename = suggested_filename self.is_animation_frame = is_animation_frame self.frame_index = frame_index
[docs] class ImageFileSavedEvent(Event): """ Generated in the event stream of :py:meth:`.ImageProcessRenderLoop.events` Occurs when an image file is written to disk. """ generated_event: ImageGeneratedEvent """ The :py:class:`.ImageGeneratedEvent` for the image that was saved. """ path: str """ Path to the saved image. """
[docs] def __init__(self, origin: 'ImageProcessRenderLoop', generated_event, path): super().__init__(origin) self.generated_event = generated_event self.path = path
RenderLoopEvent = typing.Union[ImageGeneratedEvent, StartingAnimationEvent, StartingAnimationFileEvent, AnimationFileFinishedEvent, ImageFileSavedEvent, AnimationFinishedEvent, StartingGenerationStepEvent, AnimationETAEvent] """ Possible events from the event stream created by :py:meth:`.ImageProcessRenderLoop.events` """ RenderLoopEventStream = typing.Generator[RenderLoopEvent, None, None] """ Event stream created by :py:meth:`.ImageProcessRenderLoop.events` """
[docs] class ImageProcessRenderLoop: """ Implements the behavior of the ``image-process`` sub-command as well as ``\\image_process`` directive. """ image_processor_loader: _imageprocessors.ImageProcessorLoader """ The loader responsible for loading user specified image processors """ message_header: str = 'image-process' """ Used as the header for messages written via :py:mod:`dgenerate.messages` """ disable_writes: bool = False """ Disable or enable all writes to disk, if you intend to only ever use the event stream of the render loop when using dgenerate as a library, this is a useful option. :py:attr:`RenderLoop.last_images` and :py:attr:`last_animations` will not be available if writes to disk are disabled. """
[docs] def __init__(self, config: _renderloopconfig.ImageProcessRenderLoopConfig = None, image_processor_loader: typing.Optional[_imageprocessors.ImageProcessorLoader] = None): if config is None: self.config = _renderloopconfig.ImageProcessRenderLoopConfig() else: self.config = config if image_processor_loader is None: self.image_processor_loader = _imageprocessors.ImageProcessorLoader() else: self.image_processor_loader = image_processor_loader self._written_images: typing.Optional[_files.GCFile] = None self._written_animations: typing.Optional[_files.GCFile] = None self._iterating = False
@property def written_images(self) -> collections.abc.Iterable[str]: """ Iterable over image filenames written by the last run """ class Iterable: def __init__(self, images): self.images = images def __iter__(self): if self.images is None: return self.images.seek(0) for line in self.images: yield line.rstrip('\n') return Iterable(self._written_images) @property def written_animations(self) -> collections.abc.Iterable[str]: """ Iterable over animation filenames written by the last run """ class Iterable: def __init__(self, animations): self.animations = animations def __iter__(self): if self.animations is None: return self.animations.seek(0) for line in self.animations: yield line.rstrip('\n') return Iterable(self._written_animations) def _record_save_image(self, filename): self._written_images.write(os.path.abspath(filename) + '\n') def _record_save_animation(self, filename): self._written_animations.write(os.path.abspath(filename) + '\n') def _process_reader(self, file, reader: _mediainput.MediaReader, out_filename, generation_step): out_directory = os.path.dirname(out_filename) duplicate_output_suffix = '_duplicate_' if out_directory and not self.disable_writes: pathlib.Path(out_directory).mkdir( parents=True, exist_ok=True) _messages.log(fr'{self.message_header}: Processing "{file}"', underline=True) if reader.total_frames == 1: if not self.config.output_overwrite and not self.disable_writes: out_filename = _filelock.touch_avoid_duplicate( out_directory if out_directory else '.', path_maker=_filelock.suffix_path_maker(out_filename, duplicate_output_suffix)) # Processing happens here, when the frame is read with next(reader) as processed_image: generated_event = ImageGeneratedEvent( origin=self, image=processed_image, suggested_directory=os.path.dirname(out_filename), suggested_filename=os.path.basename(out_filename), generation_step=generation_step) yield generated_event if not self.disable_writes: processed_image.save(out_filename) self._record_save_image(out_filename) yield ImageFileSavedEvent(origin=self, generated_event=generated_event, path=out_filename) _messages.log(fr'{self.message_header}: Wrote Image "{out_filename}"', underline=True) else: out_filename_base, ext = os.path.splitext(out_filename) if not self.config.output_overwrite and not self.disable_writes: out_anim_name = _filelock.touch_avoid_duplicate( out_directory if out_directory else '.', path_maker=_filelock.suffix_path_maker(out_filename, duplicate_output_suffix)) else: out_anim_name = out_filename if not self.config.no_animation_file and not self.disable_writes: anim_writer = _mediaoutput.create_animation_writer( animation_format=ext.lstrip('.'), out_filename=out_anim_name, fps=reader.fps) else: # mock anim_writer = _mediaoutput.AnimationWriter() starting_animation_event = StartingAnimationEvent( origin=self, total_frames=reader.total_frames, fps=reader.fps, frame_duration=reader.frame_duration) yield starting_animation_event starting_animation_file_event = None if not self.config.no_animation_file and not self.disable_writes: starting_animation_file_event = StartingAnimationFileEvent( origin=self, path=out_anim_name, fps=reader.fps, frame_duration=reader.frame_duration, total_frames=reader.total_frames ) yield starting_animation_file_event with anim_writer as writer: for frame_idx in range(0, reader.total_frames): if self._last_frame_time == 0: eta = None else: self._frame_time_sum += time.time() - self._last_frame_time eta_seconds = (self._frame_time_sum / frame_idx) * ( reader.total_frames - frame_idx) eta = datetime.timedelta(seconds=eta_seconds) self._last_frame_time = time.time() eta_str = str(eta) if eta is not None else 'tbd...' _messages.log( fr'{self.message_header}: Processing Frame {frame_idx + 1}/{reader.total_frames}, Completion ETA: {eta_str}') if eta is not None: yield AnimationETAEvent(origin=self, frame_index=frame_idx, total_frames=reader.total_frames, eta=eta) frame_filename = out_filename_base + f'_frame_{frame_idx + 1}.{self.config.frame_format}' # Processing happens here, when the frame is read with next(reader) as frame: frame_generated_event = ImageGeneratedEvent( origin=self, image=frame, generation_step=generation_step, suggested_directory=os.path.dirname(out_filename_base), suggested_filename=os.path.basename(frame_filename), is_animation_frame=True, frame_index=frame_idx ) yield frame_generated_event if not self.config.no_animation_file: writer.write(frame) if not self.config.no_frames and not self.disable_writes: # frames do not get the _processed_ suffix in any case if not self.config.output_overwrite: frame_filename = _filelock.touch_avoid_duplicate( out_directory if out_directory else '.', path_maker=_filelock.suffix_path_maker(frame_filename, duplicate_output_suffix)) frame.save(frame_filename) self._record_save_image(frame_filename) yield ImageFileSavedEvent( origin=self, path=frame_filename, generated_event=frame_generated_event) _messages.log(fr'{self.message_header}: Wrote Frame "{frame_filename}"') frame_idx += 1 yield AnimationFinishedEvent( origin=self, starting_event=starting_animation_event) if not self.config.no_animation_file and not self.disable_writes: self._record_save_animation(out_filename) yield AnimationFileFinishedEvent( origin=self, path=out_filename, starting_event=starting_animation_file_event) _messages.log(fr'{self.message_header}: Wrote File "{out_anim_name}"', underline=True) def _process_file(self, file, out_filename, generation_step, total_generation_steps): if self.config.processors: processor = self.image_processor_loader.load(self.config.processors, device=self.config.device) else: processor = None with _mediainput.MediaReader( path=file, image_processor=processor, resize_resolution=self.config.resize, aspect_correct=not self.config.no_aspect, align=self.config.align, frame_start=self.config.frame_start, frame_end=self.config.frame_end) as reader: self._last_frame_time = 0 self._frame_time_sum = 0 yield StartingGenerationStepEvent(origin=self, generation_step=generation_step, total_steps=total_generation_steps) yield from self._process_reader(file, reader, out_filename, generation_step) def _run(self) -> RenderLoopEventStream: self.config.check() self._written_images = _files.GCFile( tempfile.TemporaryFile('w+t')) self._written_animations = _files.GCFile( tempfile.TemporaryFile('w+t')) total_generation_steps = len(self.config.input) def _is_dir_spec(path): return os.path.isdir(path) or path[-1] in '/\\' if self.config.output and len(self.config.output) == 1 and _is_dir_spec(self.config.output[0]): for idx, file in enumerate(self.config.input): file = _mediainput.url_aware_normpath(file) base, ext = os.path.splitext(_mediainput.url_aware_basename(file)) output_file = os.path.normpath( os.path.join(self.config.output[0], base + f'_processed_{idx + 1}{ext}')) yield from self._process_file(file, output_file, idx, total_generation_steps) else: for idx, file in enumerate(self.config.input): file = _mediainput.url_aware_normpath(file) output_file = _mediainput.url_aware_normpath( self.config.output[idx] if self.config.output else file) if file == output_file and not self.config.output_overwrite: if not _mediainput.is_downloadable_url(file): base, ext = os.path.splitext(output_file) else: base, ext = os.path.splitext(_mediainput.url_aware_basename(output_file)) output_file = base + f'_processed_{idx + 1}{ext}' elif _is_dir_spec(output_file): base, ext = os.path.splitext(_mediainput.url_aware_basename(file)) output_file = os.path.join(output_file, base + f'_processed_{idx + 1}{ext}') yield from self._process_file(file, output_file, idx, total_generation_steps)
[docs] def run(self): """ Run the render loop, this calls :py:meth:`ImageProcessRenderLoopConfig.check` prior to running. :raises ImageProcessRenderLoopConfigError: """ for _ in self._run(): continue
[docs] def events(self) -> RenderLoopEventStream: """ Run the render loop, and iterate over a stream of event objects produced by the render loop. Event objects are of the union type :py:class:`.RenderLoopEvent` The exceptions mentioned here are those you may encounter upon iterating, they will not occur upon simple acquisition of the event stream iterator. :raises ImageProcessRenderLoopConfigError: :return: :py:class:`.RenderLoopEventStream` """ try: self._iterating = True yield from self._run() finally: self._iterating = False
__all__ = _types.module_all()