Source code for dgenerate.invoker

# Copyright (c) 2023, Teriks
#
# dgenerate is distributed under the following BSD 3-Clause License
#
# Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in
#    the documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived
#    from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import collections.abc
import os.path
import traceback
import typing

import dgenerate.exceptions as _d_exceptions
import dgenerate.arguments as _arguments
import dgenerate.events as _event
import dgenerate.imageprocessors as _imageprocessors
import dgenerate.latentsprocessors as _latentsprocessors
import dgenerate.mediainput as _mediainput
import dgenerate.messages as _messages
import dgenerate.pipelinewrapper as _pipelinewrapper
import dgenerate.promptweighters as _promptweighters
import dgenerate.promptupscalers as _promptupscalers
import dgenerate.plugin as _plugin
import dgenerate.renderloop as _renderloop
import dgenerate.subcommands as _subcommands
import dgenerate.prompt as _prompt
import dgenerate.globalconfig as _globalconfig
import dgenerate.textprocessing as _textprocessing
import dgenerate.hfhub as _hfhub
import dgenerate.webcache as _webcache
import dgenerate.spacycache as _spacycache

__doc__ = """
Functions to invoke dgenerate inside the current process using its command line arguments.
"""


class DgenerateExitEvent(_event.Event):
    """
    Exit event generated in the event stream created by :py:func:`.invoke_dgenerate_events`.

    The event carries the return code of the invocation. Within this module every
    code path of :py:func:`.invoke_dgenerate_events` that yields this event returns
    immediately afterwards, so it is the last event a consumer receives.
    """

    # Return code of the invocation; :py:func:`.invoke_dgenerate` hands this value
    # back to its caller, where anything other than 0 means failure.
    return_code: int

    def __init__(self, origin, return_code: int) -> None:
        """
        :param origin: forwarded unchanged to the base event constructor, this module
            always passes the :py:func:`.invoke_dgenerate_events` function object
        :param return_code: return code to report to the consumer of the event stream
        """
        super().__init__(origin)
        self.return_code = return_code
# Union of the event types a consumer can receive: either an event produced by
# the render loop, or the DgenerateExitEvent carrying the return code.
InvokeDgenerateEvents = _renderloop.RenderLoopEvent | DgenerateExitEvent
""" Events yield-able by :py:func:`.invoke_dgenerate_events` """

# Generator type of the event stream: yields InvokeDgenerateEvents, accepts no
# sent values (None) and has no generator return value (None).
InvokeDgenerateEventStream = typing.Generator[InvokeDgenerateEvents, None, None]
""" Event stream produced by :py:func:`.invoke_dgenerate_events` """
def invoke_dgenerate(args: collections.abc.Sequence[str],
                     render_loop: _renderloop.RenderLoop | None = None,
                     config_overrides: dict[str, typing.Any] | None = None,
                     throw: bool = False,
                     log_error: bool = True,
                     help_raises: bool = False) -> int:
    """
    Invoke dgenerate using its command line arguments and return a return code.

    dgenerate is invoked in the current process, this method does not spawn a subprocess.

    Meta arguments such as ``--file``, ``--shell``, ``--no-stdin``, and ``--console`` are not supported

    This is a thin driver around :py:func:`.invoke_dgenerate_events`: it iterates the
    event stream until the first :py:class:`.DgenerateExitEvent` and returns that
    event's return code, all other events are discarded.

    :param args: dgenerate command line arguments in the form of a list, see: shlex module, or sys.argv

    :param render_loop: :py:class:`dgenerate.renderloop.RenderLoop` instance,
        if ``None`` is provided one will be created. Note that the config object
        generated by argument parsing will completely overwrite the render loop config.

    :param config_overrides: Optional dictionary of configuration overrides to apply
        to the render loop config object after argument parsing, this should consist of
        attribute names with values, the config object generated by argument parsing is
        of type :py:class:`dgenerate.arguments.DgenerateArguments`.

    :param throw: Whether to throw known exceptions or handle them.

    :param log_error: Write ERROR diagnostics with :py:mod:`dgenerate.messages`?

    :param help_raises: ``--help`` raises :py:exc:`dgenerate.arguments.DgenerateHelpException` ?
        When ``True``, this will occur even if ``throw=False``

    :raises dgenerate.DgenerateUsageError:
    :raises dgenerate.DgenerateHelpException:
    :raises dgenerate.ImageSeedError:
    :raises dgenerate.UnknownMimetypeError:
    :raises dgenerate.MediaIdentificationError:
    :raises dgenerate.FrameStartOutOfBounds:
    :raises dgenerate.InvalidModelFileError:
    :raises dgenerate.InvalidModelUriError:
    :raises dgenerate.ModelUriLoadError:
    :raises dgenerate.InvalidSchedulerNameError:
    :raises dgenerate.OutOfMemoryError:
    :raises dgenerate.ModelNotFoundError:
    :raises dgenerate.NonHFModelDownloadError:
    :raises dgenerate.UnsupportedPipelineConfigError:
    :raises dgenerate.PromptWeightingUnsupported:
    :raises dgenerate.PromptEmbeddedArgumentError:
    :raises dgenerate.PluginNotFoundError:
    :raises dgenerate.PluginArgumentError:
    :raises dgenerate.ModuleFileNotFoundError:
    :raises dgenerate.SpacyModelNotFoundException:
    :raises dgenerate.WebFileCacheOfflineModeException:
    :raises OSError:

    :return: integer return-code, anything other than 0 is failure
    """

    # Forward every argument by name. Spreading ``locals()`` would silently
    # start passing unexpected keywords as soon as a local variable is
    # introduced above the call.
    events = invoke_dgenerate_events(
        args=args,
        render_loop=render_loop,
        config_overrides=config_overrides,
        throw=throw,
        log_error=log_error,
        help_raises=help_raises)

    try:
        for event in events:
            if isinstance(event, DgenerateExitEvent):
                return event.return_code

        # The stream ended without an exit event, treat that as success.
        return 0
    finally:
        # We usually return while the generator is still suspended at the
        # yield of its exit event. Closing it here runs its pending
        # ``finally`` / ``with`` cleanup deterministically instead of
        # whenever the generator object happens to be garbage collected.
        # Closing an already finished generator is a no-op.
        events.close()
def invoke_dgenerate_events(
        args: collections.abc.Sequence[str],
        render_loop: _renderloop.RenderLoop | None = None,
        config_overrides: dict[str, typing.Any] | None = None,
        throw: bool = False,
        log_error: bool = True,
        help_raises: bool = False) -> InvokeDgenerateEventStream:
    """
    Invoke dgenerate using its command line arguments and return a stream of events,
    you must iterate over this stream of events to progress through the execution of
    dgenerate.

    dgenerate is invoked in the current process, this method does not spawn a subprocess.

    Meta arguments such as ``--file``, ``--shell``, ``--no-stdin``, and ``--console`` are not supported

    The exceptions mentioned here are those you may encounter upon iterating,
    they will not occur upon simple acquisition of the event stream iterator.

    Processing order: plugin modules named in ``args`` are imported, then the
    ``--*-help`` style options for quantizers, prompt weighters, prompt upscalers,
    image processors, latents processors and sub-commands are handled, then a
    sub-command invocation, then templates / directives / functions help, and
    finally normal argument parsing followed by running the render loop. Every
    path ends by yielding a :py:class:`.DgenerateExitEvent` unless an exception
    is raised instead.

    :param args: dgenerate command line arguments in the form of a list, see: shlex module, or sys.argv

    :param render_loop: :py:class:`dgenerate.renderloop.RenderLoop` instance,
        if ``None`` is provided one will be created (lazily, just before the render
        loop is needed). Note that the config object generated by argument parsing
        will completely overwrite the render loop config.

    :param config_overrides: Optional dictionary of configuration overrides to apply
        to the render loop config object after argument parsing, this should consist of
        attribute names with values, the config object generated by argument parsing is
        of type :py:class:`dgenerate.arguments.DgenerateArguments`.

    :param throw: Whether to throw known exceptions or handle them.

    :param log_error: Write ERROR diagnostics with :py:mod:`dgenerate.messages`?

    :param help_raises: ``--help`` raises :py:exc:`dgenerate.arguments.DgenerateHelpException` ?
        When ``True``, this will occur even if ``throw=False``

    :raises dgenerate.DgenerateUsageError:
    :raises dgenerate.DgenerateHelpException:
    :raises dgenerate.ImageSeedError:
    :raises dgenerate.UnknownMimetypeError:
    :raises dgenerate.MediaIdentificationError:
    :raises dgenerate.FrameStartOutOfBounds:
    :raises dgenerate.InvalidModelFileError:
    :raises dgenerate.InvalidModelUriError:
    :raises dgenerate.ModelUriLoadError:
    :raises dgenerate.InvalidSchedulerNameError:
    :raises dgenerate.OutOfMemoryError:
    :raises dgenerate.ModelNotFoundError:
    :raises dgenerate.NonHFModelDownloadError:
    :raises dgenerate.UnsupportedPipelineConfigError:
    :raises dgenerate.PromptWeightingUnsupported:
    :raises dgenerate.PromptEmbeddedArgumentError:
    :raises dgenerate.PluginNotFoundError:
    :raises dgenerate.PluginArgumentError:
    :raises dgenerate.ModuleFileNotFoundError:
    :raises dgenerate.SpacyModelNotFoundException:
    :raises dgenerate.WebFileCacheOfflineModeException:
    :raises OSError:

    :return: :py:data:`.InvokeDgenerateEventStream`
    """

    def debug_exception(error: Exception):
        # Trace for expected errors, only emitted through the debug log.
        # The lambda defers building the (potentially large) trace text.
        _messages.debug_log(
            lambda:
            _textprocessing.underline("invoker.invoke_dgenerate_events() error trace:") + "\n" +
            _textprocessing.underline('\n'.join(traceback.format_exception(error)))
        )

    def unexpected_exception_trace(error: Exception):
        # Trace for unexpected errors, always written as an error message.
        _messages.error(
            _textprocessing.underline("invoker.invoke_dgenerate_events() unexpected error trace:") + "\n" +
            _textprocessing.underline('\n'.join(traceback.format_exception(error)))
        )

    def rethrow_with_message(error: Exception, usage_error=False, expected=True):
        # Log ``error`` and then either raise it (``throw=True``) or convert
        # it into an exit event with return code 1 for the caller to yield.
        if expected:
            debug_exception(error)
        else:
            unexpected_exception_trace(error)

        # Unexpected errors already had their full trace written above.
        if log_error and expected:
            _messages.error(f'dgenerate: error: {str(error).strip()}')

        if throw:
            if usage_error:
                raise _arguments.DgenerateUsageError(error) from error
            else:
                raise error

        return DgenerateExitEvent(invoke_dgenerate_events, 1)

    try:
        plugin_module_paths, plugin_module_paths_rest = _arguments.parse_plugin_modules(args)
    except _arguments.DgenerateUsageError as e:
        yield rethrow_with_message(e)
        return

    if plugin_module_paths:
        # NOTE(review): anything raised by import_plugins is not handled here
        # and propagates regardless of ``throw`` -- confirm that is intended.
        _plugin.import_plugins(plugin_module_paths)

    # Each entry: (argument parser, help generator, known errors of the help
    # generator). Entries are processed in order and the first help option
    # found in ``args`` ends the invocation.
    help_handlers = (
        (_arguments.parse_quantizer_help,
         _pipelinewrapper.quantizer_help,
         (_pipelinewrapper.UnknownQuantizerName,)),
        (_arguments.parse_prompt_weighter_help,
         _promptweighters.prompt_weighter_help,
         (_promptweighters.PromptWeighterNotFoundError,
          _plugin.ModuleFileNotFoundError)),
        (_arguments.parse_prompt_upscaler_help,
         _promptupscalers.prompt_upscaler_help,
         (_promptupscalers.PromptUpscalerNotFoundError,
          _plugin.ModuleFileNotFoundError)),
        (_arguments.parse_image_processor_help,
         _imageprocessors.image_processor_help,
         (_imageprocessors.ImageProcessorNotFoundError,
          _plugin.ModuleFileNotFoundError)),
        (_arguments.parse_latents_processor_help,
         _latentsprocessors.latents_processor_help,
         (_latentsprocessors.LatentsProcessorNotFoundError,
          _plugin.ModuleFileNotFoundError)),
        (_arguments.parse_sub_command_help,
         _subcommands.sub_command_help,
         (_subcommands.SubCommandNotFoundError,
          _plugin.ModuleFileNotFoundError)),
    )

    for parse_help, generate_help, known_help_errors in help_handlers:
        try:
            help_names, _ = parse_help(
                args, throw_unknown=True, log_error=log_error)
        except _arguments.DgenerateUsageError as e:
            # The parser was handed ``log_error`` itself, so only the debug
            # trace is written here.
            debug_exception(e)
            if throw:
                raise
            yield DgenerateExitEvent(invoke_dgenerate_events, 1)
            return

        if help_names is None:
            # This help option was not requested, try the next one.
            continue

        try:
            yield DgenerateExitEvent(
                origin=invoke_dgenerate_events,
                return_code=generate_help(
                    names=help_names,
                    log_error=False,
                    throw=True))
        except known_help_errors as e:
            yield rethrow_with_message(e)
        return

    try:
        sub_command_name, sub_command_name_rest = _arguments.parse_sub_command(plugin_module_paths_rest)
    except _arguments.DgenerateUsageError as e:
        yield rethrow_with_message(e)
        return

    if sub_command_name is not None:
        verbose, verbose_rest = _arguments.parse_verbose(sub_command_name_rest)
        offline_mode, offline_mode_rest = _arguments.parse_offline_mode(verbose_rest)

        _messages.push_level(_messages.DEBUG if verbose else _messages.INFO)
        try:
            # The loaded sub-command is called immediately, its result
            # becomes the return code of the exit event.
            yield DgenerateExitEvent(
                origin=invoke_dgenerate_events,
                return_code=_subcommands.SubCommandLoader().load(
                    plugin_module_paths=plugin_module_paths,
                    uri=sub_command_name,
                    local_files_only=offline_mode,
                    args=offline_mode_rest)())
        except (_plugin.PluginNotFoundError,
                _plugin.PluginArgumentError,
                _hfhub.NonHFDownloadError,
                _d_exceptions.ModelNotFoundError,
                _d_exceptions.ConfigNotFoundError,
                _webcache.WebFileCacheOfflineModeException) as e:
            yield rethrow_with_message(e)
        except Exception as e:
            yield rethrow_with_message(e, expected=False)
        finally:
            # Undo the push_level above on every exit path.
            _messages.pop_level()
        return

    try:
        template_help_variable_names, _ = _arguments.parse_templates_help(
            args, throw_unknown=True, log_error=log_error)
        directives_help_variable_names, _ = _arguments.parse_directives_help(
            args, throw_unknown=True, log_error=log_error)
        functions_help_variable_names, _ = _arguments.parse_functions_help(
            args, throw_unknown=True, log_error=log_error)
    except _arguments.DgenerateUsageError as e:
        debug_exception(e)
        if throw:
            raise
        yield DgenerateExitEvent(invoke_dgenerate_events, 1)
        return

    if template_help_variable_names is not None \
            or directives_help_variable_names is not None \
            or functions_help_variable_names is not None:

        # Imported only on this path, when config runner help is requested.
        import dgenerate.batchprocess

        config_runner_args = dict()

        if plugin_module_paths:
            config_runner_args['injected_args'] = ['--plugin-modules'] + plugin_module_paths

        try:
            config_runner = dgenerate.batchprocess.ConfigRunner(**config_runner_args)
        except _plugin.ModuleFileNotFoundError as e:
            yield rethrow_with_message(e)
            return

        # More than one of these help options may be given at once, each
        # requested help text is logged in turn.
        if template_help_variable_names is not None:
            try:
                _messages.log(
                    config_runner.generate_template_variables_help(
                        template_help_variable_names, show_values=False))
            except ValueError as e:
                yield rethrow_with_message(e, usage_error=True)
                return

        if directives_help_variable_names is not None:
            try:
                _messages.log(
                    config_runner.generate_directives_help(
                        directives_help_variable_names))
            except ValueError as e:
                yield rethrow_with_message(e, usage_error=True)
                return

        if functions_help_variable_names is not None:
            try:
                _messages.log(
                    config_runner.generate_functions_help(
                        functions_help_variable_names))
            except ValueError as e:
                yield rethrow_with_message(e, usage_error=True)
                return

        yield DgenerateExitEvent(invoke_dgenerate_events, 0)
        return

    try:
        # help_raises=True is always passed to the parser so that --help can
        # be intercepted here, the caller's ``help_raises`` decides below
        # whether the exception is propagated.
        arguments = _arguments.parse_args(
            args,
            log_error=log_error,
            help_raises=True,
            overrides=config_overrides
        )
    except _arguments.DgenerateHelpException:
        if help_raises:
            raise
        yield DgenerateExitEvent(invoke_dgenerate_events, 0)
        return
    except _arguments.DgenerateUsageError as e:
        debug_exception(e)
        if throw:
            raise
        yield DgenerateExitEvent(invoke_dgenerate_events, 1)
        return

    message_level = _messages.DEBUG if arguments.verbose else _messages.INFO

    with _messages.with_level(message_level), \
            _globalconfig.restore_config_context():

        if arguments.global_config is not None:
            # The file extension (as returned by splitext) selects the
            # loading mode of the global config.
            global_config_ext = os.path.splitext(arguments.global_config)[1]
            try:
                with open(arguments.global_config, 'r') as global_config_file:
                    _globalconfig.load_config(global_config_file, mode=global_config_ext)
            except Exception as e:
                # NOTE(review): this path reports through _messages.log and
                # always yields return code 1, it consults neither ``throw``
                # nor ``log_error`` -- confirm that is intended.
                _messages.log(f'Could not load global config file: {e}')
                yield DgenerateExitEvent(invoke_dgenerate_events, 1)
                return

        try:
            if render_loop is None:
                # Honor the documented contract: create a render loop when
                # the caller did not supply one. Done lazily so the help and
                # sub-command paths above never construct one.
                render_loop = _renderloop.RenderLoop()

            render_loop.config = arguments
            render_loop.config.apply_prompt_upscalers()
            yield from render_loop.events()
        except (_mediainput.ImageSeedError,
                _mediainput.UnknownMimetypeError,
                _mediainput.MediaIdentificationError,
                _mediainput.FrameStartOutOfBounds,
                _d_exceptions.ModelNotFoundError,
                _d_exceptions.ConfigNotFoundError,
                _hfhub.NonHFDownloadError,
                _pipelinewrapper.InvalidModelFileError,
                _pipelinewrapper.InvalidModelUriError,
                _pipelinewrapper.ModelUriLoadError,
                _pipelinewrapper.SchedulerLoadError,
                _pipelinewrapper.UnsupportedPipelineConfigError,
                _promptweighters.PromptWeightingUnsupported,
                _promptupscalers.PromptUpscalerProcessingError,
                _prompt.PromptEmbeddedArgumentError,
                _plugin.ModuleFileNotFoundError,
                _plugin.PluginNotFoundError,
                _plugin.PluginArgumentError,
                _d_exceptions.OutOfMemoryError,
                _webcache.WebFileCacheOfflineModeException,
                OSError) as e:
            yield rethrow_with_message(e)
            return
        except Exception as e:
            yield rethrow_with_message(e, expected=False)
            return

    # The render loop ran to completion without raising, report success.
    yield DgenerateExitEvent(invoke_dgenerate_events, 0)
    return