# Copyright (c) 2023, Teriks
#
# dgenerate is distributed under the following BSD 3-Clause License
#
# Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in
# the documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import collections.abc
import datetime
import os.path
import pathlib
import tempfile
import time
import typing
import PIL.Image
import dgenerate.filelock as _filelock
import dgenerate.image_process.renderloopconfig as _renderloopconfig
import dgenerate.imageprocessors as _imageprocessors
import dgenerate.mediainput as _mediainput
import dgenerate.mediaoutput as _mediaoutput
import dgenerate.messages as _messages
import dgenerate.types as _types
import dgenerate.files as _files
from dgenerate.events import \
Event, \
AnimationFinishedEvent, \
StartingGenerationStepEvent, \
AnimationETAEvent, \
StartingAnimationEvent, \
StartingAnimationFileEvent
[docs]
class AnimationFileFinishedEvent(Event):
"""
Generated in the event stream of :py:meth:`.ImageProcessRenderLoop.events`
Occurs when an animation (video or animated image) has finished being written to disk.
"""
path: str
"""
Path on disk where the video/animated image was saved.
"""
starting_event: StartingAnimationFileEvent
"""
Animation :py:class:`.StartingAnimationFileEvent` related to this file finished event.
"""
[docs]
def __init__(self, origin: 'ImageProcessRenderLoop', path: str, starting_event: StartingAnimationFileEvent):
super().__init__(origin)
self.starting_event = starting_event
self.path = path
[docs]
class ImageGeneratedEvent(Event):
"""
Generated in the event stream of :py:meth:`.ImageProcessRenderLoop.events`
Occurs when an image is generated (but not saved yet).
"""
image: PIL.Image.Image
"""
The generated image.
"""
generation_step: int
"""
The current generation step. (zero indexed)
"""
suggested_directory: str
"""
A suggested directory path for saving this image in.
A value of ``'.'`` may be present, this indicates the current working directory.
"""
suggested_filename: str
"""
A suggested filename for saving this image as. This filename will be unique
to the render loop run / configuration. This is just the filename, it will
not contain a directory name.
"""
is_animation_frame: bool
"""
Is this image a frame in an animation?
"""
frame_index: _types.OptionalInteger
"""
The frame index if this is an animation frame.
"""
[docs]
def __init__(self,
origin: 'ImageProcessRenderLoop',
image: PIL.Image.Image,
generation_step: int,
suggested_directory: str,
suggested_filename: str,
is_animation_frame=False,
frame_index: _types.OptionalInteger = None):
super().__init__(origin)
self.image = image
self.generation_step = generation_step
self.suggested_directory = suggested_directory if suggested_directory.strip() else '.'
self.suggested_filename = suggested_filename
self.is_animation_frame = is_animation_frame
self.frame_index = frame_index
[docs]
class ImageFileSavedEvent(Event):
"""
Generated in the event stream of :py:meth:`.ImageProcessRenderLoop.events`
Occurs when an image file is written to disk.
"""
generated_event: ImageGeneratedEvent
"""
The :py:class:`.ImageGeneratedEvent` for the image that was saved.
"""
path: str
"""
Path to the saved image.
"""
[docs]
def __init__(self, origin: 'ImageProcessRenderLoop', generated_event, path):
super().__init__(origin)
self.generated_event = generated_event
self.path = path
RenderLoopEvent = \
typing.Union[ImageGeneratedEvent,
StartingAnimationEvent,
StartingAnimationFileEvent,
AnimationFileFinishedEvent,
ImageFileSavedEvent,
AnimationFinishedEvent,
StartingGenerationStepEvent,
AnimationETAEvent]
"""
Possible events from the event stream created by :py:meth:`.ImageProcessRenderLoop.events`
"""
RenderLoopEventStream = typing.Generator[RenderLoopEvent, None, None]
"""
Event stream created by :py:meth:`.ImageProcessRenderLoop.events`
"""
[docs]
class ImageProcessRenderLoop:
"""
Implements the behavior of the ``image-process`` sub-command as well as ``\\image_process`` directive.
"""
image_processor_loader: _imageprocessors.ImageProcessorLoader
"""
The loader responsible for loading user specified image processors
"""
message_header: str = 'image-process'
"""
Used as the header for messages written via :py:mod:`dgenerate.messages`
"""
disable_writes: bool = False
"""
Disable or enable all writes to disk, if you intend to only ever use the event
stream of the render loop when using dgenerate as a library, this is a useful option.
:py:attr:`RenderLoop.last_images` and :py:attr:`last_animations` will not be available
if writes to disk are disabled.
"""
[docs]
def __init__(self, config: _renderloopconfig.ImageProcessRenderLoopConfig = None,
image_processor_loader: _imageprocessors.ImageProcessorLoader | None = None):
if config is None:
self.config = _renderloopconfig.ImageProcessRenderLoopConfig()
else:
self.config = config
if image_processor_loader is None:
self.image_processor_loader = _imageprocessors.ImageProcessorLoader()
else:
self.image_processor_loader = image_processor_loader
self._written_images: _files.GCFile | None = None
self._written_animations: _files.GCFile | None = None
self._iterating = False
@property
def written_images(self) -> collections.abc.Iterable[str]:
"""
Iterable over image filenames written by the last run
"""
class Iterable:
def __init__(self, images):
self.images = images
def __iter__(self):
if self.images is None:
return
self.images.seek(0)
for line in self.images:
yield line.rstrip('\n')
return Iterable(self._written_images)
@property
def written_animations(self) -> collections.abc.Iterable[str]:
"""
Iterable over animation filenames written by the last run
"""
class Iterable:
def __init__(self, animations):
self.animations = animations
def __iter__(self):
if self.animations is None:
return
self.animations.seek(0)
for line in self.animations:
yield line.rstrip('\n')
return Iterable(self._written_animations)
def _record_save_image(self, filename):
self._written_images.write(os.path.abspath(filename) + '\n')
def _record_save_animation(self, filename):
self._written_animations.write(os.path.abspath(filename) + '\n')
def _process_reader(self, file, reader: _mediainput.MediaReader, out_filename, generation_step):
out_directory = os.path.dirname(out_filename)
duplicate_output_suffix = '_duplicate_'
if out_directory and not self.disable_writes:
pathlib.Path(out_directory).mkdir(
parents=True, exist_ok=True)
_messages.log(fr'{self.message_header}: Processing "{file}"',
underline=True)
if reader.total_frames == 1:
if not self.config.output_overwrite and not self.disable_writes:
out_filename = _filelock.touch_avoid_duplicate(
out_directory if out_directory else '.',
path_maker=_filelock.suffix_path_maker(out_filename, duplicate_output_suffix))
# Processing happens here, when the frame is read
with next(reader) as processed_image:
generated_event = ImageGeneratedEvent(
origin=self,
image=processed_image,
suggested_directory=os.path.dirname(out_filename),
suggested_filename=os.path.basename(out_filename),
generation_step=generation_step)
yield generated_event
if not self.disable_writes:
processed_image.save(out_filename)
self._record_save_image(out_filename)
yield ImageFileSavedEvent(origin=self,
generated_event=generated_event,
path=out_filename)
_messages.log(fr'{self.message_header}: Wrote Image "{out_filename}"',
underline=True)
else:
out_filename_base, ext = os.path.splitext(out_filename)
if not self.config.output_overwrite and not self.disable_writes:
out_anim_name = _filelock.touch_avoid_duplicate(
out_directory if out_directory else '.',
path_maker=_filelock.suffix_path_maker(out_filename, duplicate_output_suffix))
else:
out_anim_name = out_filename
if not self.config.no_animation_file and not self.disable_writes:
anim_writer = _mediaoutput.create_animation_writer(
animation_format=ext.lstrip('.'),
out_filename=out_anim_name,
fps=reader.fps)
else:
# mock
anim_writer = _mediaoutput.AnimationWriter()
starting_animation_event = StartingAnimationEvent(
origin=self,
total_frames=reader.total_frames,
fps=reader.fps,
frame_duration=reader.frame_duration)
yield starting_animation_event
starting_animation_file_event = None
if not self.config.no_animation_file and not self.disable_writes:
starting_animation_file_event = StartingAnimationFileEvent(
origin=self,
path=out_anim_name,
fps=reader.fps,
frame_duration=reader.frame_duration,
total_frames=reader.total_frames
)
yield starting_animation_file_event
with anim_writer as writer:
for frame_idx in range(0, reader.total_frames):
if self._last_frame_time == 0:
eta = None
else:
self._frame_time_sum += time.time() - self._last_frame_time
eta_seconds = (self._frame_time_sum / frame_idx) * (
reader.total_frames - frame_idx)
eta = datetime.timedelta(seconds=eta_seconds)
self._last_frame_time = time.time()
eta_str = str(eta) if eta is not None else 'tbd...'
_messages.log(
fr'{self.message_header}: Processing Frame {frame_idx + 1}/{reader.total_frames}, Completion ETA: {eta_str}')
if eta is not None:
yield AnimationETAEvent(origin=self,
frame_index=frame_idx,
total_frames=reader.total_frames,
eta=eta)
frame_filename = out_filename_base + f'_frame_{frame_idx + 1}.{self.config.frame_format}'
# Processing happens here, when the frame is read
with next(reader) as frame:
frame_generated_event = ImageGeneratedEvent(
origin=self,
image=frame,
generation_step=generation_step,
suggested_directory=os.path.dirname(out_filename_base),
suggested_filename=os.path.basename(frame_filename),
is_animation_frame=True,
frame_index=frame_idx
)
yield frame_generated_event
if not self.config.no_animation_file:
writer.write(frame)
if not self.config.no_frames and not self.disable_writes:
# frames do not get the _processed_ suffix in any case
if not self.config.output_overwrite:
frame_filename = _filelock.touch_avoid_duplicate(
out_directory if out_directory else '.',
path_maker=_filelock.suffix_path_maker(frame_filename,
duplicate_output_suffix))
frame.save(frame_filename)
self._record_save_image(frame_filename)
yield ImageFileSavedEvent(
origin=self,
path=frame_filename,
generated_event=frame_generated_event)
_messages.log(fr'{self.message_header}: Wrote Frame "{frame_filename}"')
frame_idx += 1
yield AnimationFinishedEvent(
origin=self,
starting_event=starting_animation_event)
if not self.config.no_animation_file and not self.disable_writes:
self._record_save_animation(out_filename)
yield AnimationFileFinishedEvent(
origin=self,
path=out_filename,
starting_event=starting_animation_file_event)
_messages.log(fr'{self.message_header}: Wrote File "{out_anim_name}"',
underline=True)
def _process_file(self, file, out_filename, generation_step, total_generation_steps):
if self.config.processors:
processor = self.image_processor_loader.load(self.config.processors, device=self.config.device)
else:
processor = None
with _mediainput.MediaReader(
path=file,
image_processor=processor,
resize_resolution=self.config.resize,
aspect_correct=not self.config.no_aspect,
align=self.config.align,
frame_start=self.config.frame_start,
frame_end=self.config.frame_end) as reader:
self._last_frame_time = 0
self._frame_time_sum = 0
yield StartingGenerationStepEvent(origin=self,
generation_step=generation_step,
total_steps=total_generation_steps)
yield from self._process_reader(file, reader, out_filename, generation_step)
def _run(self) -> RenderLoopEventStream:
self.config.check()
self._written_images = _files.GCFile(
tempfile.TemporaryFile('w+t'))
self._written_animations = _files.GCFile(
tempfile.TemporaryFile('w+t'))
total_generation_steps = len(self.config.input)
def _is_dir_spec(path):
return os.path.isdir(path) or path[-1] in '/\\'
if self.config.output and len(self.config.output) == 1 and _is_dir_spec(self.config.output[0]):
for idx, file in enumerate(self.config.input):
file = _mediainput.url_aware_normpath(file)
base, ext = os.path.splitext(_mediainput.url_aware_basename(file))
output_file = os.path.normpath(
os.path.join(self.config.output[0], base + f'_processed_{idx + 1}{ext}'))
yield from self._process_file(file, output_file, idx, total_generation_steps)
else:
for idx, file in enumerate(self.config.input):
file = _mediainput.url_aware_normpath(file)
output_file = _mediainput.url_aware_normpath(
self.config.output[idx] if self.config.output else file)
if file == output_file and not self.config.output_overwrite:
if not _mediainput.is_downloadable_url(file):
base, ext = os.path.splitext(output_file)
else:
base, ext = os.path.splitext(_mediainput.url_aware_basename(output_file))
output_file = base + f'_processed_{idx + 1}{ext}'
elif _is_dir_spec(output_file):
base, ext = os.path.splitext(_mediainput.url_aware_basename(file))
output_file = os.path.join(output_file, base + f'_processed_{idx + 1}{ext}')
yield from self._process_file(file, output_file, idx, total_generation_steps)
[docs]
def run(self):
"""
Run the render loop, this calls :py:meth:`ImageProcessRenderLoopConfig.check` prior to running.
:raises dgenerate.OutOfMemoryError: if the execution device runs out of memory
:raises ImageProcessRenderLoopConfigError: on config errors
"""
for _ in self._run():
continue
[docs]
def events(self) -> RenderLoopEventStream:
"""
Run the render loop, and iterate over a stream of event objects produced by the render loop.
Event objects are of the union type :py:class:`.RenderLoopEvent`
The exceptions mentioned here are those you may encounter upon iterating,
they will not occur upon simple acquisition of the event stream iterator.
:raises dgenerate.OutOfMemoryError: if the execution device runs out of memory
:raises ImageProcessRenderLoopConfigError: on config errors
:return: :py:class:`.RenderLoopEventStream`
"""
try:
self._iterating = True
yield from self._run()
finally:
self._iterating = False
__all__ = _types.module_all()