# Copyright (c) 2023, Teriks
#
# dgenerate is distributed under the following BSD 3-Clause License
#
# Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in
# the documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import collections.abc
import contextlib
import decimal
import functools
import importlib.util
import inspect
import math
import typing
import DeepCache as _deepcache
import PIL.Image
import diffusers
import numpy
import torch
import dgenerate.eval as _eval
import dgenerate.extras.asdff.base as _asdff_base
import dgenerate.extras.hidiffusion as _hidiffusion
import dgenerate.extras.sada.patch as _sada
import dgenerate.extras.teacache.teacache_flux as _teacache_flux
import dgenerate.hfhub as _hfhub
import dgenerate.image as _image
import dgenerate.imageprocessors as _imageprocessors
import dgenerate.latentsprocessors as _latentsprocessors
import dgenerate.mediainput as _mediainput
import dgenerate.messages as _messages
import dgenerate.pipelinewrapper.constants as _constants
import dgenerate.pipelinewrapper.enums as _enums
import dgenerate.pipelinewrapper.help as _help
import dgenerate.pipelinewrapper.pipelines as _pipelines
import dgenerate.pipelinewrapper.schedulers as _schedulers
import dgenerate.pipelinewrapper.uris as _uris
import dgenerate.pipelinewrapper.util as _util
import dgenerate.prompt as _prompt
import dgenerate.promptweighters as _promptweighters
import dgenerate.textprocessing as _textprocessing
import dgenerate.torchutil as _torchutil
import dgenerate.types as _types
from dgenerate.extras.ras import RASArgs as _RASArgs
from dgenerate.extras.ras import sd3_ras_context as _sd3_ras_context
from dgenerate.pipelinewrapper.arguments import DiffusionArguments
from dgenerate.pipelinewrapper.denoise_range import DenoiseRangeError as _DenoiseRangeError
from dgenerate.pipelinewrapper.denoise_range import denoise_range as _denoise_range
from dgenerate.pipelinewrapper.denoise_range import supports_native_denoising_start as _supports_native_denoising_start
class _InpaintCropInfo:
    """
    State carrier for inpaint crop processing.

    Keeps everything that is required to paste a cropped inpaint result
    back onto the original image(s) once the diffusion process has completed.
    """

    def __init__(self,
                 original_images: list[PIL.Image.Image],
                 original_masks: list[PIL.Image.Image] | None,
                 crop_bounds: tuple[int, int, int, int],
                 use_masked: bool = False,
                 feather: int | None = None):
        """
        Store the inpaint crop state.

        :param original_images: original, uncropped images that will be pasted onto
        :param original_masks: original, uncropped masks (used for masked pasting), or ``None``
        :param crop_bounds: crop rectangle given as ``(left, top, right, bottom)``
        :param use_masked: use masked pasting?
        :param feather: optional feather value for feathered pasting
        """
        self.original_images, self.original_masks = original_images, original_masks
        self.crop_bounds = crop_bounds
        self.use_masked, self.feather = use_masked, feather

    def __repr__(self) -> str:
        # Only the number of original images is reported, the masks are not included.
        fields = ', '.join((
            f'original_images={len(self.original_images)}',
            f'crop_bounds={self.crop_bounds}',
            f'use_masked={self.use_masked}',
            f'feather={self.feather}',
        ))
        return f'_InpaintCropInfo({fields})'
class DiffusionArgumentsHelpException(Exception):
    """
    Raised when a help request value is passed to a :py:class:`DiffusionArguments`
    attribute that supports one (for example :py:attr:`DiffusionArguments.scheduler_uri`).

    The help string is handed back to the caller through this exception.
    """
class PipelineWrapperResult:
    """
    Output of a call to :py:class:`.DiffusionPipelineWrapper`.

    A result carries either decoded images or raw latents, never both.
    It can be used as a context manager, leaving the ``with`` block closes
    every contained image.
    """

    images: _types.MutableImages | None
    latents: _types.MutableTensors | None

    def __init__(self, images: _types.Images | None = None, latents: _types.MutableTensors | None = None):
        """
        :param images: decoded output images, or ``None`` when ``latents`` is given
        :param latents: raw output latents, or ``None`` when ``images`` is given

        :raise ValueError: if both arguments are ``None``, or if both are provided
        """
        got_images = images is not None
        got_latents = latents is not None

        if not (got_images or got_latents):
            raise ValueError("PipelineWrapperResult must have either images or latents, both cannot be None")

        if got_images and got_latents:
            raise ValueError("PipelineWrapperResult cannot have both images and latents, only one is allowed")

        self.images = images
        self.latents = latents
        self.dgenerate_opts = []

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        # Only PIL images are closed explicitly, latents are left untouched.
        if self.images is None:
            return

        for image in self.images:
            if image is not None:
                image.close()

        self.images = None

    @property
    def image_count(self) -> int:
        """
        Number of images held by this result, ``0`` when there are none.

        :return: int
        """
        return 0 if self.images is None else len(self.images)

    @property
    def latents_count(self) -> int:
        """
        Number of latents held by this result, ``0`` when there are none.

        :return: int
        """
        return 0 if self.latents is None else len(self.latents)

    @property
    def output_count(self) -> int:
        """
        Number of outputs held by this result (images or latents).

        :return: int
        """
        return max(self.image_count, self.latents_count)

    @property
    def image(self) -> PIL.Image.Image | None:
        """
        First image of the batch, or ``None`` when no images are present.

        :return: :py:class:`PIL.Image.Image`
        """
        if not self.images:
            return None
        return self.images[0]

    @property
    def latent(self) -> torch.Tensor | None:
        """
        First latent of the batch, or ``None`` when no latents are present.

        :return: :py:class:`torch.Tensor`
        """
        if not self.latents:
            return None
        return self.latents[0]

    @property
    def has_images(self) -> bool:
        """
        Does this result hold at least one image?

        :return: bool
        """
        return not (self.images is None or len(self.images) == 0)

    @property
    def has_latents(self) -> bool:
        """
        Does this result hold at least one latent?

        :return: bool
        """
        return not (self.latents is None or len(self.latents) == 0)

    def image_grid(self, cols_rows: _types.Size):
        """
        Compose the images of this result into a single grid image.

        When the result holds exactly one image, that image is returned
        as is and no grid is created.

        :raise ValueError: if no images are present on this object.
            This is impossible if this object was produced by :py:class:`.DiffusionPipelineWrapper`.
        :raise ValueError: if this result contains latents instead of images.
            Image grids can only be created from decoded images, not raw latent tensors.

        :param cols_rows: columns and rows (WxH) desired as a tuple
        :return: :py:class:`PIL.Image.Image`
        """
        if not self.images:
            if self.has_latents:
                raise ValueError(
                    'Cannot create image grid from latent tensors. '
                    'Image grids can only be created from decoded images, not raw latent tensors. '
                    'Use output_latents=False to get decoded images instead.'
                )
            raise ValueError('No images present.')

        first = self.images[0]
        if len(self.images) == 1:
            return first

        cols, rows = cols_rows
        # every cell uses the dimensions of the first image
        w, h = first.size

        grid = PIL.Image.new('RGB', size=(cols * w, rows * h))
        for idx, img in enumerate(self.images):
            # images are laid out left to right, top to bottom
            row, col = divmod(idx, cols)
            grid.paste(img, box=(col * w, row * h))
        return grid
@contextlib.contextmanager
def _deep_cache_context(pipeline,
                        cache_interval: int = 5,
                        cache_branch_id: int = 1,
                        skip_mode: str = 'uniform',
                        enabled: bool = False):
    """
    Run the with-body with DeepCache switched on for ``pipeline``.

    When ``enabled`` is false this is a plain pass-through and the
    pipeline is never touched.

    :param pipeline: pipeline handed to ``DeepCacheSDHelper``
    :param cache_interval: forwarded to ``DeepCacheSDHelper.set_params``
    :param cache_branch_id: forwarded to ``DeepCacheSDHelper.set_params``
    :param skip_mode: forwarded to ``DeepCacheSDHelper.set_params``
    :param enabled: enable DeepCache for the duration of the with-body?
    """
    if not enabled:
        yield
        return

    _messages.debug_log(
        f'Enabling DeepCache on pipeline: {pipeline.__class__.__name__}')

    deep_cache = _deepcache.DeepCacheSDHelper(pipe=pipeline)
    deep_cache.set_params(
        cache_interval=cache_interval,
        cache_branch_id=cache_branch_id,
        skip_mode=skip_mode
    )
    deep_cache.enable()

    try:
        yield
    finally:
        # always switch the helper off again, even if the with-body raised
        _messages.debug_log(
            f'Disabling DeepCache on pipeline: {pipeline.__class__.__name__}')
        deep_cache.disable()
@contextlib.contextmanager
def _hi_diffusion(
pipeline,
generator,
enabled: bool,
no_raunet: bool | None = None,
no_window_attn: bool | None = None
):
if enabled:
sd15cn = pipeline.__class__.__name__.startswith('StableDiffusionControlNet')
if no_raunet is None:
no_raunet = sd15cn
if no_window_attn is None:
no_window_attn = sd15cn
_messages.debug_log(
f'Enabling HiDiffusion on pipeline: {pipeline.__class__.__name__} '
f'(no_raunet={no_raunet}, no_window_attn={no_window_attn})')
_hidiffusion.apply_hidiffusion(
pipeline,
generator=generator,
apply_raunet=not no_raunet,
apply_window_attn=not no_window_attn
)
try:
yield
finally:
if enabled:
_messages.debug_log(
f'Disabling HiDiffusion on pipeline: {pipeline.__class__.__name__}')
_hidiffusion.remove_hidiffusion(pipeline)
@contextlib.contextmanager
def _sada_context(
        pipeline,
        width: int,
        height: int,
        enabled: bool,
        max_downsample: int = 1,
        sx: int = 2,
        sy: int = 2,
        acc_range: tuple = (10, 47),
        lagrange_term: int = 0,
        lagrange_int: int | None = None,
        lagrange_step: int | None = None,
        max_fix: int = 5 * 1024,
        max_interval: int = 4,
):
    """
    Context manager for SADA (Stability-guided Adaptive Diffusion Acceleration).

    When ``enabled`` is true the SADA patch is applied to ``pipeline`` before the
    with-body runs, and removed again when the with-body exits. When ``enabled``
    is false the pipeline is not patched.

    :param pipeline: pipeline to patch
    :param width: inference width, only used to derive ``latent_size`` for
        pipelines that have a ``transformer`` attribute
    :param height: inference height, used the same way as ``width``
    :param enabled: apply SADA for the duration of the with-body?
    :param max_downsample: forwarded unchanged to ``_sada.apply_patch``
    :param sx: forwarded unchanged to ``_sada.apply_patch``
    :param sy: forwarded unchanged to ``_sada.apply_patch``
    :param acc_range: forwarded unchanged to ``_sada.apply_patch``
    :param lagrange_term: forwarded unchanged to ``_sada.apply_patch``
    :param lagrange_int: forwarded unchanged to ``_sada.apply_patch``
    :param lagrange_step: forwarded unchanged to ``_sada.apply_patch``
    :param max_fix: forwarded unchanged to ``_sada.apply_patch``
    :param max_interval: forwarded unchanged to ``_sada.apply_patch``

    :raises UnsupportedPipelineConfigError: when ``SADAUnsupportedError`` is raised,
        either while applying the patch or from inside the with-body
    """
    # The try block deliberately spans the yield, so a SADAUnsupportedError
    # raised while the with-body is running is translated as well.
    try:
        if enabled:
            # Calculate latent size for transformer models (SD3, Flux)
            latent_size = None
            if hasattr(pipeline, 'transformer'):
                # For Flux and other transformer models, calculate latent size based on width/height
                # Based on sada-icml examples: latent_size = (height // 16, width // 16)
                latent_size = (height // 16, width // 16)

            # The default argument captures the locals that exist at this exact point
            # (the function parameters plus latent_size), adding or moving locals above
            # this definition changes the logged arguments. The callable is handed to
            # debug_log instead of a finished string.
            # NOTE(review): the pop() mutates the shared default dict, so calling this
            # a second time would raise KeyError -- presumably debug_log invokes
            # it at most once; confirm.
            def debug_message(args = locals()):
                args.pop('pipeline')
                return f'Enabling SADA on pipeline: {pipeline.__class__.__name__}, Args: {args}'

            _messages.debug_log(debug_message)

            _sada.apply_patch(
                pipeline,
                max_downsample=max_downsample,
                sx=sx,
                sy=sy,
                latent_size=latent_size,
                acc_range=acc_range,
                lagrange_term=lagrange_term,
                lagrange_int=lagrange_int,
                lagrange_step=lagrange_step,
                max_fix=max_fix,
                max_interval=max_interval
            )
        yield
    except _sada.exceptions.SADAUnsupportedError as e:
        raise _pipelines.UnsupportedPipelineConfigError(str(e)) from e
    finally:
        # NOTE(review): this also runs when apply_patch itself raised -- presumably
        # remove_patch tolerates a pipeline that was never patched; confirm.
        if enabled:
            _messages.debug_log(
                f'Disabling SADA on pipeline: {pipeline.__class__.__name__}')
            _sada.remove_patch(pipeline)
@contextlib.contextmanager
def _freeu(pipeline, params: tuple[float, float, float, float] | None):
if params is not None:
_messages.debug_log(
f'Enabling FreeU on pipeline: {pipeline.__class__.__name__}')
pipeline.enable_freeu(*params)
try:
yield
finally:
if params is not None:
_messages.debug_log(
f'Disabling FreeU on pipeline: {pipeline.__class__.__name__}')
pipeline.disable_freeu()
class DiffusionPipelineWrapper:
"""
Monolithic diffusion pipelines wrapper.
"""
__LAST_RECALL_PIPELINE: _pipelines.PipelineFactory = None
__LAST_RECALL_SECONDARY_PIPELINE: _pipelines.PipelineFactory = None
@staticmethod
def _normalize_uris(uris: _types.OptionalUris | str | None) -> _types.OptionalUris:
"""
Normalize URI arguments - convert single strings to lists.
:param uris: Single URI string, list of URIs, or None
:return: List of URIs or None
"""
if uris is None:
return None
if isinstance(uris, str):
return [uris]
return uris
def __init__(self,
             model_path: _types.Path,
             model_type: _enums.ModelType | str = _enums.ModelType.SD,
             revision: _types.OptionalName = None,
             variant: _types.OptionalName = None,
             subfolder: _types.OptionalName = None,
             dtype: _enums.DataType | str = _enums.DataType.AUTO,
             unet_uri: _types.OptionalUri = None,
             second_model_unet_uri: _types.OptionalUri = None,
             transformer_uri: _types.OptionalUri = None,
             vae_uri: _types.OptionalUri = None,
             lora_uris: _types.OptionalUris = None,
             lora_fuse_scale: _types.OptionalFloat = None,
             image_encoder_uri: _types.OptionalUri = None,
             ip_adapter_uris: _types.OptionalUris = None,
             textual_inversion_uris: _types.OptionalUris = None,
             text_encoder_uris: _types.OptionalUris = None,
             second_model_text_encoder_uris: _types.OptionalUris = None,
             controlnet_uris: _types.OptionalUris = None,
             t2i_adapter_uris: _types.OptionalUris = None,
             sdxl_refiner_uri: _types.OptionalUri = None,
             s_cascade_decoder_uri: _types.OptionalUri = None,
             quantizer_uri: _types.OptionalUri = None,
             quantizer_map: _types.OptionalStrings = None,
             second_model_quantizer_uri: _types.OptionalUri = None,
             second_model_quantizer_map: _types.OptionalStrings = None,
             device: str = _torchutil.default_device(),
             safety_checker: bool = False,
             original_config: _types.OptionalString = None,
             second_model_original_config: _types.OptionalString = None,
             auth_token: _types.OptionalString = None,
             local_files_only: bool = False,
             model_extra_modules: dict[str, typing.Any] | None = None,
             second_model_extra_modules: dict[str, typing.Any] | None = None,
             model_cpu_offload: bool = False,
             model_sequential_offload: bool = False,
             second_model_cpu_offload: bool = False,
             second_model_sequential_offload: bool = False,
             prompt_weighter_loader: _promptweighters.PromptWeighterLoader | None = None,
             latents_processor_loader: _latentsprocessors.LatentsProcessorLoader | None = None,
             decoded_latents_image_processor_loader: _imageprocessors.ImageProcessorLoader | None = None,
             adetailer_detector_uris: _types.OptionalUris = None,
             adetailer_crop_control_image: bool = False):
    """
    This is a monolithic wrapper around all supported diffusion pipelines which handles
    txt2img, img2img, and inpainting on demand. It spins up the correct pipelines as needed
    in order to handle provided pipeline arguments using lazy initialization.

    Pipelines and user specified sub models are memoized and their lifetimes are managed via
    heuristics based on system memory and available resources.

    All arguments to this constructor should be provided as keyword arguments, using this
    constructor in any other fashion could result in breakage inbetween semver compatible
    versions.

    Arguments that accept multiple URIs may also be given as a single URI string,
    which is treated as a list containing one URI.

    :param model_path: main model path
    :param model_type: main model type, either an enum value or its string name. It is
        resolved to a :py:class:`ModelType` enum value before any validation is performed.
    :param revision: main model revision
    :param variant: main model variant
    :param subfolder: main model subfolder (huggingface or disk)
    :param dtype: main model dtype
    :param unet_uri: main model UNet URI string
    :param second_model_unet_uri: secondary model unet uri (SDXL Refiner, Stable Cascade decoder)
    :param transformer_uri: Optional transformer URI string for specifying a specific Transformer,
        this is only accepted for Stable Diffusion 3 and Flux model types.
    :param vae_uri: main model VAE URI string
    :param lora_uris: One or more LoRA URI strings
    :param lora_fuse_scale: Optional global LoRA fuse scale value. Once all LoRAs are merged with
        their individual scales, the merged weights will be fused into the pipeline at this scale.
        The default value is 1.0.
    :param image_encoder_uri: Image Encoder URI string, Image Encoders are used with
        IP Adapters and Stable Cascade. May only be given together with ``ip_adapter_uris``
        unless ``model_type`` is Stable Cascade.
    :param ip_adapter_uris: One or more IP Adapter URI strings
    :param textual_inversion_uris: One or more Textual Inversion URI strings
    :param text_encoder_uris: One or more Text Encoder URIs
        ("+", or None for default. Or "null" indicating do not load) for the main model
    :param second_model_text_encoder_uris: One or more Text Encoder URIs
        ("+", or None for default. Or "null" indicating do not load) for the secondary
        model (SDXL Refiner or Stable Cascade decoder)
    :param controlnet_uris: One or more ControlNet URI strings,
        cannot be combined with ``t2i_adapter_uris``
    :param t2i_adapter_uris: One or more T2IAdapter URI strings,
        cannot be combined with ``controlnet_uris``
    :param sdxl_refiner_uri: SDXL Refiner model URI string
    :param s_cascade_decoder_uri: Stable Cascade decoder URI string
    :param quantizer_uri: Global --quantizer URI value
    :param quantizer_map: Collection of pipeline submodule names to which quantization should be applied when
        ``quantizer_uri`` is provided. Accepted values are: ``unet``, ``transformer``, ``text_encoder``,
        ``text_encoder_2``, ``text_encoder_3``, and ``controlnet``. If ``None``, all supported modules
        will be quantized.
    :param second_model_quantizer_uri: Global --second-model-quantizer URI value
    :param second_model_quantizer_map: Collection of pipeline submodule names to which quantization should be
        applied when ``second_model_quantizer_uri`` is provided. Accepts the same values as ``quantizer_map``.
        If ``None``, all supported modules will be quantized.
    :param device: Rendering device string, example: ``cuda:0`` or ``cuda``
    :param safety_checker: Use safety checker model if available? (antiquated, for SD 1/2, Deep Floyd etc.)
    :param original_config: Optional original LDM config .yaml file path when loading a single file checkpoint.
    :param second_model_original_config: Optional original LDM config .yaml file path when loading a single file checkpoint
        for the secondary model (SDXL Refiner, Stable Cascade Decoder).
    :param auth_token: huggingface authentication token.
    :param local_files_only: Do not attempt to download files from huggingface?
    :param model_extra_modules: Raw extra diffusers modules for the main pipeline
    :param second_model_extra_modules: Raw extra diffusers modules for the secondary pipeline (SDXL Refiner, Stable Cascade decoder)
    :param model_cpu_offload: Use model CPU offloading for the main pipeline via the accelerate module?
    :param model_sequential_offload: Use sequential CPU offloading for the main pipeline via the accelerate module?
    :param second_model_cpu_offload: Use CPU offloading for the SDXL Refiner or Stable Cascade Decoder via the accelerate module?
    :param second_model_sequential_offload: Use sequential CPU offloading for the SDXL Refiner or Stable Cascade Decoder via the accelerate module?
    :param prompt_weighter_loader: Plugin loader for prompt weighter implementations, if you pass ``None`` a default instance will be created.
    :param latents_processor_loader: Plugin loader for latents processor implementations, if you pass ``None`` a default instance will be created.
    :param decoded_latents_image_processor_loader: Plugin loader for image processor implementations that process images decoded from incoming latents, if you pass ``None`` a default instance will be created.
    :param adetailer_detector_uris: adetailer subject detection model URIs, specifying this argument indicates ``img2img`` mode implicitly,
        the pipeline wrapper will accept a single image and perform the adetailer inpainting algorithm on it using the provided
        detector URIs.
    :param adetailer_crop_control_image: Should adetailer crop any provided ControlNet control image
        in the same way that it crops the generated mask to the detection area? Otherwise,
        use the full control image resized down to the size of the detection area. If you enable
        this and your control image is not the same size as your input image, a warning will be
        issued and resizing will be used instead of cropping.

    :raises ValueError: if ``model_path`` is ``None``
    :raises UnsupportedPipelineConfigError:
    :raises InvalidModelUriError:
    """

    # A single URI string is accepted wherever a list of URIs is expected.
    lora_uris = self._normalize_uris(lora_uris)
    ip_adapter_uris = self._normalize_uris(ip_adapter_uris)
    textual_inversion_uris = self._normalize_uris(textual_inversion_uris)
    text_encoder_uris = self._normalize_uris(text_encoder_uris)
    second_model_text_encoder_uris = self._normalize_uris(second_model_text_encoder_uris)
    controlnet_uris = self._normalize_uris(controlnet_uris)
    t2i_adapter_uris = self._normalize_uris(t2i_adapter_uris)
    adetailer_detector_uris = self._normalize_uris(adetailer_detector_uris)

    if model_path is None:
        raise ValueError('model_path must be specified')

    if not _torchutil.is_valid_device_string(device):
        raise _pipelines.UnsupportedPipelineConfigError(
            f'Invalid device argument, {_torchutil.invalid_device_message(device, cap=False)}')

    # The two offload strategies are mutually exclusive, for either model.
    if model_cpu_offload and model_sequential_offload:
        raise _pipelines.UnsupportedPipelineConfigError(
            '"model_cpu_offload" and "model_sequential_offload" may not be enabled simultaneously.'
        )

    if second_model_cpu_offload and second_model_sequential_offload:
        raise _pipelines.UnsupportedPipelineConfigError(
            '"second_model_cpu_offload" and "second_model_sequential_offload" '
            'may not be enabled simultaneously.'
        )

    # Secondary text encoders require a secondary model.
    if second_model_text_encoder_uris and not sdxl_refiner_uri and not s_cascade_decoder_uri:
        raise _pipelines.UnsupportedPipelineConfigError(
            'Cannot use "second_model_text_encoder_uris" if "sdxl_refiner_uri" '
            'or "s_cascade_decoder_uri" is not specified.'
        )

    if controlnet_uris and t2i_adapter_uris:
        raise _pipelines.UnsupportedPipelineConfigError(
            'Cannot use "controlnet_uris" and "t2i_adapter_uris" together.'
        )

    # Resolve a string model type to its enum value *before* validating against it.
    # The checks below compare directly against ModelType members, a valid model
    # type given as a string would otherwise never compare equal to any of them
    # and be rejected incorrectly.
    model_type = _enums.get_model_type_enum(model_type)

    if image_encoder_uri and not ip_adapter_uris and model_type != _enums.ModelType.S_CASCADE:
        raise _pipelines.UnsupportedPipelineConfigError(
            'Cannot use "image_encoder_uri" without "ip_adapter_uris" '
            'if "model_type" is not S_CASCADE.'
        )

    # An original LDM config only makes sense for single file checkpoints.
    if original_config and not _hfhub.is_single_file_model_load(model_path):
        raise _pipelines.UnsupportedPipelineConfigError(
            'You cannot specify "original_config" when the main '
            'model is not a a single file checkpoint.'
        )

    if second_model_original_config:
        if not sdxl_refiner_uri and not s_cascade_decoder_uri:
            raise _pipelines.UnsupportedPipelineConfigError(
                'You cannot specify "second_model_original_config" '
                'without "sdxl_refiner_uri" or "s_cascade_decoder_uri".'
            )

        if sdxl_refiner_uri and \
                not _hfhub.is_single_file_model_load(
                    _uris.SDXLRefinerUri.parse(sdxl_refiner_uri).model):
            raise _pipelines.UnsupportedPipelineConfigError(
                'You cannot specify "second_model_original_config" '
                'when the "sdxl_refiner_uri" model is not a '
                'single file checkpoint.'
            )

        if s_cascade_decoder_uri and \
                not _hfhub.is_single_file_model_load(
                    _uris.SCascadeDecoderUri.parse(s_cascade_decoder_uri).model):
            raise _pipelines.UnsupportedPipelineConfigError(
                'You cannot specify "second_model_original_config" '
                'when the "s_cascade_decoder_uri" model is not a '
                'single file checkpoint.'
            )

    if sdxl_refiner_uri is not None:
        if not (_enums.model_type_is_sdxl(model_type) or
                _enums.model_type_is_kolors(model_type)):
            raise _pipelines.UnsupportedPipelineConfigError(
                'Only Stable Diffusion XL models support refiners, '
                'please use model_type "sdxl" if you are trying to load an sdxl model.'
            )

    if s_cascade_decoder_uri is not None:
        if not _enums.model_type_is_s_cascade(model_type):
            raise _pipelines.UnsupportedPipelineConfigError(
                'Only Stable Cascade models support decoders, '
                'please use model_type "s-cascade" if you are trying to load an Stable Cascade model.'
            )

    if transformer_uri:
        if not _enums.model_type_is_sd3(model_type) and not _enums.model_type_is_flux(model_type):
            raise _pipelines.UnsupportedPipelineConfigError(
                '--transformer is only supported for --model-type sd3 and flux.')

    if adetailer_detector_uris and model_type not in {
        _enums.ModelType.SD,
        _enums.ModelType.SDXL,
        _enums.ModelType.KOLORS,
        _enums.ModelType.SD3,
        _enums.ModelType.FLUX,
        _enums.ModelType.FLUX_FILL
    }:
        raise _pipelines.UnsupportedPipelineConfigError(
            '--adetailer-detectors is only compatible with '
            '--model-type sd, sdxl, kolors, sd3, and flux')

    # Quantizer URIs are only checked for a resolvable quantizer class here.
    for candidate_quantizer_uri in (quantizer_uri, second_model_quantizer_uri):
        if candidate_quantizer_uri is None:
            continue
        try:
            _uris.get_quantizer_uri_class(candidate_quantizer_uri)
        except ValueError as e:
            raise _pipelines.UnsupportedPipelineConfigError(str(e)) from e

    quantizer_map_vals = [
        'unet',
        'transformer',
        'text_encoder',
        'text_encoder_2',
        'text_encoder_3',
        'controlnet'
    ]

    for map_name, map_values in (('quantizer_map', quantizer_map),
                                 ('second_model_quantizer_map', second_model_quantizer_map)):
        if map_values is None:
            continue
        for map_value in map_values:
            if map_value not in quantizer_map_vals:
                raise _pipelines.UnsupportedPipelineConfigError(
                    f'Unknown {map_name} value: {map_value}, '
                    f'must be one of: {_textprocessing.oxford_comma(quantizer_map_vals, "or")}'
                )

    self._quantizer_uri = quantizer_uri
    self._quantizer_map = quantizer_map
    self._second_model_quantizer_uri = second_model_quantizer_uri
    self._second_model_quantizer_map = second_model_quantizer_map

    self._subfolder = subfolder
    self._device = device
    # already resolved to an enum value above
    self._model_type = model_type
    self._model_path = model_path
    self._pipeline = None
    self._revision = revision
    self._variant = variant
    self._dtype = _enums.get_data_type_enum(dtype)
    self._unet_uri = unet_uri
    self._second_model_unet_uri = second_model_unet_uri
    self._transformer_uri = transformer_uri
    self._image_encoder_uri = image_encoder_uri
    self._vae_uri = vae_uri
    self._safety_checker = safety_checker
    self._original_config = original_config
    self._second_model_original_config = second_model_original_config
    self._second_model_cpu_offload = second_model_cpu_offload
    self._second_model_sequential_offload = second_model_sequential_offload
    self._lora_uris = lora_uris
    self._lora_fuse_scale = lora_fuse_scale
    self._ip_adapter_uris = ip_adapter_uris
    self._textual_inversion_uris = textual_inversion_uris
    self._text_encoder_uris = text_encoder_uris
    self._second_model_text_encoder_uris = second_model_text_encoder_uris
    self._controlnet_uris = controlnet_uris
    self._t2i_adapter_uris = t2i_adapter_uris
    self._parsed_controlnet_uris = []
    self._parsed_t2i_adapter_uris = []
    self._sdxl_refiner_pipeline = None
    self._s_cascade_decoder_pipeline = None
    self._auth_token = auth_token
    self._pipeline_type = None
    self._local_files_only = local_files_only
    self._recall_main_pipeline = None
    self._recall_secondary_pipeline = None
    self._model_extra_modules = model_extra_modules
    self._second_model_extra_modules = second_model_extra_modules
    self._model_cpu_offload = model_cpu_offload
    self._model_sequential_offload = model_sequential_offload

    # URIs are parsed up front so that a malformed URI fails
    # at construction time instead of at first use.
    self._sdxl_refiner_uri = sdxl_refiner_uri
    self._parsed_sdxl_refiner_uri = None
    if sdxl_refiner_uri is not None:
        self._parsed_sdxl_refiner_uri = _uris.SDXLRefinerUri.parse(sdxl_refiner_uri)

    self._s_cascade_decoder_uri = s_cascade_decoder_uri
    self._parsed_s_cascade_decoder_uri = None
    if s_cascade_decoder_uri is not None:
        self._parsed_s_cascade_decoder_uri = _uris.SCascadeDecoderUri.parse(s_cascade_decoder_uri)

    self._parsed_ip_adapter_uris = None
    if ip_adapter_uris:
        self._parsed_ip_adapter_uris = [
            _uris.IPAdapterUri.parse(ip_adapter_uri) for ip_adapter_uri in ip_adapter_uris]

    # Plugin loaders fall back to a default instance when not supplied.
    self._prompt_weighter_loader = \
        prompt_weighter_loader if prompt_weighter_loader is not None \
        else _promptweighters.PromptWeighterLoader()

    self._prompt_weighter_cache = dict()

    self._latents_processor_loader = \
        latents_processor_loader if latents_processor_loader is not None \
        else _latentsprocessors.LatentsProcessorLoader()

    self._decoded_latents_image_processor_loader = \
        decoded_latents_image_processor_loader if decoded_latents_image_processor_loader is not None \
        else _imageprocessors.ImageProcessorLoader()

    self._adetailer_detector_uris = adetailer_detector_uris
    self._adetailer_crop_control_image = adetailer_crop_control_image

    # Initialize inpaint crop info (used internally for crop/paste operations)
    self._inpaint_crop_info = None

    self._parsed_adetailer_detector_uris = None
    if adetailer_detector_uris:
        self._parsed_adetailer_detector_uris = [
            _uris.AdetailerDetectorUri.parse(adetailer_detector_uri)
            for adetailer_detector_uri in adetailer_detector_uris]

    # storage for determination of render width/height
    self._inference_width = None
    self._inference_height = None
@property
def prompt_weighter_loader(self) -> _promptweighters.PromptWeighterLoader:
"""
Current prompt weighter loader.
"""
return self._prompt_weighter_loader
@property
def latents_processor_loader(self) -> _latentsprocessors.LatentsProcessorLoader:
"""
Current latents processor loader.
"""
return self._latents_processor_loader
@property
def decoded_latents_image_processor_loader(self) -> _imageprocessors.ImageProcessorLoader:
"""
Current decoded latents image processor loader.
"""
return self._decoded_latents_image_processor_loader
@property
def local_files_only(self) -> bool:
"""
Currently set value for ``local_files_only``.
"""
return self._local_files_only
@property
def revision(self) -> _types.OptionalName:
"""
Currently set ``--revision`` for the main model or ``None``.
"""
return self._revision
@property
def safety_checker(self) -> bool:
"""
Safety checker enabled status.
"""
return self._safety_checker
@property
def variant(self) -> _types.OptionalName:
"""
Currently set ``--variant`` for the main model or ``None``.
"""
return self._variant
@property
def dtype(self) -> _enums.DataType:
"""
Currently set ``--dtype`` enum value for the main model.
"""
return self._dtype
@property
def dtype_string(self) -> str:
    """
    The ``--dtype`` of the main model in its string form.
    """
    current_dtype = self._dtype
    return _enums.get_data_type_string(current_dtype)
@property
def textual_inversion_uris(self) -> _types.OptionalUris:
"""
List of supplied ``--textual-inversions`` URI strings or an empty list.
"""
return list(self._textual_inversion_uris) if self._textual_inversion_uris else []
@property
def controlnet_uris(self) -> _types.OptionalUris:
"""
List of supplied ``--control-nets`` URI strings or an empty list.
"""
return list(self._controlnet_uris) if self._controlnet_uris else []
@property
def t2i_adapter_uris(self) -> _types.OptionalUris:
"""
List of supplied ``--t2i-adapters`` URI strings or an empty list.
"""
return list(self._t2i_adapter_uris) if self._t2i_adapter_uris else []
@property
def ip_adapter_uris(self) -> _types.OptionalUris:
"""
List of supplied ``--ip-adapters`` URI strings or an empty list.
"""
return list(self._ip_adapter_uris) if self._ip_adapter_uris else []
@property
def text_encoder_uris(self) -> _types.OptionalUris:
"""
List of supplied ``--text-encoders`` URI strings or an empty list.
"""
return list(self._text_encoder_uris) if self._text_encoder_uris else []
@property
def second_model_text_encoder_uris(self) -> _types.OptionalUris:
"""
List of supplied ``--second-model-text-encoders`` URI strings or an empty list.
"""
return list(self._second_model_text_encoder_uris) if self._second_model_text_encoder_uris else []
@property
def adetailer_detector_uris(self) -> _types.OptionalUris:
"""
List of supplied ``--adetailer-detectors`` URI strings or an empty list.
"""
return list(self._adetailer_detector_uris) if self._adetailer_detector_uris else []
@property
def adetailer_crop_control_image(self) -> bool:
"""
Should adetailer crop any provided control image in the same way that it crops the
generated mask to the detection area? Otherwise, use the full control image
resized down to the size of the detection area.
"""
return self._adetailer_crop_control_image
@adetailer_crop_control_image.setter
def adetailer_crop_control_image(self, value: bool):
"""
Should adetailer crop any provided control image in the same way that it crops the
generated mask to the detection area? Otherwise, use the full control image
resized down to the size of the detection area.
"""
self._adetailer_crop_control_image = value
@property
def device(self) -> _types.Name:
"""
Currently set ``--device`` string.
"""
return self._device
@property
def model_path(self) -> _types.Path:
"""
Model path for the main model.
"""
return self._model_path
@property
def sdxl_refiner_uri(self) -> _types.OptionalUri:
"""
Model URI for the SDXL refiner or ``None``.
"""
return self._sdxl_refiner_uri
@property
def s_cascade_decoder_uri(self) -> _types.OptionalUri:
    """
    URI of the Stable Cascade decoder model, or ``None``.
    """
    return self._s_cascade_decoder_uri
@property
def transformer_uri(self) -> _types.OptionalUri:
    """
    URI of the SD3 Transformer model, or ``None``.
    """
    return self._transformer_uri
@property
def model_type(self) -> _enums.ModelType:
    """
    The ``--model-type`` enum value this wrapper is currently set to.
    """
    return self._model_type
@property
def model_type_string(self) -> str:
    """
    The ``--model-type`` this wrapper is currently set to, as its string value.
    """
    current_type = self._model_type
    return _enums.get_model_type_string(current_type)
@property
def subfolder(self) -> _types.OptionalName:
    """
    The ``--subfolder`` selected for the main model (remote repo subfolder or local), or ``None``.
    """
    return self._subfolder
@property
def vae_uri(self) -> _types.OptionalUri:
    """
    The ``--vae`` URI selected for the main model, or ``None``.
    """
    return self._vae_uri
@property
def image_encoder_uri(self) -> _types.OptionalUri:
    """
    The ``--image-encoder`` URI selected for the main model, or ``None``.
    """
    return self._image_encoder_uri
@property
def unet_uri(self) -> _types.OptionalUri:
    """
    The ``--unet`` URI selected for the main model, or ``None``.
    """
    return self._unet_uri
@property
def second_model_unet_uri(self) -> _types.OptionalUri:
    """
    The ``--second-model-unet`` URI selected for the SDXL refiner or
    Stable Cascade decoder model, or ``None``.
    """
    return self._second_model_unet_uri
@property
def lora_uris(self) -> _types.OptionalUris:
    """
    Copy of the ``--loras`` URI strings that were supplied.

    :return: a new list of URI strings, empty when none were supplied
    """
    return list(self._lora_uris or ())
@property
def lora_fuse_scale(self) -> float:
    """
    The ``--lora-fuse-scale`` value that was supplied.
    """
    return self._lora_fuse_scale
@property
def auth_token(self) -> _types.OptionalString:
    """
    The current ``--auth-token`` value, or ``None``.
    """
    return self._auth_token
@property
def model_sequential_offload(self) -> bool:
    """
    The current ``--model-sequential-offload`` setting.
    """
    return self._model_sequential_offload
@property
def model_cpu_offload(self) -> bool:
    """
    The current ``--model-cpu-offload`` setting.
    """
    return self._model_cpu_offload
@property
def second_model_sequential_offload(self) -> bool:
    """
    The current ``--second-model-sequential-offload`` setting.
    """
    return self._second_model_sequential_offload
@property
def second_model_cpu_offload(self) -> bool:
    """
    The current ``--second-model-cpu-offload`` setting.
    """
    return self._second_model_cpu_offload
@property
def quantizer_uri(self) -> _types.OptionalUri:
    """
    The current ``--quantizer`` value.
    """
    return self._quantizer_uri
@property
def quantizer_map(self) -> _types.OptionalStrings:
    """
    The current ``--quantizer-map`` value.

    :return: a new list copied from the stored value, or ``None`` when no value is stored
    """
    stored = self._quantizer_map
    if stored is None:
        return None
    return list(stored)
@property
def second_model_quantizer_uri(self) -> _types.OptionalUri:
    """
    The current ``--second-model-quantizer`` value.
    """
    return self._second_model_quantizer_uri
@property
def second_model_quantizer_map(self) -> _types.OptionalStrings:
    """
    The current ``--second-model-quantizer-map`` value.

    :return: a new list copied from the stored value, or ``None`` when no value is stored
    """
    stored = self._second_model_quantizer_map
    if stored is None:
        return None
    return list(stored)
@property
def original_config(self) -> _types.OptionalPath:
    """
    The current ``--original-config`` value.
    """
    return self._original_config
@property
def second_model_original_config(self) -> _types.OptionalPath:
    """
    The current ``--second-model-original-config`` value.
    """
    return self._second_model_original_config
[docs]
@staticmethod
def recall_last_used_main_pipeline() -> typing.Optional[_pipelines.PipelineCreationResult]:
"""
Return a reference to the last :py:class:`dgenerate.pipelinewrapper.pipelines.TorchPipelineCreationResult`
for the pipeline that successfully executed an image generation.
This may recreate the pipeline if it is not cached.
If no image generation has occurred, this will return ``None``.
:return: :py:class:`dgenerate.pipelinewrapper.pipelines.TorchPipelineCreationResult` or ``None``
"""
if DiffusionPipelineWrapper.__LAST_RECALL_PIPELINE is None:
return None
return DiffusionPipelineWrapper.__LAST_RECALL_PIPELINE()
[docs]
@staticmethod
def recall_last_used_secondary_pipeline() -> typing.Optional[_pipelines.PipelineCreationResult]:
"""
Return a reference to the last :py:class:`dgenerate.pipelinewrapper.pipelines.TorchPipelineCreationResult`
for the secondary pipeline (refiner / stable cascade decoder) that successfully executed an image generation.
This may recreate the pipeline if it is not cached.
If no image generation has occurred or no secondary pipeline has been called, this will return ``None``.
:return: :py:class:`dgenerate.pipelinewrapper.pipelines.TorchPipelineCreationResult` or ``None``
"""
if DiffusionPipelineWrapper.__LAST_RECALL_SECONDARY_PIPELINE is None:
return None
return DiffusionPipelineWrapper.__LAST_RECALL_SECONDARY_PIPELINE()
[docs]
def reconstruct_dgenerate_opts(
        self,
        args: DiffusionArguments | None = None,
        extra_opts: collections.abc.Sequence[tuple[str] | tuple[str, typing.Any]] | None = None,
        omit_device: bool = False,
        shell_quote: bool = True,
        overrides: dict[str, typing.Any] = None
) -> list[tuple[str] | tuple[str, typing.Any]]:
    """
    Rebuild dgenerate's command line options from a set of pipeline wrapper call arguments.

    ``--image-seeds`` is not reproduced because :py:class:`.DiffusionArguments` does not
    carry enough information to do so accurately, supply it through ``extra_opts`` instead.

    :param args: :py:class:`.DiffusionArguments` object to take values from
    :param extra_opts: Extra option pairs appended to the end of the reconstructed options,
        a sequence of tuples of length 1 (switch only) or length 2 (switch with args)
    :param omit_device: Omit the ``--device`` option? For a shareable configuration it might
        not make sense to include the device specification, and instead simply fall back
        to whatever the default device is, which is generally ``cuda``
    :param shell_quote: Shell quote and format the argument values? or return them raw.
    :param overrides: pipeline wrapper keyword arguments, these override values derived from
        any :py:class:`.DiffusionArguments` object given to the *args* argument. See:
        :py:class:`.DiffusionArguments.get_pipeline_wrapper_kwargs`
    :return: List of tuples of length 1 or 2 representing the option
    """
    # imported at call time rather than at module import time
    import dgenerate.pipelinewrapper.argreconstruct as _argreconstruct

    return _argreconstruct.reconstruct_dgenerate_opts(
        self, args, extra_opts, omit_device, shell_quote, overrides
    )
[docs]
def gen_dgenerate_config(
        self,
        args: DiffusionArguments | None = None,
        extra_opts: collections.abc.Sequence[tuple[str] | tuple[str, typing.Any]] | None = None,
        extra_comments: collections.abc.Iterable[str] | None = None,
        omit_device: bool = False,
        overrides: dict[str, typing.Any] = None
):
    """
    Produce a valid dgenerate config file containing a single invocation that reproduces
    the arguments associated with :py:class:`.DiffusionArguments`.

    ``--image-seeds`` is not reproduced because :py:class:`.DiffusionArguments` does not
    carry enough information to do so accurately, supply it through ``extra_opts`` instead.

    :param args: :py:class:`.DiffusionArguments` object to take values from
    :param extra_opts: Extra option pairs appended to the end of the reconstructed options
        of the dgenerate invocation, a sequence of tuples of length 1 (switch only)
        or length 2 (switch with args)
    :param extra_comments: Extra strings to use as comments after the initial
        version check directive
    :param omit_device: Omit the ``--device`` option? For a shareable configuration it might
        not make sense to include the device specification, and instead simply fall back
        to whatever the default device is, which is generally ``cuda``
    :param overrides: pipeline wrapper keyword arguments, these override values derived from
        any :py:class:`.DiffusionArguments` object given to the *args* argument. See:
        :py:class:`.DiffusionArguments.get_pipeline_wrapper_kwargs`
    :return: The configuration as a string
    """
    # imported at call time rather than at module import time
    import dgenerate.pipelinewrapper.argreconstruct as _argreconstruct

    return _argreconstruct.gen_dgenerate_config(
        self, args, extra_opts, extra_comments, omit_device, overrides
    )
[docs]
def gen_dgenerate_command(
        self,
        args: DiffusionArguments | None = None,
        extra_opts: collections.abc.Sequence[tuple[str] | tuple[str, typing.Any]] | None = None,
        omit_device: bool = False,
        overrides: dict[str, typing.Any] = None
):
    """
    Produce a valid dgenerate command line invocation that reproduces the
    arguments associated with :py:class:`.DiffusionArguments`.

    ``--image-seeds`` is not reproduced because :py:class:`.DiffusionArguments` does not
    carry enough information to do so accurately, supply it through ``extra_opts`` instead.

    :param args: :py:class:`.DiffusionArguments` object to take values from
    :param extra_opts: Extra option pairs appended to the end of the reconstructed options
        of the dgenerate invocation, a sequence of tuples of length 1 (switch only)
        or length 2 (switch with args)
    :param omit_device: Omit the ``--device`` option? For a shareable configuration it might
        not make sense to include the device specification, and instead simply fall back
        to whatever the default device is, which is generally ``cuda``
    :param overrides: pipeline wrapper keyword arguments, these override values derived from
        any :py:class:`.DiffusionArguments` object given to the *args* argument. See:
        :py:class:`.DiffusionArguments.get_pipeline_wrapper_kwargs`
    :return: A string containing the dgenerate command line needed to reproduce this result.
    """
    # imported at call time rather than at module import time
    import dgenerate.pipelinewrapper.argreconstruct as _argreconstruct

    return _argreconstruct.gen_dgenerate_command(
        self, args, extra_opts, omit_device, overrides
    )
@staticmethod
def _separate_images_and_tensors(items: _types.ImagesOrTensors | None) \
-> tuple[list[PIL.Image.Image] | None, list[torch.Tensor] | None]:
"""
Separate a sequence of images or tensors into separate sequences.
Note: The input should be homogeneous (all images or all tensors), but this method
can handle mixed inputs for validation purposes.
:param items: Sequence of PIL Images or torch Tensors (should be homogeneous), or None
:return: Tuple of (images, tensors) where each can be None if no items of that type exist
"""
if items is None:
return None, None
images, tensors = _mediainput.separate_images_and_tensors(items)
return images if images else None, tensors if tensors else None
def _validate_latent_channels(self, tensors: _types.Tensors):
    """
    Check that each latents tensor has the channel count expected by the current model type.

    Expected channel count:

    * Flux: 16 (unpacked format)
    * SD3: ``in_channels`` of the pipeline transformer config
    * anything else: ``latent_channels`` of the VAE config, else its ``in_channels``, else 4

    Every tensor must be 3D ``[C, H, W]`` or 4D ``[B, C, H, W]``.

    :param tensors: Sequence of tensors to validate
    :raises UnsupportedPipelineConfigError: If the model type is Stable Cascade, or if a tensor
        has an invalid rank or an incorrect number of channels
    """
    if _enums.model_type_is_s_cascade(self.model_type):
        raise _pipelines.UnsupportedPipelineConfigError(
            'Stable Cascade does not support accepting latents as input.'
        )

    is_flux = _enums.model_type_is_flux(self.model_type)
    is_sd3 = not is_flux and _enums.model_type_is_sd3(self.model_type)

    if is_flux:
        # Flux uses unpacked format [B, C, H, W] or [C, H, W] for its external
        # interface, where C is 16 (64/4 from the internal packed format)
        expected_channels = 16
    elif is_sd3:
        expected_channels = self._pipeline.transformer.config.in_channels
    else:
        # Standard SD models take the channel count from the VAE config,
        # 4 is the default when the config does not specify it
        expected_channels = 4
        vae_config = getattr(self._pipeline.vae, 'config', None)
        if hasattr(vae_config, 'latent_channels'):
            expected_channels = vae_config.latent_channels
        elif hasattr(vae_config, 'in_channels'):
            # Some models use in_channels instead
            expected_channels = vae_config.in_channels

    for index, latent in enumerate(tensors):
        rank = len(latent.shape)

        if rank not in (3, 4):
            if is_flux:
                message = (
                    f'Invalid shape for Flux latents tensor at index {index}. '
                    f'Expected 3D [C, H, W] or 4D [B, C, H, W] tensor in unpacked format, '
                    f'but got shape {latent.shape}'
                )
            elif is_sd3:
                message = (
                    f'Invalid shape for SD3 latents tensor at index {index}. '
                    f'Expected 3D [C, H, W] or 4D tensor [B, C, H, W], but got shape {latent.shape}'
                )
            else:
                message = (
                    f'Invalid shape for latents tensor at index {index}. '
                    f'Expected 3D [C, H, W] or 4D tensor [B, C, H, W], but got shape {latent.shape}'
                )
            raise _pipelines.UnsupportedPipelineConfigError(message)

        # channel axis is 1 for 4D tensors and 0 for 3D tensors
        channels = latent.shape[rank - 3]

        if channels != expected_channels:
            if is_flux:
                message = (
                    f'Invalid number of channels in Flux latents tensor at index {index}. '
                    f'Expected {expected_channels} channels in unpacked format, '
                    f'but got {channels} channels instead. Shape: {latent.shape}'
                )
            elif is_sd3:
                message = (
                    f'Invalid number of channels in SD3 latents tensor at index {index}. '
                    f'Expected {expected_channels} channels for model type "{self.model_type_string}", '
                    f'but got {channels} channels instead. Shape: {latent.shape}'
                )
            else:
                message = (
                    f'Invalid number of channels in latents tensor at index {index}. '
                    f'Expected {expected_channels} channels for model type "{self.model_type_string}", '
                    f'but got {channels} channels instead. Shape: {latent.shape}'
                )
            raise _pipelines.UnsupportedPipelineConfigError(message)
@staticmethod
def _validate_images_all_same_size(title, images):
first_image_size = images[0].size
# Check if all control images have the same size
for img in images[1:]:
if img.size != first_image_size:
raise _pipelines.UnsupportedPipelineConfigError(
f"All {title} must have the same dimension.")
def _resize_images_to_user_dimensions(
self,
images: _types.Images,
user_args: DiffusionArguments
) -> list[PIL.Image.Image]:
"""
Resize images to user-specified width and height using dgenerate's image resize utility.
:param images: List of PIL Images to resize
:param user_args: DiffusionArguments containing width and height
:return: List of resized PIL Images
"""
if not images:
return []
target_size = self._calc_image_target_size(images[0], user_args)
resized_images = []
for img in images:
new_size = _image.resize_image_calc(
old_size=img.size,
new_size=target_size,
aspect_correct=user_args.aspect_correct,
align=8)
if img.size != new_size:
img = _image.resize_image(img=img, size=new_size)
resized_images.append(img.convert('RGB'))
return resized_images
@staticmethod
def _process_ip_adapter_images(images: _types.OptionalImagesSequence):
"""
Align IP Adapter images by 8
:param images: sequence of image sequences
:return: processed array
"""
if not images:
return None
output = []
for img_s in images:
processed_images = []
for img in img_s:
new_size = _image.resize_image_calc(old_size=img.size, new_size=None, align=8)
if img.size != new_size:
img = _image.resize_image(img=img, size=new_size)
processed_images.append(img.convert('RGB'))
output.append(processed_images)
return output
@staticmethod
def _process_floyd_image(image: _types.ImageOrTensor):
"""
Align floyd image by 8
:param image: floyd image (maybe tensor)
:return: processed image, untouched tensor
"""
if not isinstance(image, torch.Tensor):
new_size = _image.resize_image_calc(old_size=image.size, new_size=None, align=8)
if image.size != new_size:
return _image.resize_image(image, size=None, align=8)
return image
def _apply_inpaint_crop(
        self,
        images: list[PIL.Image.Image],
        masks: list[PIL.Image.Image],
        control_images: list[PIL.Image.Image] | None,
        padding: int | tuple[int, int] | tuple[int, int, int, int],
        decoded_latents: bool,
        user_args: DiffusionArguments
) -> tuple[
    list[PIL.Image.Image],
    list[PIL.Image.Image],
    list[PIL.Image.Image] | None,
    tuple[int, int, int, int]
]:
    """
    Crop the first image, mask, and control image to the padded bounds of the first mask.

    Only the first element of each input list is used, every returned list holds a single image.

    :param images: List of images to crop
    :param masks: List of masks to crop
    :param control_images: Optional list of control images to crop
    :param padding: Padding around the mask bounds
    :param decoded_latents: Were ``images`` decoded from latents?
    :param user_args: diffusion arguments for reference
    :return: Tuple of (cropped_images, cropped_masks, cropped_control_images, crop_bounds)
    :raises UnsupportedPipelineConfigError: if no masks are given, or no mask bounds can be found
    """
    if not masks:
        raise _pipelines.UnsupportedPipelineConfigError(
            "Cannot apply inpaint crop without masks."
        )

    # the bounds come from the single (first) mask
    bounds = _image.find_mask_bounds(masks[0], padding)

    if bounds is None:
        raise _pipelines.UnsupportedPipelineConfigError(
            "No white pixels found in mask for inpaint crop."
        )

    image_crops = [images[0].crop(bounds)]
    mask_crops = [masks[0].crop(bounds)]
    control_crops = [control_images[0].crop(bounds)] if control_images else None

    # Images decoded from latents are run through the configured processors
    # here, after cropping, so that the processors pre-resize settings are
    # honored against the cropped image, which is what we want to process
    if decoded_latents and user_args.decoded_latents_image_processor_uris:
        image_crops = self._process_decoded_latents_images(
            image_crops,
            user_args.decoded_latents_image_processor_uris,
            user_args,
        )

    # inpaint crop works on single images, so there is exactly one set of bounds
    return image_crops, mask_crops, control_crops, bounds
def _paste_inpaint_result(self,
original_images: list[PIL.Image.Image],
generated_images: list[PIL.Image.Image],
crop_bounds: tuple[int, int, int, int],
masks: list[PIL.Image.Image] = None,
feather: int = None) -> list[PIL.Image.Image]:
"""
Paste generated images back onto original images at crop bounds.
:param original_images: List of original uncropped images
:param generated_images: List of generated images to paste
:param crop_bounds: Bounds where to paste (left, top, right, bottom)
:param masks: Optional masks for masked pasting
:param feather: Optional feather value for feathered pasting
:return: List of images with generated content pasted back
"""
result_images = []
for i, generated in enumerate(generated_images):
# Since inpaint_crop doesn't support batching, map generated images to the single original
original = original_images[0]
background_image = original.copy()
# Use the single crop bounds for all generated images
# Resize generated image to fit the crop bounds
crop_size = (crop_bounds[2] - crop_bounds[0], crop_bounds[3] - crop_bounds[1])
if generated.size != crop_size:
_messages.debug_log(
f'Inpaint crop paste: Resizing generated image {i} from {generated.size} to {crop_size} for bounds {crop_bounds}')
resampling = _image.best_pil_resampling(generated.size, crop_size)
generated = generated.resize(crop_size, resampling)
if feather is not None:
# Use feathered pasting
background_image = _image.paste_with_feather(
background=background_image,
foreground=generated,
location=crop_bounds,
feather=feather,
shape='rectangle'
)
elif masks:
# Use masked pasting (single mask since we don't support batching)
mask = masks[0]
# Crop and resize mask to match generated image size
cropped_mask = mask.crop(crop_bounds)
if cropped_mask.size != crop_size:
mask_resampling = _image.best_pil_resampling(cropped_mask.size, crop_size)
cropped_mask = cropped_mask.resize(crop_size, mask_resampling)
# Convert to grayscale if needed
if cropped_mask.mode != 'L':
cropped_mask = cropped_mask.convert('L')
background_image.paste(generated, crop_bounds, cropped_mask)
else:
# Simple paste without transparency
background_image.paste(generated, crop_bounds)
result_images.append(background_image)
return result_images
def _set_pipeline_strength(self, user_args: DiffusionArguments, pipeline_args: dict[str, typing.Any]):
    """
    Set ``pipeline_args['strength']`` from the user's image seed strength.

    When ``strength * inference steps`` is below 1, a warning is issued and the
    strength is raised to ``1.0 / inference steps``.

    :param user_args: diffusion arguments, ``image_seed_strength`` and ``inference_steps``
        are read, with the library defaults substituted via ``_types.default``
    :param pipeline_args: pipeline keyword argument dictionary, modified in place
    """
    effective_strength = float(
        _types.default(user_args.image_seed_strength, _constants.DEFAULT_IMAGE_SEED_STRENGTH))

    steps = int(
        _types.default(user_args.inference_steps, _constants.DEFAULT_INFERENCE_STEPS))

    if effective_strength * steps < 1.0:
        effective_strength = 1.0 / steps
        _messages.warning(
            'image-seed-strength * inference-steps '
            f'was calculated at < 1, image-seed-strength defaulting to (1.0 / inference-steps): {effective_strength}'
        )

    pipeline_args['strength'] = effective_strength
def _set_pipeline_controlnet_defaults(self, user_args: DiffusionArguments, pipeline_args: dict[str, typing.Any]):
    """
    Fill ``pipeline_args`` with the inputs required to run a pipeline that uses ControlNet models.

    The control images are validated and resized, checked against any img2img inputs and
    against the number of configured ControlNet URIs, the inference dimensions are set from
    the first control image, and the image / control image / mask arguments are then
    assigned according to the current pipeline type and model type.

    :param user_args: diffusion arguments supplying control images, img2img inputs and mask images
    :param pipeline_args: pipeline keyword argument dictionary, modified in place
    :raises UnsupportedPipelineConfigError: if no control images are given, if img2img inputs
        and control images disagree in dimension, or if the number of control images differs
        from the number of ControlNet URIs
    """
    control_images = user_args.control_images
    if not control_images:
        raise _pipelines.UnsupportedPipelineConfigError(
            'Must provide control_images argument when using ControlNet models.')
    # sanity check that control images are the same dimension
    self._validate_images_all_same_size(
        "control guidance images", control_images
    )
    # Resize control images to user-specified dimensions first thing
    control_images = self._resize_images_to_user_dimensions(
        control_images, user_args
    )
    # img2img inputs may be images, latents tensors, or None
    image_arg_inputs = self._get_pipeline_img2img_inputs(user_args)
    if image_arg_inputs is not None:
        non_latent_input = not _torchutil.is_tensor(image_arg_inputs[0])
        if non_latent_input:
            # image input: compare its size directly with the first control image
            if not image_arg_inputs[0].size == control_images[0].size:
                raise _pipelines.UnsupportedPipelineConfigError(
                    'Img2Img images and ControlNet images must be equal in dimension.'
                )
        else:
            # latents input: compare the size the latents would decode to
            if not self.get_decoded_latents_size(image_arg_inputs[0]) == control_images[0].size:
                raise _pipelines.UnsupportedPipelineConfigError(
                    'Img2Img latents must decode to the same dimension as any provided ControlNet images.'
                )
    control_images_cnt = len(control_images)
    controlnet_uris_cnt = len(self._controlnet_uris)
    if control_images_cnt != controlnet_uris_cnt:
        # User provided a mismatched number of ControlNet models and control_images, behavior is undefined.
        raise _pipelines.UnsupportedPipelineConfigError(
            f'You specified {control_images_cnt} control guidance images and '
            f'only {controlnet_uris_cnt} ControlNet URIs. The amount of '
            f'control guidance images must be equal to the amount of ControlNet URIs.')
    else:
        # set dimensions to match the control image
        self._set_pipe_dimensions(
            None, None,
            control_images[0].width, control_images[0].height,
            pipeline_args
        )
    # an SDXL model where any parsed ControlNet URI specifies a mode
    sdxl_cn_union = _enums.model_type_is_sdxl(self._model_type) and \
                    any(p.mode is not None for p in self._parsed_controlnet_uris)
    if self._pipeline_type == _enums.PipelineType.TXT2IMG:
        if _enums.model_type_is_sd3(self._model_type):
            # Handle SD3 model specifics for control images
            pipeline_args['control_image'] = self._sd3_force_control_to_a16(
                pipeline_args, control_images, user_args
            )
        elif _enums.model_type_is_flux(self._model_type):
            pipeline_args['control_image'] = control_images
        elif sdxl_cn_union:
            # controlnet union pipeline does not use "image"
            # it also destructively modifies
            # this input value if it is a list for
            # whatever reason
            pipeline_args['control_image'] = list(control_images)
        else:
            # in the remaining txt2img case the control images are passed as "image"
            pipeline_args['image'] = control_images
    elif self._pipeline_type in {_enums.PipelineType.IMG2IMG, _enums.PipelineType.INPAINT}:
        pipeline_args['image'] = image_arg_inputs
        # the union pipeline receives a copy of the list, see the txt2img branch above
        pipeline_args['control_image'] = control_images if not sdxl_cn_union else list(control_images)
        self._set_pipeline_strength(user_args, pipeline_args)
    mask_images = user_args.mask_images
    if mask_images is not None:
        # Resize mask images to user-specified dimensions (includes RGB conversion)
        self._validate_images_all_same_size("inpaint mask images", mask_images)
        mask_images = self._resize_images_to_user_dimensions(mask_images, user_args)
        pipeline_args['mask_image'] = mask_images
def _set_pipeline_t2iadapter_defaults(self, user_args: DiffusionArguments, pipeline_args: dict[str, typing.Any]):
    """
    Fill ``pipeline_args`` with the control image input and dimensions required by T2IAdapter models.

    The control images are validated (non-empty, one per T2IAdapter URI, all the same size),
    resized to the user requested dimensions, and then forced to an alignment of 16 when
    the resized images are not already aligned. The inference dimensions are set from the
    final control image size.

    :param user_args: diffusion arguments, ``control_images`` and
        ``sdxl_t2i_adapter_factor`` are read from it
    :param pipeline_args: pipeline keyword argument dictionary, modified in place
    :raises UnsupportedPipelineConfigError: if no control images are given, if their count
        differs from the number of T2IAdapter URIs, if they differ in size, or if the
        pipeline is not in txt2img mode
    """
    # ``control_images`` may be ``None``, normalize it to a list so that the guard
    # below raises the intended configuration error instead of a ``TypeError``
    adapter_control_images = list(user_args.control_images or ())

    if not adapter_control_images:
        raise _pipelines.UnsupportedPipelineConfigError(
            'Must provide control_images argument when using T2IAdapter models.')

    control_images_cnt = len(adapter_control_images)
    t2i_adapter_uris_cnt = len(self._t2i_adapter_uris)

    if control_images_cnt != t2i_adapter_uris_cnt:
        # User provided a mismatched number of T2IAdapter models and control_images, behavior is undefined.
        raise _pipelines.UnsupportedPipelineConfigError(
            f'You specified {control_images_cnt} control guidance images and '
            f'only {t2i_adapter_uris_cnt} T2IAdapter URIs. The amount of '
            f'control guidance images must be equal to the amount of T2IAdapter URIs.')

    # sanity check that control images are the same dimension
    self._validate_images_all_same_size("control guidance images", adapter_control_images)

    # Resize control images to user-specified dimensions first thing
    adapter_control_images = self._resize_images_to_user_dimensions(adapter_control_images, user_args)

    # The alignment check must look at the size *after* the resize above,
    # checking the pre-resize size could skip a needed alignment or
    # undo the resize to the user requested dimensions
    resized_size = adapter_control_images[0].size

    if not _image.is_aligned(resized_size, 16):
        # noinspection PyTypeChecker
        new_size: tuple[int, int] = _image.align_by(resized_size, 16)
        _messages.warning(
            f'T2I Adapter control image(s) of size {resized_size} being forcefully '
            f'aligned by 16 to {new_size} to prevent errors.'
        )
        adapter_control_images = [
            _image.resize_image(img, new_size) for img in adapter_control_images
        ]

    if _enums.model_type_is_sdxl(self.model_type) and user_args.sdxl_t2i_adapter_factor is not None:
        pipeline_args['adapter_conditioning_factor'] = user_args.sdxl_t2i_adapter_factor

    # inference dimensions follow the final control image size
    self._set_pipe_dimensions(
        None, None,
        adapter_control_images[0].width, adapter_control_images[0].height,
        pipeline_args
    )

    if self._pipeline_type == _enums.PipelineType.TXT2IMG:
        pipeline_args['image'] = adapter_control_images
    else:
        raise _pipelines.UnsupportedPipelineConfigError(
            'T2IAdapter models only work in txt2img mode.'
        )
def _get_pipeline_img2img_inputs(self, user_args: DiffusionArguments):
    """
    Produce the img2img inputs for the pipeline from ``user_args.images``.

    Images are validated and resized to the user requested dimensions. Latents tensors
    are run through the img2img latents processors and then, unless they are to be passed
    to the pipeline directly, decoded into images with the VAE.

    :param user_args: diffusion arguments supplying the images or latents tensors
    :return: the latents tensors if any remain after processing, otherwise the images
        (which is ``None`` when no images exist)
    :raises UnsupportedPipelineConfigError: if images and latents tensors are mixed
    """
    # Separate images and tensors but skip validation initially
    images, img2img_latents = self._separate_images_and_tensors(user_args.images)
    # Don't allow mixing images and tensors in the same input
    if images and img2img_latents:
        raise _pipelines.UnsupportedPipelineConfigError(
            f'Cannot mix PIL Images and latents tensors in img2img inputs. '
            f'All inputs must be either images or latents tensors, not both.'
        )
    # Process input tensors
    if img2img_latents:
        img2img_latents = self._process_input_latents(
            "img2img", img2img_latents, user_args.img2img_latents_processors
        )
    # Resize input images to user-specified dimensions first thing
    if images:
        # the same-size requirement is not enforced for Stable Cascade
        if not _enums.model_type_is_s_cascade(self._model_type):
            self._validate_images_all_same_size('img2img images', images)
        images = self._resize_images_to_user_dimensions(images, user_args)
    if self._model_type != _enums.ModelType.UPSCALER_X2 and \
            hasattr(self._pipeline, 'vae') and self._pipeline.vae is not None:
        # we need to decode the latents into an image using the VAE for
        # the best img2img result, passing already denoised latents
        # in does not make sense to the receiving UNet/Transformer
        # except in the case of the X2 latent upscaler, which can
        # work with the already denoised latents
        #
        # latents are left undecoded when the pipeline class supports native
        # denoising start and the user requested a denoising start above zero
        if img2img_latents and not (
                _supports_native_denoising_start(self._pipeline.__class__)
                and user_args.denoising_start is not None
                and user_args.denoising_start > 0.0
        ):
            if _enums.model_type_is_flux(self._model_type):
                img2img_latents = self._repack_flux_latents(self._stack_latents(img2img_latents))
            images = self.decode_latents(img2img_latents)
            # Process decoded images if processors are configured (handles pre-resize, resize, post-resize)
            images = self._process_decoded_latents_images(
                images, user_args.decoded_latents_image_processor_uris, user_args
            )
            # the decoded images now stand in for the latents
            img2img_latents = None
    # Use the final result (tensors or images)
    if img2img_latents:
        inputs = img2img_latents
    else:
        inputs = images
    return inputs
# noinspection PyMethodMayBeStatic
def _aligned_8_user_dimensions(self, user_args: DiffusionArguments):
if user_args.height is not None:
if user_args.height % 8 != 0:
user_height = user_args.height - (user_args.height % 8)
else:
user_height = user_args.height
else:
user_height = None
if user_args.width is not None:
if user_args.width % 8 != 0:
user_width = user_args.width - (user_args.width % 8)
else:
user_width = user_args.width
else:
user_width = None
return user_width, user_height
def _set_pipe_dimensions(
self,
user_width: int | None,
user_height: int | None,
inference_width: int | None,
inference_height: int | None,
pipeline_args: dict | None = None
):
width = user_width if user_width is not None else inference_width
height = user_height if user_height is not None else inference_height
self._inference_width = width
self._inference_height = height
if pipeline_args is not None:
pipeline_args['width'] = width
pipeline_args['height'] = height
# noinspection PyUnresolvedReferences,PyTypeChecker
def _set_pipeline_img2img_defaults(self, user_args: DiffusionArguments, pipeline_args: dict[str, typing.Any]):
"""
Populate ``pipeline_args`` with the image inputs, strength / noise level settings,
optional inpaint masks, and output dimensions for an invocation that has image inputs.
:param user_args: user arguments to the pipeline wrapper
:param pipeline_args: pipeline keyword arguments being assembled, modified in place
:raises UnsupportedPipelineConfigError: if ``floyd_image`` is required but missing, if the
inpaint masks do not match the size of the image inputs, or if a requested Stable Cascade
output dimension is not aligned by 128.
"""
user_width, user_height = self._aligned_8_user_dimensions(user_args)
image_arg_inputs = self._get_pipeline_img2img_inputs(user_args)
# The inputs are treated as latents when the first one is a tensor
non_latent_input = not _torchutil.is_tensor(image_arg_inputs[0])
# Calculate dimensions once for reuse
if not non_latent_input:
inference_width, inference_height = self.get_decoded_latents_size(image_arg_inputs[0])
else:
inference_width, inference_height = image_arg_inputs[0].width, image_arg_inputs[0].height
# Handle special model type configurations
floyd_og_image_needed = (self._pipeline_type == _enums.PipelineType.INPAINT and
_enums.model_type_is_floyd_ifs(self._model_type)
) or (self._model_type == _enums.ModelType.IFS_IMG2IMG)
if floyd_og_image_needed:
if user_args.floyd_image is None:
raise _pipelines.UnsupportedPipelineConfigError(
'must specify "floyd_image" to disambiguate this operation, '
'"floyd_image" being the output of a previous floyd stage.')
# The image inputs become 'original_image', 'image' is the previous floyd stage output
pipeline_args['original_image'] = image_arg_inputs
pipeline_args['image'] = user_args.floyd_image
self._set_pipe_dimensions(
user_width, user_height,
inference_width, inference_height
)
elif self._model_type == _enums.ModelType.S_CASCADE:
pipeline_args['images'] = image_arg_inputs
# Stable cascade output dimension will not be based on the image input for img2img
self._set_pipe_dimensions(
user_width, user_height,
_constants.DEFAULT_S_CASCADE_OUTPUT_WIDTH, _constants.DEFAULT_S_CASCADE_OUTPUT_HEIGHT
)
else:
pipeline_args['image'] = image_arg_inputs
# Set dimensions for general img2img case - will be used unless overridden later
self._set_pipe_dimensions(
user_width, user_height,
inference_width, inference_height
)
# Handle model-specific settings
# Local helper: warn that a user supplied image_seed_strength is being ignored
def check_no_image_seed_strength():
if user_args.image_seed_strength is not None:
_messages.warning(
f'image_seed_strength is not supported by model_type '
f'"{_enums.get_model_type_string(self._model_type)}" in '
f'mode "{self._pipeline_type.name}" and is being ignored.'
)
# Local helper: True for SD3 and Flux model types
def is_sd3_or_flux():
return _enums.model_type_is_sd3(self._model_type) or _enums.model_type_is_flux(self._model_type)
if _enums.model_type_is_upscaler(self._model_type):
if self._model_type == _enums.ModelType.UPSCALER_X4:
pipeline_args['noise_level'] = int(
_types.default(user_args.upscaler_noise_level, _constants.DEFAULT_X4_UPSCALER_NOISE_LEVEL)
)
check_no_image_seed_strength()
elif self._model_type in [_enums.ModelType.FLUX_FILL, _enums.ModelType.FLUX_KONTEXT]:
check_no_image_seed_strength()
elif self._model_type == _enums.ModelType.IFS:
if self._pipeline_type != _enums.PipelineType.INPAINT:
pipeline_args['noise_level'] = int(
_types.default(user_args.upscaler_noise_level, _constants.DEFAULT_FLOYD_SUPERRESOLUTION_NOISE_LEVEL)
)
check_no_image_seed_strength()
else:
pipeline_args['noise_level'] = int(
_types.default(user_args.upscaler_noise_level,
_constants.DEFAULT_FLOYD_SUPERRESOLUTION_INPAINT_NOISE_LEVEL)
)
self._set_pipeline_strength(user_args, pipeline_args)
elif self._model_type == _enums.ModelType.IFS_IMG2IMG:
pipeline_args['noise_level'] = int(
_types.default(user_args.upscaler_noise_level,
_constants.DEFAULT_FLOYD_SUPERRESOLUTION_IMG2IMG_NOISE_LEVEL)
)
self._set_pipeline_strength(user_args, pipeline_args)
elif not _enums.model_type_is_pix2pix(self._model_type) and self._model_type != _enums.ModelType.S_CASCADE:
self._set_pipeline_strength(user_args, pipeline_args)
else:
check_no_image_seed_strength()
# Handle mask images
mask_images = user_args.mask_images
if mask_images is not None:
self._validate_images_all_same_size('inpaint mask images', mask_images)
mask_images = self._resize_images_to_user_dimensions(mask_images, user_args)
# Only the first mask is compared against the size derived from the first image input
images_size = (inference_width, inference_height)
if mask_images[0].size != images_size:
raise _pipelines.UnsupportedPipelineConfigError(
f'Image seed img2img images and inpaint masks must '
f'have the same dimension, got: {images_size}, '
f'and {mask_images[0].size} respectively.'
)
pipeline_args['mask_image'] = mask_images
if not (_enums.model_type_is_floyd(self._model_type) or is_sd3_or_flux()):
# Override dimensions for masked models
self._set_pipe_dimensions(
user_width, user_height,
inference_width, inference_height,
pipeline_args
)
# Handle adetailer (auto-generated masks)
if self._parsed_adetailer_detector_uris:
if not is_sd3_or_flux():
# Override dimensions for adetailer
self._set_pipe_dimensions(
user_width, user_height,
inference_width, inference_height,
pipeline_args
)
# Handle specific model types that need special dimension handling
if self._model_type == _enums.ModelType.SDXL_PIX2PIX:
self._set_pipe_dimensions(
user_width, user_height,
inference_width, inference_height,
pipeline_args
)
elif self._model_type == _enums.ModelType.UPSCALER_X2:
# Work on a copy so that resized images can be substituted by index
image_arg_inputs = list(image_arg_inputs)
pipeline_args['image'] = image_arg_inputs
if non_latent_input:
for idx, image in enumerate(image_arg_inputs):
if not _image.is_aligned(image.size, 64):
size = _image.align_by(image.size, 64)
_messages.warning(
f'Input image size {image.size} is not aligned by 64. '
f'Output dimensions will be forcefully aligned to 64: {size}.'
)
image_arg_inputs[idx] = _image.resize_image(image, size)
self._set_pipe_dimensions(
None, None,
image_arg_inputs[0].width, image_arg_inputs[0].height
)
elif self._model_type == _enums.ModelType.S_CASCADE:
# Validate output dimensions for stable cascade
if user_width and user_width > 0 and not (user_width % 128) == 0:
raise _pipelines.UnsupportedPipelineConfigError(
'Stable Cascade requires an output dimension that is aligned by 128.')
if user_height and user_height > 0 and not (user_height % 128) == 0:
raise _pipelines.UnsupportedPipelineConfigError(
'Stable Cascade requires an output dimension that is aligned by 128.')
# Override with cascade defaults and set both pipeline and inference dimensions
cascade_width = _types.default(user_width, _constants.DEFAULT_S_CASCADE_OUTPUT_WIDTH)
cascade_height = _types.default(user_height, _constants.DEFAULT_S_CASCADE_OUTPUT_HEIGHT)
self._set_pipe_dimensions(
None, None,
cascade_width, cascade_height,
pipeline_args
)
elif is_sd3_or_flux():
if _enums.model_type_is_sd3(self._model_type):
# Work on a copy so that resized images can be substituted by index
image_arg_inputs = list(image_arg_inputs)
pipeline_args['image'] = image_arg_inputs
if non_latent_input:
for idx, image in enumerate(image_arg_inputs):
if not _image.is_aligned(image.size, 16):
size = _image.align_by(image.size, 16)
_messages.warning(
f'Input image size {image.size} is not aligned by 16. '
f'Dimensions will be forcefully aligned to 16: {size}.'
)
image_arg_inputs[idx] = _image.resize_image(image, size)
# The inference size follows the (possibly resized) first image
inference_width = image_arg_inputs[0].width
inference_height = image_arg_inputs[0].height
if mask_images:
mask_images = list(mask_images)
pipeline_args['mask_image'] = mask_images
for idx, image in enumerate(mask_images):
if not _image.is_aligned(image.size, 16):
size = _image.align_by(image.size, 16)
_messages.warning(
f'Input mask image size {image.size} is not aligned by 16. '
f'Dimensions will be forcefully aligned to 16: {size}.'
)
mask_images[idx] = _image.resize_image(image, size)
# The inference size follows the (possibly resized) first mask
inference_width = mask_images[0].width
inference_height = mask_images[0].height
self._set_pipe_dimensions(
None, None,
inference_width, inference_height,
pipeline_args
)
def _set_pipeline_txt2img_defaults(self, user_args: DiffusionArguments, pipeline_args: dict[str, typing.Any]):
    """
    Resolve the output dimensions used for a txt2img generation.

    The dimensions returned by ``self._aligned_8_user_dimensions`` are used where present
    (a warning is issued when they differ from the values on ``user_args``), otherwise the
    default output size for the current model type is used. The result is stored on
    ``self._inference_width`` / ``self._inference_height`` and forwarded
    to ``self._set_pipe_dimensions`` along with ``pipeline_args``.

    :param user_args: user arguments to the pipeline wrapper
    :param pipeline_args: pipeline keyword arguments being assembled

    :raises UnsupportedPipelineConfigError: if the resolved size is not aligned by 128
        for Stable Cascade, or not aligned by 16 for Stable Diffusion 3.
    """
    width, height = self._aligned_8_user_dimensions(user_args)

    if width != user_args.width:
        _messages.warning('Forcing alignment of txt2img generation argument "width" to 8.')

    if height != user_args.height:
        _messages.warning('Forcing alignment of txt2img generation argument "height" to 8.')

    model_type = self._model_type

    # Only some model types enforce an alignment on the resolved size,
    # these two values stay None for every other model type.
    required_alignment = None
    alignment_error = None

    if _enums.model_type_is_sdxl(model_type):
        fallback_height = _constants.DEFAULT_SDXL_OUTPUT_HEIGHT
        fallback_width = _constants.DEFAULT_SDXL_OUTPUT_WIDTH
    elif _enums.model_type_is_kolors(model_type):
        fallback_height = _constants.DEFAULT_KOLORS_OUTPUT_HEIGHT
        fallback_width = _constants.DEFAULT_KOLORS_OUTPUT_WIDTH
    elif _enums.model_type_is_floyd_if(model_type):
        fallback_height = _constants.DEFAULT_FLOYD_IF_OUTPUT_HEIGHT
        fallback_width = _constants.DEFAULT_FLOYD_IF_OUTPUT_WIDTH
    elif model_type == _enums.ModelType.S_CASCADE:
        fallback_height = _constants.DEFAULT_S_CASCADE_OUTPUT_HEIGHT
        fallback_width = _constants.DEFAULT_S_CASCADE_OUTPUT_WIDTH
        required_alignment = 128
        alignment_error = 'Stable Cascade requires an output dimension that is aligned by 128.'
    elif model_type == _enums.ModelType.SD3:
        fallback_height = _constants.DEFAULT_SD3_OUTPUT_HEIGHT
        fallback_width = _constants.DEFAULT_SD3_OUTPUT_WIDTH
        required_alignment = 16
        alignment_error = 'Stable Diffusion 3 requires an output dimension that is aligned by 16.'
    elif model_type == _enums.ModelType.FLUX:
        fallback_height = _constants.DEFAULT_FLUX_OUTPUT_HEIGHT
        fallback_width = _constants.DEFAULT_FLUX_OUTPUT_WIDTH
    else:
        fallback_height = _constants.DEFAULT_OUTPUT_HEIGHT
        fallback_width = _constants.DEFAULT_OUTPUT_WIDTH

    self._inference_height = _types.default(height, fallback_height)
    self._inference_width = _types.default(width, fallback_width)

    if required_alignment is not None:
        resolved_size = (self._inference_height, self._inference_width)
        if not _image.is_aligned(resolved_size, required_alignment):
            raise _pipelines.UnsupportedPipelineConfigError(alignment_error)

    self._set_pipe_dimensions(
        None, None,
        self._inference_width, self._inference_height,
        pipeline_args
    )
def _prepare_inpaint_crop(self, user_args: DiffusionArguments):
    """
    Validate the inpaint crop configuration and, when active, swap the inputs for cropped versions.

    Inpaint crop is enabled implicitly when padding, feathering, or masked mode is requested.
    When it is active, the images, masks, and any control images on ``user_args`` are
    replaced by the cropped results of ``self._apply_inpaint_crop``, and the information
    kept for pasting later is stored on ``self._inpaint_crop_info``. Inputs that
    are not PIL images are decoded through ``self.decode_latents`` before cropping.

    :param user_args: user arguments to validate, modified in place when cropping is applied

    :raises UnsupportedPipelineConfigError: if ``aspect_correct`` is disabled, images or masks
        are missing, latents output is requested, more than one image or mask is given,
        tensor inputs cannot be decoded because the pipeline has no VAE, or the
        padding value has an unsupported format.
    """
    if not user_args.inpaint_crop:
        # Padding, feathering, or masked mode imply that cropping is wanted
        crop_implied = (user_args.inpaint_crop_padding is not None
                        or user_args.inpaint_crop_feather is not None
                        or user_args.inpaint_crop_masked)
        if not crop_implied:
            return
        user_args.inpaint_crop = True

    if not user_args.aspect_correct:
        raise _pipelines.UnsupportedPipelineConfigError(
            'aspect_correct=False is not compatible with inpaint_crop=True.'
        )

    # Both images and masks are mandatory
    if user_args.images is None or len(user_args.images) == 0:
        raise _pipelines.UnsupportedPipelineConfigError(
            "inpaint_crop requires images to be provided."
        )

    if user_args.mask_images is None or len(user_args.mask_images) == 0:
        raise _pipelines.UnsupportedPipelineConfigError(
            "inpaint_crop requires mask_images to be provided."
        )

    # Latents output cannot be combined with cropping
    if user_args.output_latents:
        raise _pipelines.UnsupportedPipelineConfigError(
            "inpaint_crop is not supported when outputting latents, only images are supported."
        )

    # A single image / mask pair only
    # (batch_size > 1 remains fine for variations of that one crop)
    if len(user_args.images) > 1 or len(user_args.mask_images) > 1:
        raise _pipelines.UnsupportedPipelineConfigError(
            "inpaint_crop cannot be used with multiple input images. "
            "Each image/mask pair should be processed individually for optimal cropping. "
            "Consider processing one image at a time or disable inpaint_crop for batch processing. "
            "Note: batch_size > 1 is supported for generating multiple variations of a single crop."
        )

    # Anything that is not a PIL image is assumed to be latents that need decoding
    decoded_latents = not isinstance(user_args.images[0], PIL.Image.Image)

    if decoded_latents:
        _messages.debug_log('Inpaint crop: decoding tensor inputs with VAE...')

        if not hasattr(self._pipeline, 'vae') or self._pipeline.vae is None:
            raise _pipelines.UnsupportedPipelineConfigError(
                'Cannot decode tensor inputs for inpaint_crop as the pipeline does not have a VAE. '
                'Use images instead.'
            )

        latents = user_args.images

        # Flux latents are stacked and repacked before decoding
        if _enums.model_type_is_flux(self._model_type):
            latents = self._repack_flux_latents(self._stack_latents(latents))

        user_args.images = self.decode_latents(latents)

    _messages.debug_log('Applying inpaint crop...')

    # Keep the uncropped inputs around, they are needed for pasting later
    original_images = list(user_args.images)
    original_masks = list(user_args.mask_images)
    original_control_images = list(user_args.control_images) if user_args.control_images else None

    # Normalize padding to (left, top, right, bottom)
    raw_padding = user_args.inpaint_crop_padding

    if raw_padding is None:
        padding = _constants.DEFAULT_INPAINT_CROP_PADDING
    elif isinstance(raw_padding, int):
        # uniform padding
        padding = (raw_padding,) * 4
    elif isinstance(raw_padding, tuple) and len(raw_padding) == 2:
        # (horizontal, vertical)
        padding = (raw_padding[0], raw_padding[1], raw_padding[0], raw_padding[1])
    elif isinstance(raw_padding, tuple) and len(raw_padding) == 4:
        # already (left, top, right, bottom)
        padding = raw_padding
    else:
        raise _pipelines.UnsupportedPipelineConfigError(
            f"Invalid inpaint_crop_padding format: {raw_padding}. "
            f"Expected int, tuple[int, int], or tuple[int, int, int, int]")

    cropped_images, cropped_masks, cropped_control_images, crop_bounds = \
        self._apply_inpaint_crop(
            images=original_images,
            masks=original_masks,
            control_images=original_control_images,
            padding=padding,
            decoded_latents=decoded_latents,
            user_args=user_args
        )

    # Remember how to paste the result back
    self._inpaint_crop_info = _InpaintCropInfo(
        original_images=original_images,
        original_masks=original_masks,
        crop_bounds=crop_bounds,
        use_masked=user_args.inpaint_crop_masked,
        feather=user_args.inpaint_crop_feather
    )

    # Downstream logic only ever sees the cropped inputs
    user_args.images = cropped_images
    user_args.mask_images = cropped_masks

    if cropped_control_images is not None:
        user_args.control_images = cropped_control_images

    _messages.debug_log(
        f'Inpaint crop applied: {original_images[0].size if original_images else None} -> crop bounds {crop_bounds}')
def _get_pipeline_defaults(self, user_args: DiffusionArguments):
    """
    Build the keyword arguments for a huggingface diffusers pipeline call
    that are more or less universal across pipelines.

    Resets the cached inference dimensions, prepares inpaint cropping, then fills
    in guidance scale, inference steps, a seeded generator, and the mode specific
    defaults (controlnet, t2i adapter, img2img, or txt2img).

    :param user_args: user arguments to the pipeline wrapper, some fields are replaced
        with processed values
    :return: kwargs dictionary
    """
    self._inference_width = self._inference_height = None

    # Must run before anything else so that every
    # downstream step operates on the cropped images
    self._prepare_inpaint_crop(user_args)

    pipeline_args: dict[str, typing.Any] = {
        'guidance_scale': float(
            _types.default(user_args.guidance_scale, _constants.DEFAULT_GUIDANCE_SCALE)),
        'num_inference_steps': int(
            _types.default(user_args.inference_steps, _constants.DEFAULT_INFERENCE_STEPS)),
        # One generator, seeded once, shared by everything that follows
        'generator': torch.Generator(device=self._device).manual_seed(
            _types.default(user_args.seed, _constants.DEFAULT_SEED)),
    }

    # Pick the mode specific configuration step
    if self._controlnet_uris:
        configure = self._set_pipeline_controlnet_defaults
    elif self._t2i_adapter_uris:
        configure = self._set_pipeline_t2iadapter_defaults
    elif user_args.images is not None:
        configure = self._set_pipeline_img2img_defaults
    else:
        configure = self._set_pipeline_txt2img_defaults

    configure(user_args, pipeline_args)

    if user_args.latents:
        # relies on 'width' and 'height' already being present in pipeline_args
        pipeline_args['latents'] = self._process_raw_input_latents(user_args, pipeline_args)

    if user_args.ip_adapter_images:
        user_args.ip_adapter_images = self._process_ip_adapter_images(user_args.ip_adapter_images)

    if user_args.floyd_image is not None:
        user_args.floyd_image = self._process_floyd_image(user_args.floyd_image)

    return pipeline_args
def _process_raw_input_latents(self, user_args: DiffusionArguments,
                               pipeline_args: dict[str, typing.Any]) -> torch.Tensor:
    """
    Process and validate incoming raw / noisy latents from ``latents``

    The decoded size of the latents must match the expected output size. The expected
    size is read from ``width`` / ``height`` in ``pipeline_args``, unless img2img inputs
    are present, in which case it is derived from the first input image or latent.

    On success the expected size is stored on ``self._inference_width`` /
    ``self._inference_height`` and the latents are moved to the pipeline device
    (and cast to the pipeline dtype when they differ). Flux latents are repacked.

    :param user_args: Diffusion arguments
    :param pipeline_args: pipeline keyword arguments, ``width`` and ``height`` are read from it

    :raises UnsupportedPipelineConfigError: if the decoded size of the raw latents does not
        match the expected output size.

    :return: Batched latents tensor
    """
    latents = self._process_input_latents("raw", user_args.latents, user_args.latents_processors)
    latents = self._stack_latents(latents)

    decoded_latents_size = self.get_decoded_latents_size(latents)

    expected_width = pipeline_args.get('width', None)
    expected_height = pipeline_args.get('height', None)

    if user_args.images:
        # img2img inputs take precedence over width / height in pipeline_args
        if not _torchutil.is_tensor(user_args.images[0]):
            expected_width, expected_height = _image.resize_image_calc(
                user_args.images[0].size,
                self._calc_image_target_size(user_args.images[0], user_args),
                aspect_correct=user_args.aspect_correct,
                align=8
            )
        else:
            expected_width, expected_height = self.get_decoded_latents_size(user_args.images[0])

    output_size_expected = (expected_width, expected_height)

    if output_size_expected != decoded_latents_size:
        raise _pipelines.UnsupportedPipelineConfigError(
            f"Render width / height not compatible with "
            f"given raw latents, output size: {_textprocessing.format_size(output_size_expected)}, "
            f"latents decoded size: {_textprocessing.format_size(decoded_latents_size)}. This can "
            f"be caused by an explicitly set width / height that is incorrect for the incoming raw "
            f"latents, or a mismatch in the size of incoming img2img images / latents with the "
            f"raw latents."
        )

    # Store dimensions for optimizations
    self._inference_width = expected_width
    self._inference_height = expected_height

    if latents.dtype != self._pipeline.dtype:
        _messages.debug_log(
            f'Casting incoming raw latents from: {latents.dtype}, to: {self._pipeline.dtype}'
        )
        latents = latents.to(self._device, dtype=self._pipeline.dtype)
    else:
        latents = latents.to(self._device)

    if _enums.model_type_is_flux(self._model_type):
        latents = self._repack_flux_latents(latents)

    return latents
@staticmethod
def _stack_latents(latents):
    """
    Combine a sequence of latent tensors into a single batched tensor.

    The rank of the first tensor decides how the sequence is combined: rank 4 tensors
    are concatenated along dimension 0, rank 3 tensors are stacked along a new dimension 0.

    :param latents: non-empty sequence of tensors
    :raises UnsupportedPipelineConfigError: if the first tensor is neither rank 3 nor rank 4
    :return: the combined tensor
    """
    rank = latents[0].ndim
    tensors = list(latents)

    if rank == 4:
        # [B, C, H, W] each -> [total_B, C, H, W]
        return torch.cat(tensors, dim=0)

    if rank == 3:
        # [C, H, W] each -> [num_tensors, C, H, W]
        return torch.stack(tensors, dim=0)

    raise _pipelines.UnsupportedPipelineConfigError(
        f'Unsupported latents shape: {latents[0].shape}')
@staticmethod
def _sd3_force_control_to_a16(args, control_images, user_args):
    """
    Return a copy of the control images in which every image not aligned by 16 has been resized.

    For each image that is not aligned by 16: the user supplied ``width`` / ``height``
    are checked for alignment by 16, ``args['width']`` / ``args['height']`` are set to the
    aligned size of that image, a warning is issued, and the image is replaced by
    a resized copy.

    :param args: pipeline keyword arguments, ``width`` / ``height`` may be overwritten
    :param control_images: control images to check
    :param user_args: user arguments, ``width`` and ``height`` are read from it

    :raises UnsupportedPipelineConfigError: if an image needs alignment and the user
        supplied ``width`` or ``height`` is not aligned by 16.

    :return: list of control images
    """
    aligned_images = list(control_images)

    for position, control_image in enumerate(aligned_images):
        if _image.is_aligned(control_image.size, 16):
            # nothing to do for this one
            continue

        # noinspection PyTypeChecker
        aligned_size: tuple[int, int] = _image.align_by(control_image.size, 16)

        if user_args.width and user_args.width % 16 != 0:
            raise _pipelines.UnsupportedPipelineConfigError(
                'Stable Diffusion 3 requires an output dimension aligned by 16.')

        if user_args.height and user_args.height % 16 != 0:
            raise _pipelines.UnsupportedPipelineConfigError(
                'Stable Diffusion 3 requires an output dimension aligned by 16.')

        args['width'] = aligned_size[0]
        args['height'] = aligned_size[1]

        _messages.warning(
            f'Control image size {control_image.size} is not aligned by 16. '
            f'Output dimensions will be forcefully aligned by 16: {aligned_size}.'
        )

        aligned_images[position] = _image.resize_image(control_image, aligned_size)

    return aligned_images
def _get_adapter_conditioning_scale(self):
    """
    Get the conditioning scale of the parsed T2I adapter URIs.

    :return: ``1.0`` when there are no parsed URIs, the ``scale`` of the only URI
        when there is exactly one, otherwise a list with the ``scale`` of each URI.
    """
    adapters = self._parsed_t2i_adapter_uris

    if not adapters:
        return 1.0

    if len(adapters) > 1:
        return [adapter.scale for adapter in adapters]

    return adapters[0].scale
def _get_controlnet_conditioning_scale(self):
    """
    Get the conditioning scale of the parsed ControlNet URIs.

    :return: ``1.0`` when there are no parsed URIs, the ``scale`` of the only URI
        when there is exactly one, otherwise a list with the ``scale`` of each URI.
    """
    controlnets = self._parsed_controlnet_uris

    if not controlnets:
        return 1.0

    if len(controlnets) > 1:
        return [controlnet.scale for controlnet in controlnets]

    return controlnets[0].scale
def _get_controlnet_mode(self):
    """
    Get the mode of the parsed ControlNet URIs.

    :return: ``None`` when there are no parsed URIs, the ``mode`` of the only URI
        when there is exactly one, otherwise a list with the ``mode`` of each URI.
    """
    controlnets = self._parsed_controlnet_uris

    if not controlnets:
        return None

    if len(controlnets) > 1:
        return [controlnet.mode for controlnet in controlnets]

    return controlnets[0].mode
def _get_controlnet_guidance_start(self):
    """
    Get the guidance start value of the parsed ControlNet URIs.

    :return: ``0.0`` when there are no parsed URIs, the ``start`` of the only URI
        when there is exactly one, otherwise a list with the ``start`` of each URI.
    """
    controlnets = self._parsed_controlnet_uris

    if not controlnets:
        return 0.0

    if len(controlnets) > 1:
        return [controlnet.start for controlnet in controlnets]

    return controlnets[0].start
def _get_controlnet_guidance_end(self):
    """
    Get the guidance end value of the parsed ControlNet URIs.

    :return: ``1.0`` when there are no parsed URIs, the ``end`` of the only URI
        when there is exactly one, otherwise a list with the ``end`` of each URI.
    """
    controlnets = self._parsed_controlnet_uris

    if not controlnets:
        return 1.0

    if len(controlnets) > 1:
        return [controlnet.end for controlnet in controlnets]

    return controlnets[0].end
def _check_for_invalid_model_specific_opts(self, user_args: DiffusionArguments):
    """
    Reject model family specific arguments that were set for a different model family.

    Public attributes of ``user_args`` whose name starts with ``flux``, ``sdxl``, ``sd3``,
    or ``s_cascade`` must be ``None`` unless the current model type belongs to the
    matching family (``sdxl`` arguments are also accepted for Kolors model types).

    :param user_args: user arguments to the pipeline wrapper
    :raises UnsupportedPipelineConfigError: on the first offending argument found
    """

    def first_set_option(prefix):
        # name of the first public attribute with this prefix that has a value, or None
        public_attributes = _types.get_public_attributes(user_args)
        return next(
            (name for name, value in public_attributes.items()
             if name.startswith(prefix) and value is not None),
            None)

    model_type = self.model_type

    if not _enums.model_type_is_flux(model_type):
        arg = first_set_option('flux')
        if arg is not None:
            raise _pipelines.UnsupportedPipelineConfigError(
                f'{arg} may only be used with Flux models.')

    sdxl_like = (_enums.model_type_is_sdxl(model_type) or
                 _enums.model_type_is_kolors(model_type))

    if not sdxl_like:
        arg = first_set_option('sdxl')
        if arg is not None:
            raise _pipelines.UnsupportedPipelineConfigError(
                f'{arg} may only be used with SDXL models.')

    if not _enums.model_type_is_sd3(model_type):
        arg = first_set_option('sd3')
        if arg is not None:
            raise _pipelines.UnsupportedPipelineConfigError(
                f'{arg} may only be used with Stable Diffusion 3 models.')

    if not _enums.model_type_is_s_cascade(model_type):
        arg = first_set_option('s_cascade')
        if arg is not None:
            raise _pipelines.UnsupportedPipelineConfigError(
                f'{arg} may only be used with Stable Cascade models.')
@staticmethod
def _set_prompt_weighter_extra_supported_args(
pipeline_args: dict,
prompt_weighter: _promptweighters.PromptWeighter | None,
diffusion_args: DiffusionArguments,
second_model: bool,
) -> list[str]:
"""
Forward the extra arguments that a prompt weighter declares support for into ``pipeline_args``.
The names reported by ``prompt_weighter.get_extra_supported_args()`` are mapped to
attributes of ``diffusion_args``. Truthy values are written to ``pipeline_args``, using the
``negative`` part for names containing ``negative``, the ``positive`` part for other names
containing ``prompt``, and the value itself otherwise.
:param pipeline_args: pipeline keyword arguments, modified in place
:param prompt_weighter: prompt weighter plugin, or ``None``
:param diffusion_args: user arguments the values are read from
:param second_model: when ``True`` the second prompt is read from
``second_model_second_prompt`` instead of ``second_prompt``
:raises RuntimeError: if the prompt weighter reports an argument name that is not supported here
:return: list of argument names collected while processing, empty when ``prompt_weighter`` is ``None``
"""
if prompt_weighter is None:
return []
poppable_args = []
second_prompt_arg = 'second_prompt' if not second_model else 'second_model_second_prompt'
# pipeline argument name -> attribute name on diffusion_args
arg_map = {
'prompt_2': second_prompt_arg,
'negative_prompt_2': second_prompt_arg,
'prompt_3': 'third_prompt',
'negative_prompt_3': 'third_prompt',
'clip_skip': 'clip_skip'
}
prompt_weighter_extra_args = prompt_weighter.get_extra_supported_args()
for arg_name in prompt_weighter_extra_args:
if arg_name not in arg_map:
raise RuntimeError(
f'Prompt weighter plugin: {prompt_weighter.__class__.__name__}, '
f'returned invalid "get_extra_supported_args()" value: {arg_name}. '
f'This is a bug, acceptable values are: {", ".join(arg_map.keys())}')
source = arg_map[arg_name]
# 'negative' must be tested first, the negative names also contain 'prompt'
if 'negative' in arg_name:
user_value = getattr(diffusion_args, source, None)
if user_value:
pipeline_args[arg_name] = user_value.negative
poppable_args.append(arg_name)
elif 'prompt' in arg_name:
user_value = getattr(diffusion_args, source, None)
if user_value:
pipeline_args[arg_name] = user_value.positive
poppable_args.append(arg_name)
else:
# not a prompt, the value is passed through unchanged
user_value = getattr(diffusion_args, source, None)
if user_value:
pipeline_args[arg_name] = user_value
poppable_args.append(arg_name)
return poppable_args
def _set_non_universal_pipeline_arg(self,
                                    pipeline,
                                    pipeline_args: dict,
                                    user_args: DiffusionArguments,
                                    pipeline_arg_name: str,
                                    user_arg_name: str,
                                    option_name: str,
                                    transform: typing.Callable[
                                        [typing.Any], typing.Any] = None):
    """
    Set a pipeline argument that is only accepted by some pipelines.

    When the pipeline ``__call__`` accepts ``pipeline_arg_name`` and the user provided
    ``user_arg_name``, the (optionally transformed) user value is written to
    ``pipeline_args``. When the pipeline does not accept the argument, a user provided
    value is an error unless ``pipeline_arg_name`` is already present in ``pipeline_args``.

    :param pipeline: pipeline object whose ``__call__`` signature is inspected
    :param pipeline_args: pipeline keyword arguments, modified in place
    :param user_args: user arguments the value is read from
    :param pipeline_arg_name: argument name as accepted by the pipeline
    :param user_arg_name: attribute name on ``user_args``
    :param option_name: option name used in the error message
    :param transform: optional callable applied to the user value before it is stored

    :raises UnsupportedPipelineConfigError: if the user supplied a value for an argument
        that the pipeline does not accept.
    """
    pipeline_kwargs = user_args.get_pipeline_wrapper_kwargs()

    call = pipeline.__call__

    # A decorated __call__ (for example by torch.no_grad()) exposes the real function
    # as __wrapped__, the attribute does not exist on an undecorated __call__.
    wrapped = getattr(call, '__wrapped__', None)
    func = wrapped if wrapped is not None else call

    if pipeline_arg_name in inspect.getfullargspec(func).args:
        if user_arg_name in pipeline_kwargs:
            # Only provide if the user provided the option
            # otherwise, defer to the pipelines default value
            val = getattr(user_args, user_arg_name)
            val = val if not transform else transform(val)
            pipeline_args[pipeline_arg_name] = val
    else:
        if pipeline_arg_name in pipeline_args:
            # we are forcing it to be allowed.
            return

        val = _types.default(getattr(user_args, user_arg_name), None)
        if val is not None:
            raise _pipelines.UnsupportedPipelineConfigError(
                f'{option_name} cannot be used with --model-type "{self.model_type_string}" in '
                f'{_enums.get_pipeline_type_string(self._pipeline_type)} mode with the current '
                f'combination of arguments and model.')
def _get_sdxl_conditioning_args(self, pipeline, pipeline_args: dict, user_args: DiffusionArguments,
                                user_prefix=None):
    """
    Transfer the SDXL conditioning arguments from ``user_args`` into ``pipeline_args``.

    Every argument goes through ``self._set_non_universal_pipeline_arg``. The size
    arguments are passed with a transform that swaps the two elements of the value.

    :param pipeline: pipeline the arguments are meant for
    :param pipeline_args: pipeline keyword arguments, modified in place
    :param user_args: user arguments the values are read from
    :param user_prefix: optional prefix, inserted after ``sdxl_`` in the user argument
        names and (in its dashed form) after ``--sdxl-`` in the option names
    """
    if user_prefix:
        user_prefix = user_prefix + '_'
        option_prefix = _textprocessing.dashup(user_prefix)
    else:
        user_prefix = option_prefix = ''

    def swap_pair(size):
        # exchange the first two elements of the size
        return size[1], size[0]

    # (pipeline argument, user argument, option name[, transform])
    conditioning_specs = (
        ('aesthetic_score',
         f'sdxl_{user_prefix}aesthetic_score',
         f'--sdxl-{option_prefix}aesthetic-scores'),
        ('original_size',
         f'sdxl_{user_prefix}original_size',
         f'--sdxl-{option_prefix}original-sizes', swap_pair),
        ('target_size',
         f'sdxl_{user_prefix}target_size',
         f'--sdxl-{option_prefix}target-sizes', swap_pair),
        ('crops_coords_top_left',
         f'sdxl_{user_prefix}crops_coords_top_left',
         f'--sdxl-{option_prefix}crops-coords-top-left'),
        ('negative_aesthetic_score',
         f'sdxl_{user_prefix}negative_aesthetic_score',
         f'--sdxl-{option_prefix}negative-aesthetic-scores'),
        ('negative_original_size',
         f'sdxl_{user_prefix}negative_original_size',
         f'--sdxl-{option_prefix}negative-original-sizes', swap_pair),
        ('negative_target_size',
         f'sdxl_{user_prefix}negative_target_size',
         f'--sdxl-{option_prefix}negative-target-sizes', swap_pair),
        ('negative_crops_coords_top_left',
         f'sdxl_{user_prefix}negative_crops_coords_top_left',
         f'--sdxl-{option_prefix}negative-crops-coords-top-left'),
    )

    for spec in conditioning_specs:
        self._set_non_universal_pipeline_arg(pipeline, pipeline_args, user_args, *spec)
@staticmethod
def _pop_sdxl_conditioning_args(pipeline_args):
    """
    Remove every SDXL conditioning argument from ``pipeline_args``.

    Arguments that are not present are ignored.

    :param pipeline_args: pipeline keyword arguments, modified in place
    """
    conditioning_names = (
        'aesthetic_score',
        'target_size',
        'original_size',
        'crops_coords_top_left',
    )

    for name in conditioning_names:
        pipeline_args.pop(name, None)

    for name in conditioning_names:
        pipeline_args.pop(f'negative_{name}', None)
def _unpack_flux_latents(self,
latents: torch.Tensor,
height: int | None = None,
width: int | None = None) -> torch.Tensor:
"""
Unpack Flux latents from internal packed format [B, L, C] to external unpacked format [B, C, H, W].
This method converts from the packed sequence format that Flux pipelines use internally
to the standard spatial format used as the external interface.
:param latents: Input latents in packed shape [B, L, C] or [L, C]
:param height: Optional target height, will use default if not specified
:param width: Optional target width, will use default if not specified
:return: Unpacked latents in shape [B, C, H, W]
"""
# Add batch dimension if needed
if len(latents.shape) == 2: # If [L, C] add batch dimension
latents = latents.unsqueeze(0)
# Calculate dimensions
height = height or self._pipeline.default_sample_size * self._pipeline.vae_scale_factor
width = width or self._pipeline.default_sample_size * self._pipeline.vae_scale_factor
# VAE applies 8x compression on images, but we must also account for packing which requires
# latent height and width to be divisible by 2
height = 2 * (int(height) // (self._pipeline.vae_scale_factor * 2))
width = 2 * (int(width) // (self._pipeline.vae_scale_factor * 2))
# Unpack from [B, L, C] to [B, C, H, W]
batch_size, num_patches, channels = latents.shape
latents = latents.view(batch_size, height // 2, width // 2, channels // 4, 2, 2)
latents = latents.permute(0, 3, 1, 4, 2, 5)
latents = latents.reshape(batch_size, channels // (2 * 2), height, width)
return latents
@staticmethod
def _repack_flux_latents(latents: torch.Tensor) -> torch.Tensor:
    """
    Convert Flux latents from the unpacked layout [B, C, H, W] to the packed layout [B, L, C].

    Every 2x2 spatial patch is folded into the channel dimension, yielding
    ``(H // 2) * (W // 2)`` sequence entries with ``C * 4`` channels each.
    This is the inverse of ``_unpack_flux_latents``.

    :param latents: unpacked latents of shape [B, C, H, W]
    :return: packed latents of shape [B, L, C]
    """
    batch, channels, height, width = latents.shape
    rows, cols = height // 2, width // 2

    # [B, C, H, W] -> [B, C, H/2, 2, W/2, 2] -> [B, H/2, W/2, C, 2, 2] -> [B, L, C*4]
    patches = latents.reshape(batch, channels, rows, 2, cols, 2)
    patches = patches.permute(0, 2, 4, 1, 3, 5)

    return patches.reshape(batch, rows * cols, channels * 4)
def _call_torch_flux(self, pipeline_args, user_args: DiffusionArguments):
"""
Invoke the Flux pipeline with the prepared arguments.
Fills in the prompts, batch size, and the optional arguments (IP Adapter images,
sigmas, controlnet settings, prompt weighter extras) before calling the pipeline.
:param pipeline_args: pipeline keyword arguments, modified in place
:param user_args: user arguments to the pipeline wrapper
:raises UnsupportedPipelineConfigError: if ``clip_skip`` is greater than zero
:return: the result of ``self._call_asdff`` when adetailer detector URIs are configured,
otherwise the result of ``self._create_pipeline_result``
"""
self._check_for_invalid_model_specific_opts(user_args)
if user_args.clip_skip is not None and user_args.clip_skip > 0:
raise _pipelines.UnsupportedPipelineConfigError('Flux does not support clip skip.')
prompt: _prompt.Prompt = _types.default(user_args.prompt, _prompt.Prompt())
prompt_2: _prompt.Prompt = _types.default(user_args.second_prompt, _prompt.Prompt())
# An empty primary prompt becomes '', an empty second prompt becomes None
pipeline_args['prompt'] = prompt.positive if prompt.positive else ''
pipeline_args['prompt_2'] = prompt_2.positive if prompt_2.positive else None
# Negative prompts are only forwarded when the pipeline __call__ has a 'negative_prompt' parameter
if inspect.signature(self._pipeline.__call__).parameters.get('negative_prompt') is None:
if prompt.negative:
_messages.warning(
'Flux is ignoring the provided negative prompt as it '
'does not support negative prompting in the current configuration.'
)
if prompt_2.negative:
_messages.warning(
'Flux is ignoring the provided second negative prompt as it '
'does not support negative prompting in the current configuration.'
)
else:
pipeline_args['negative_prompt'] = prompt.negative if prompt.negative else None
pipeline_args['negative_prompt_2'] = prompt_2.negative if prompt_2.negative else None
if user_args.max_sequence_length is not None:
pipeline_args['max_sequence_length'] = user_args.max_sequence_length
batch_size = _types.default(user_args.batch_size, 1)
# Adjust batch size to match raw latents if provided
if 'latents' in pipeline_args:
latents_batch_size = pipeline_args['latents'].shape[0]
if latents_batch_size != batch_size:
batch_size = latents_batch_size
if user_args.batch_size is not None:
# only warn if the user specified a value
_messages.warning(
f'Setting --batch-size to {batch_size} because '
f'raw latents batch size did not match the specified batch size.'
)
# The batch size must be a multiple of the number of input images
if user_args.images:
if batch_size % len(user_args.images) != 0:
batch_size = len(user_args.images)
if user_args.batch_size is not None:
# only warn if the user specified a value
_messages.warning(
f'Setting --batch-size to {batch_size} because '
f'given batch size did not divide evenly with the '
f'provided number of input images.'
)
pipeline_args['num_images_per_prompt'] = batch_size
self._set_non_universal_pipeline_arg(self._pipeline,
pipeline_args, user_args,
'ip_adapter_image', 'ip_adapter_images',
'IP Adapter images')
# The sigmas value is passed through self._sigmas_eval before it is handed to the pipeline
self._set_non_universal_pipeline_arg(
self._pipeline,
pipeline_args, user_args,
'sigmas', 'sigmas',
'--sigmas',
transform=functools.partial(
self._sigmas_eval,
'primary',
self._pipeline,
_types.default(
user_args.inference_steps,
_constants.DEFAULT_INFERENCE_STEPS)
))
if hasattr(self._pipeline, 'controlnet'):
pipeline_args['controlnet_conditioning_scale'] = \
self._get_controlnet_conditioning_scale()
pipeline_args['control_guidance_start'] = \
self._get_controlnet_guidance_start()
pipeline_args['control_guidance_end'] = \
self._get_controlnet_guidance_end()
pipeline_args['control_mode'] = \
self._get_controlnet_mode()
prompt_weighter = self._get_prompt_weighter(user_args)
self._set_prompt_weighter_extra_supported_args(
pipeline_args=pipeline_args,
prompt_weighter=prompt_weighter,
diffusion_args=user_args,
second_model=False
)
# NOTE(review): user_args.inference_steps is passed to teacache_context as is and may be None,
# while the sigmas transform above falls back to _constants.DEFAULT_INFERENCE_STEPS - confirm
# that teacache_context handles None.
with _teacache_flux.teacache_context(
self._pipeline,
user_args.inference_steps,
rel_l1_thresh=_types.default(
user_args.tea_cache_rel_l1_threshold,
_constants.DEFAULT_TEA_CACHE_REL_L1_THRESHOLD
),
enable=_types.default(user_args.tea_cache, False),
), _sada_context(
self._pipeline,
width=self._inference_width,
height=self._inference_height,
enabled=user_args.sada,
**self._get_sada_args(user_args)
), _denoise_range(
self._pipeline,
user_args.denoising_start,
user_args.denoising_end
):
output_type = 'latent' if user_args.output_latents else 'pil'
if self._parsed_adetailer_detector_uris:
# adetailer processing is delegated to self._call_asdff
return self._call_asdff(
user_args=user_args,
pipeline_args=pipeline_args,
batch_size=batch_size,
prompt_weighter=prompt_weighter
)
else:
pipeline_output = _pipelines.call_pipeline(
pipeline=self._pipeline,
prompt_weighter=prompt_weighter,
device=self._device,
output_type=output_type,
**pipeline_args
)
return self._create_pipeline_result(pipeline_output, output_type, user_args, pipeline_args)
def _call_asdff(self,
                user_args: DiffusionArguments,
                prompt_weighter: _promptweighters.PromptWeighter,
                pipeline_args: dict[str, typing.Any],
                batch_size: int):
    """
    Run the adetailer (asdff) pass once per parsed detector URI, chaining outputs.

    The first detector processes ``pipeline_args['image']``; every following
    detector processes the images produced by the previous one. For each
    detector, a setting given on the detector URI takes precedence over the
    corresponding global ``user_args.adetailer_*`` value, and the override is logged.

    :param user_args: diffusion arguments holding the global adetailer settings
    :param prompt_weighter: prompt weighter forwarded to the adetailer pipeline
    :param pipeline_args: keyword arguments for the wrapped pipeline. NOTE: a detector
        URI ``prompt`` / ``negative_prompt`` is written into this dict and therefore
        stays in effect for later detectors and for the caller.
    :param batch_size: requested batch size, used to repeat the input images
    :return: the value of ``self._create_pipeline_result`` for the last detector output
    """

    def _uri_override(label, global_value, uri_value):
        # A value specified on the detector URI wins over the global value.
        if uri_value is None:
            return global_value
        _messages.log(f'Overriding global adetailer {label} '
                      f'value with adetailer detector URI value: {uri_value}')
        return uri_value

    asdff_pipe = _asdff_base.AdPipelineBase(self._pipeline)

    # use the provided pipe as is, it must be
    # some sort of inpainting pipe
    asdff_pipe.auto_detect_pipe = False

    # should we crop any control image the same way that we crop the mask?
    asdff_pipe.crop_control_image = self._adetailer_crop_control_image

    asdff_output = None

    for detector_uri in self._parsed_adetailer_detector_uris:
        input_images = pipeline_args['image'] if asdff_output is None else asdff_output.images

        # Repeat the inputs up to the batch size. This is an in-place
        # repetition, so (assuming a list) the object held in
        # pipeline_args / asdff_output grows as well.
        input_images *= (batch_size // len(input_images))

        model_masks = _uri_override(
            'model-masks',
            _types.default(user_args.adetailer_model_masks, _constants.DEFAULT_ADETAILER_MODEL_MASKS),
            detector_uri.model_masks)

        mask_blur = _uri_override(
            'mask-blur',
            int(_types.default(user_args.adetailer_mask_blur, _constants.DEFAULT_ADETAILER_MASK_BLUR)),
            detector_uri.mask_blur)

        mask_dilation = _uri_override(
            'mask-dilation',
            int(_types.default(user_args.adetailer_mask_dilation, _constants.DEFAULT_ADETAILER_MASK_DILATION)),
            detector_uri.mask_dilation)

        # Previously this override was logged as "mask-dilation" together
        # with the mask dilation value; it now reports mask-padding.
        mask_padding = _uri_override(
            'mask-padding',
            _types.default(user_args.adetailer_mask_padding, _constants.DEFAULT_ADETAILER_MASK_PADDING),
            detector_uri.mask_padding)

        detector_padding = _uri_override(
            'detector-padding',
            _types.default(user_args.adetailer_detector_padding,
                           _constants.DEFAULT_ADETAILER_DETECTOR_PADDING),
            detector_uri.detector_padding)

        mask_shape = _uri_override(
            'mask-shape',
            str(_types.default(user_args.adetailer_mask_shape, _constants.DEFAULT_ADETAILER_MASK_SHAPE)),
            detector_uri.mask_shape)

        index_filter = _uri_override(
            'index-filter',
            _types.default(user_args.adetailer_index_filter, None),
            detector_uri.index_filter)

        class_filter = _uri_override(
            'class-filter',
            _types.default(user_args.adetailer_class_filter, None),
            detector_uri.class_filter)

        if detector_uri.prompt is not None:
            pipeline_args['prompt'] = detector_uri.prompt
            _messages.log(f'Overriding global positive prompt '
                          f'value with adetailer detector URI value: "{detector_uri.prompt}"')

        if detector_uri.negative_prompt is not None:
            pipeline_args['negative_prompt'] = detector_uri.negative_prompt
            _messages.log(f'Overriding global negative prompt '
                          f'value with adetailer detector URI value: "{detector_uri.negative_prompt}"')

        processing_size = _uri_override(
            'size',
            user_args.adetailer_size,
            detector_uri.size)

        asdff_output = asdff_pipe(
            pipeline_args=pipeline_args,
            model_path=detector_uri.get_model_path(
                local_files_only=self._local_files_only, use_auth_token=self._auth_token),
            images=input_images,
            device=self._device,
            detector_device=_types.default(detector_uri.device, self._device),
            confidence=detector_uri.confidence,
            prompt_weighter=prompt_weighter,
            index_filter=index_filter,
            class_filter=class_filter,
            mask_blur=mask_blur,
            mask_shape=mask_shape,
            detector_padding=detector_padding,
            mask_padding=mask_padding,
            mask_dilation=mask_dilation,
            model_masks=model_masks,
            processing_size=processing_size
        )

    return self._create_pipeline_result(asdff_output, user_args=user_args, pipeline_kwargs=pipeline_args)
def _call_torch_s_cascade(self, pipeline_args, user_args: DiffusionArguments):
    """
    Run Stable Cascade inference: the prior pipeline first, then the decoder pipeline.

    ``pipeline_args`` is mutated in place; after the prior has run it is
    re-purposed as the keyword arguments of the decoder call.

    :param pipeline_args: keyword arguments prepared for the prior pipeline
    :param user_args: diffusion arguments supplied by the user
    :raises UnsupportedPipelineConfigError: if clip skip is requested without a
        prompt weighter, or if sigmas are specified
    :return: the value of ``self._create_pipeline_result`` for the decoder output
    """
    self._check_for_invalid_model_specific_opts(user_args)

    clip_skip_requested = user_args.clip_skip is not None and user_args.clip_skip > 0
    if clip_skip_requested and not getattr(user_args, 'prompt_weighter', None):
        raise _pipelines.UnsupportedPipelineConfigError(
            'Stable Cascade only supports clip skip through '
            'prompt weighters (such as compel or sd-embed).')

    if user_args.sigmas is not None:
        raise _pipelines.UnsupportedPipelineConfigError('Stable Cascade does not support sigmas.')

    # ---- prior stage ----
    prior_prompt: _prompt.Prompt = _types.default(user_args.prompt, _prompt.Prompt())

    pipeline_args['prompt'] = prior_prompt.positive or ''
    pipeline_args['negative_prompt'] = prior_prompt.negative
    pipeline_args['num_images_per_prompt'] = _types.default(user_args.batch_size, 1)

    prior_weighter = self._get_prompt_weighter(user_args)

    self._set_prompt_weighter_extra_supported_args(
        pipeline_args=pipeline_args,
        prompt_weighter=prior_weighter,
        diffusion_args=user_args,
        second_model=False
    )

    prior = _pipelines.call_pipeline(
        pipeline=self._pipeline,
        device=self._device,
        prompt_weighter=prior_weighter,
        **pipeline_args)

    # ---- decoder stage: reuse the same argument dict ----
    pipeline_args['num_inference_steps'] = user_args.second_model_inference_steps
    pipeline_args['guidance_scale'] = user_args.second_model_guidance_scale

    for prior_only_arg in ('height', 'width', 'images'):
        pipeline_args.pop(prior_only_arg, None)

    # Cast the prior embeddings when the decoder URI specifies a dtype
    image_embeddings = prior.image_embeddings
    if self._parsed_s_cascade_decoder_uri.dtype is not None:
        image_embeddings = image_embeddings.to(
            _enums.get_torch_dtype(self._parsed_s_cascade_decoder_uri.dtype))

    if user_args.second_model_prompt:
        decoder_prompt: _prompt.Prompt = user_args.second_model_prompt
        pipeline_args['prompt'] = decoder_prompt.positive or ''
        pipeline_args['negative_prompt'] = decoder_prompt.negative

    pipeline_args.pop('num_images_per_prompt')

    if user_args.output_latents:
        output_type = 'latent'
    else:
        output_type = 'pil'

    decoder_output = _pipelines.call_pipeline(
        pipeline=self._s_cascade_decoder_pipeline,
        image_embeddings=image_embeddings,
        prompt_weighter=self._get_second_model_prompt_weighter(user_args),
        device=self._device,
        output_type=output_type,
        **pipeline_args)

    return self._create_pipeline_result(decoder_output, output_type, user_args, pipeline_args)
@staticmethod
def _flux_sigmas_calculate_shift(
image_seq_len,
base_seq_len: int = 256,
max_seq_len: int = 4096,
base_shift: float = 0.5,
max_shift: float = 1.15,
):
# mu calculation for use_dynamic_shifting=True with Flux
# This code comes from the Flux pipelines
m = (max_shift - base_shift) / (max_seq_len - base_seq_len)
b = base_shift - m * base_seq_len
mu = image_seq_len * m + b
return mu
@staticmethod
def _sigmas_eval(model_title: str, pipeline, steps: int, val: str | list):
accept_sigmas = "sigmas" in set(
inspect.signature(pipeline.scheduler.set_timesteps).parameters.keys()
)
if not accept_sigmas:
raise _pipelines.UnsupportedPipelineConfigError(
f'The current {model_title} model scheduler "{pipeline.scheduler.__class__.__name__}" '
f'does not support custom sigmas schedules. Please ensure that '
f'you are using a supported scheduler.'
)
if not isinstance(val, str):
return val
try:
if pipeline.__class__.__name__.startswith('Flux'):
# This code comes from the Flux pipelines
mu = DiffusionPipelineWrapper._flux_sigmas_calculate_shift(
pipeline.transformer.config.in_channels // 4, # latents.shape[1]
pipeline.scheduler.config.get("base_image_seq_len", 256),
pipeline.scheduler.config.get("max_image_seq_len", 4096),
pipeline.scheduler.config.get("base_shift", 0.5),
pipeline.scheduler.config.get("max_shift", 1.15),
)
pipeline.scheduler.set_timesteps(steps, mu=mu)
else:
pipeline.scheduler.set_timesteps(steps)
except Exception as e:
raise _pipelines.UnsupportedPipelineConfigError(
f'Custom sigmas not supported for the {model_title} model and scheduler combination.'
) from e
try:
sigmas = pipeline.scheduler.sigmas
except AttributeError as e:
raise _pipelines.UnsupportedPipelineConfigError(
f'Selected {model_title} model scheduler '
f'{pipeline.scheduler.__class__.__name__} did not produce sigmas.'
) from e
interpreter = _eval.standard_interpreter(
symtable=_eval.safe_builtins()
)
interpreter.symtable['np'] = numpy
interpreter.symtable['sigmas'] = numpy.array(sigmas)
try:
val = interpreter.eval(val, show_errors=False, raise_errors=True)
except Exception as e:
raise _pipelines.UnsupportedPipelineConfigError(
f'Error interpreting sigmas expression "{val}":\n{e}'
)
if not isinstance(val, collections.abc.Iterable):
raise _pipelines.UnsupportedPipelineConfigError(
f'Sigmas expression for the {model_title} model '
f'did not evaluate to an array, got: {val}'
)
else:
return list(val)
def _call_torch(self, pipeline_args, user_args: DiffusionArguments):
    """
    Run inference with the main pipeline and, when one is set, the SDXL refiner pipeline.

    ``pipeline_args`` is filled in from ``user_args`` and mutated in place
    throughout. When ``self._sdxl_refiner_pipeline`` is ``None`` the main
    pipeline (or the adetailer pass) produces the final result. Otherwise the
    main pipeline output becomes the ``image`` input of the refiner, main model
    only arguments are popped, refiner specific arguments are applied, and the
    refiner produces the final result.

    :param pipeline_args: keyword arguments for the pipeline call, must already
        contain ``generator``
    :param user_args: diffusion arguments supplied by the user
    :raises UnsupportedPipelineConfigError: if IP Adapter images are given without
        any IP Adapter models, or if ``denoising_start`` / ``denoising_end`` are
        used together with an SDXL refiner
    :return: the value of ``self._create_pipeline_result`` (directly or through
        ``self._call_asdff``)
    """
    self._check_for_invalid_model_specific_opts(user_args)

    # A missing prompt is treated as an empty Prompt, positive text falls back to ''
    prompt: _prompt.Prompt = _types.default(user_args.prompt, _prompt.Prompt())
    pipeline_args['prompt'] = prompt.positive if prompt.positive else ''
    pipeline_args['negative_prompt'] = prompt.negative

    self._get_sdxl_conditioning_args(self._pipeline, pipeline_args, user_args)

    prompt_weighter = self._get_prompt_weighter(user_args)

    # The returned argument names are popped from pipeline_args
    # again further down, before the refiner pass
    prompt_weighter_pop_args = self._set_prompt_weighter_extra_supported_args(
        pipeline_args=pipeline_args,
        prompt_weighter=prompt_weighter,
        diffusion_args=user_args,
        second_model=False
    )

    if _enums.model_type_is_sd3(self.model_type):
        # SD3 additionally takes a max sequence length and a third prompt
        self._set_non_universal_pipeline_arg(self._pipeline,
                                             pipeline_args, user_args,
                                             'max_sequence_length', 'max_sequence_length',
                                             '--max-sequence-length')

        self._set_non_universal_pipeline_arg(self._pipeline,
                                             pipeline_args, user_args,
                                             'prompt_2', 'second_prompt',
                                             '--second-prompts',
                                             transform=lambda p: p.positive)

        self._set_non_universal_pipeline_arg(self._pipeline,
                                             pipeline_args, user_args,
                                             'negative_prompt_2', 'second_prompt',
                                             '--second-prompts',
                                             transform=lambda p: p.negative)

        self._set_non_universal_pipeline_arg(self._pipeline,
                                             pipeline_args, user_args,
                                             'prompt_3', 'third_prompt',
                                             '--third-prompts',
                                             transform=lambda p: p.positive)

        self._set_non_universal_pipeline_arg(self._pipeline,
                                             pipeline_args, user_args,
                                             'negative_prompt_3', 'third_prompt',
                                             '--third-prompts',
                                             transform=lambda p: p.negative)
    else:
        self._set_non_universal_pipeline_arg(self._pipeline,
                                             pipeline_args, user_args,
                                             'prompt_2', 'second_prompt',
                                             '--second-prompts',
                                             transform=lambda p: p.positive)

        self._set_non_universal_pipeline_arg(self._pipeline,
                                             pipeline_args, user_args,
                                             'negative_prompt_2', 'second_prompt',
                                             '--second-prompts',
                                             transform=lambda p: p.negative)

    self._set_non_universal_pipeline_arg(self._pipeline,
                                         pipeline_args, user_args,
                                         'pag_scale', 'pag_scale',
                                         '--pag-scale')

    self._set_non_universal_pipeline_arg(self._pipeline,
                                         pipeline_args, user_args,
                                         'pag_adaptive_scale', 'pag_adaptive_scale',
                                         '--pag-adaptive-scale')

    self._set_non_universal_pipeline_arg(self._pipeline,
                                         pipeline_args, user_args,
                                         'guidance_rescale', 'guidance_rescale',
                                         '--guidance-rescales')

    # A sigmas expression is evaluated against the main pipeline's scheduler
    self._set_non_universal_pipeline_arg(
        self._pipeline,
        pipeline_args, user_args,
        'sigmas', 'sigmas',
        '--sigmas',
        transform=functools.partial(
            self._sigmas_eval,
            'primary',
            self._pipeline,
            _types.default(
                user_args.inference_steps,
                _constants.DEFAULT_INFERENCE_STEPS)
        ))

    self._set_non_universal_pipeline_arg(self._pipeline,
                                         pipeline_args, user_args,
                                         'clip_skip', 'clip_skip',
                                         '--clip-skips')

    self._set_non_universal_pipeline_arg(self._pipeline,
                                         pipeline_args, user_args,
                                         'image_guidance_scale', 'image_guidance_scale',
                                         '--image-guidance-scales')

    self._set_non_universal_pipeline_arg(self._pipeline,
                                         pipeline_args, user_args,
                                         'ip_adapter_image', 'ip_adapter_images',
                                         'IP Adapter images')

    if user_args.ip_adapter_images is not None:
        if not self._parsed_ip_adapter_uris:
            raise _pipelines.UnsupportedPipelineConfigError(
                'Cannot specify IP Adapter images without loading any IP Adapter models.'
            )

        if _enums.model_type_is_sd3(self.model_type):
            # SD3 is given the scale of the first IP Adapter URI only
            self._pipeline.set_ip_adapter_scale(self._parsed_ip_adapter_uris[0].scale)
        else:
            self._pipeline.set_ip_adapter_scale([u.scale for u in self._parsed_ip_adapter_uris])

    batch_size = _types.default(user_args.batch_size, 1)

    # Adjust batch size to match raw latents if provided
    if 'latents' in pipeline_args:
        latents_batch_size = pipeline_args['latents'].shape[0]
        if latents_batch_size != batch_size:
            batch_size = latents_batch_size
            if user_args.batch_size is not None:
                # only warn if the user specified a value
                _messages.warning(
                    f'Setting --batch-size to {batch_size} because '
                    f'raw latents batch size did not match the specified batch size.'
                )

    if user_args.images:
        # The batch size must be a multiple of the number of input
        # images, otherwise it is replaced by the number of input images
        if batch_size % len(user_args.images) != 0:
            batch_size = len(user_args.images)
            if user_args.batch_size is not None:
                # only warn if the user specified a value
                _messages.warning(
                    f'Setting --batch-size to {batch_size} because '
                    f'given batch size did not divide evenly with the '
                    f'provided number of input images.'
                )

    if self._model_type != _enums.ModelType.UPSCALER_X2:
        pipeline_args['num_images_per_prompt'] = batch_size
    else:
        # For the X2 upscaler the batch is built by repeating the input
        # images and passing one prompt per image, num_images_per_prompt
        # is not set in this branch
        in_img_cnt = len(pipeline_args['image'])
        if batch_size > in_img_cnt:
            batch_mul = batch_size // in_img_cnt
        else:
            batch_mul = 1

        in_imgs = pipeline_args['image'] * batch_mul
        num_prompts = len(in_imgs)
        pipeline_args['image'] = in_imgs
        pipeline_args['prompt'] = \
            [pipeline_args['prompt']] * num_prompts
        if pipeline_args.get('negative_prompt', None) is not None:
            pipeline_args['negative_prompt'] = \
                [pipeline_args['negative_prompt']] * num_prompts

    # Handed to the hi-diffusion context managers below
    generator = pipeline_args['generator']

    if isinstance(self._pipeline, diffusers.StableDiffusionInpaintPipelineLegacy):
        # Not necessary, will cause an error
        pipeline_args.pop('width', None)
        pipeline_args.pop('height', None)

    has_controlnet = hasattr(self._pipeline, 'controlnet')
    has_t2i_adapter = hasattr(self._pipeline, 'adapter') and \
                      isinstance(self._pipeline.adapter,
                                 (diffusers.T2IAdapter, diffusers.MultiAdapter))

    # Truthy when the refiner should run in "edit" mode (strength based)
    # instead of being handed off with denoising_start / denoising_end
    sd_edit = user_args.sdxl_refiner_edit or \
              has_controlnet or has_t2i_adapter or \
              self._parsed_adetailer_detector_uris or \
              isinstance(self._pipeline,
                         (diffusers.StableDiffusionXLInpaintPipeline,
                          diffusers.StableDiffusionXLPAGInpaintPipeline))

    if has_controlnet:
        # With a union ControlNet model and more than one ControlNet URI,
        # only the first guidance start / end value is passed
        is_xl_union_model = isinstance(
            self._pipeline.controlnet, diffusers.ControlNetUnionModel) \
                            and len(self.controlnet_uris) > 1

        pipeline_args.update({
            'controlnet_conditioning_scale': self._get_controlnet_conditioning_scale(),
            'control_guidance_start': self._get_controlnet_guidance_start()[0] if
            is_xl_union_model else self._get_controlnet_guidance_start(),
            'control_guidance_end': self._get_controlnet_guidance_end()[0] if
            is_xl_union_model else self._get_controlnet_guidance_end()
        })

        # control_mode is only passed to pipelines whose __call__ accepts it
        if 'control_mode' in inspect.signature(self._pipeline.__call__).parameters:
            pipeline_args['control_mode'] = self._get_controlnet_mode()

    if has_t2i_adapter:
        pipeline_args['adapter_conditioning_scale'] = \
            self._get_adapter_conditioning_scale()

        # T2I adapters require a specific number of input channels
        # in the PIL image, or they will choke, we need to convert
        # it to a 1 channel image if the T2I Adapter model only
        # expects 1 channel
        if isinstance(self._pipeline.adapter, diffusers.T2IAdapter):
            if hasattr(self._pipeline.adapter.config, 'in_channels'):
                if self._pipeline.adapter.config.in_channels == 1:
                    # NOTE: this replaces the image sequence with the single converted first image
                    pipeline_args['image'] = pipeline_args['image'][0].convert('L')
        elif isinstance(self._pipeline.adapter, diffusers.MultiAdapter):
            # copy to a list so individual entries can be replaced
            pipeline_args['image'] = list(pipeline_args['image'])
            for idx, adapter in enumerate(self._pipeline.adapter.adapters):
                if hasattr(adapter.config, 'in_channels'):
                    if adapter.config.in_channels == 1:
                        pipeline_args['image'][idx] = pipeline_args['image'][idx].convert('L')

    def generate_asdff():
        # Adetailer pass, reads pipeline_args / batch_size as they are when called
        return self._call_asdff(
            user_args=user_args,
            pipeline_args=pipeline_args,
            batch_size=batch_size,
            prompt_weighter=prompt_weighter
        )

    if self._sdxl_refiner_pipeline is None:
        # No refiner: the main pipeline produces the final result
        ras_args = self._get_sd3_ras_args(user_args)

        with _freeu(self._pipeline, user_args.freeu_params), \
                _sd3_ras_context(self._pipeline, args=ras_args, enabled=user_args.ras), \
                _deep_cache_context(self._pipeline,
                                    cache_interval=_types.default(
                                        user_args.deep_cache_interval, _constants.DEFAULT_DEEP_CACHE_INTERVAL),
                                    cache_branch_id=_types.default(
                                        user_args.deep_cache_branch_id, _constants.DEFAULT_DEEP_CACHE_BRANCH_ID),
                                    enabled=user_args.deep_cache), \
                _hi_diffusion(self._pipeline,
                              generator=generator,
                              enabled=user_args.hi_diffusion,
                              no_raunet=user_args.hi_diffusion_no_raunet,
                              no_window_attn=user_args.hi_diffusion_no_win_attn), \
                _sada_context(self._pipeline,
                              width=self._inference_width,
                              height=self._inference_height,
                              enabled=user_args.sada,
                              **self._get_sada_args(user_args)), \
                _denoise_range(self._pipeline, user_args.denoising_start, user_args.denoising_end):
            if self._parsed_adetailer_detector_uris:
                return generate_asdff()
            else:
                output_type = 'latent' if user_args.output_latents else 'pil'
                pipeline_output = _pipelines.call_pipeline(
                    pipeline=self._pipeline,
                    prompt_weighter=prompt_weighter,
                    device=self._device,
                    output_type=output_type,
                    **pipeline_args
                )
                return self._create_pipeline_result(
                    pipeline_output, output_type, user_args, pipeline_args
                )

    # ---- everything below only runs when an SDXL refiner pipeline is set ----

    if user_args.denoising_start is not None or user_args.denoising_end is not None:
        raise _pipelines.UnsupportedPipelineConfigError(
            'denoising_start and denoising_end are not supported when using an SDXL refiner.'
        )

    high_noise_fraction = _types.default(user_args.sdxl_high_noise_fraction,
                                         _constants.DEFAULT_SDXL_HIGH_NOISE_FRACTION)

    if sd_edit:
        # edit mode: no denoising hand-off, the refiner gets a strength value instead
        i_start = dict()
        i_end = dict()
    else:
        # main model stops at the high noise fraction, the refiner starts there
        i_start = {'denoising_start': high_noise_fraction}
        i_end = {'denoising_end': high_noise_fraction}

    output_type = 'latent'
    if isinstance(self._sdxl_refiner_pipeline,
                  diffusers.StableDiffusionXLPAGInpaintPipeline):
        # cannot handle latent input
        output_type = 'pil'

    with _freeu(self._pipeline, user_args.freeu_params), \
            _deep_cache_context(self._pipeline,
                                cache_interval=_types.default(
                                    user_args.deep_cache_interval, _constants.DEFAULT_DEEP_CACHE_INTERVAL),
                                cache_branch_id=_types.default(
                                    user_args.deep_cache_branch_id, _constants.DEFAULT_DEEP_CACHE_BRANCH_ID),
                                enabled=user_args.deep_cache), \
            _hi_diffusion(self._pipeline,
                          generator=generator,
                          enabled=user_args.hi_diffusion,
                          no_raunet=user_args.hi_diffusion_no_raunet,
                          no_window_attn=user_args.hi_diffusion_no_win_attn):
        if self._parsed_adetailer_detector_uris:
            image = generate_asdff().images
        else:
            image = _pipelines.call_pipeline(
                pipeline=self._pipeline,
                device=self._device,
                prompt_weighter=prompt_weighter,
                output_type=output_type,
                **pipeline_args,
                **i_end
            ).images

    # The main model output is the refiner's input image
    pipeline_args['image'] = image

    if not isinstance(self._sdxl_refiner_pipeline,
                      (diffusers.StableDiffusionXLInpaintPipeline,
                       diffusers.StableDiffusionXLPAGInpaintPipeline)):
        # Width / Height not necessary for any other refiner
        if not (isinstance(self._pipeline,
                           (diffusers.StableDiffusionXLImg2ImgPipeline,
                            diffusers.StableDiffusionXLPAGImg2ImgPipeline,
                            diffusers.KolorsImg2ImgPipeline)) and
                isinstance(self._sdxl_refiner_pipeline,
                           (diffusers.StableDiffusionXLImg2ImgPipeline,
                            diffusers.StableDiffusionXLPAGImg2ImgPipeline))):
            # Width / Height does not get passed to img2img
            pipeline_args.pop('width', None)
            pipeline_args.pop('height', None)

    # Or any of these
    self._pop_sdxl_conditioning_args(pipeline_args)
    pipeline_args.pop('ip_adapter_image', None)
    pipeline_args.pop('guidance_rescale', None)
    pipeline_args.pop('sigmas', None)
    pipeline_args.pop('controlnet_conditioning_scale', None)
    pipeline_args.pop('control_guidance_start', None)
    pipeline_args.pop('control_guidance_end', None)
    pipeline_args.pop('image_guidance_scale', None)
    pipeline_args.pop('control_image', None)

    # these are only passed if set for the refiner specifically
    pipeline_args.pop('pag_scale', None)
    pipeline_args.pop('pag_adaptive_scale', None)

    # we will handle the strength parameter if it is necessary below
    pipeline_args.pop('strength', None)

    # We do not want to override the refiner secondary prompt
    # with that of --second-prompts by default
    pipeline_args.pop('prompt_2', None)
    pipeline_args.pop('negative_prompt_2', None)

    # Remove the arguments that were added for the main model's prompt weighter
    if prompt_weighter_pop_args:
        for arg_name in prompt_weighter_pop_args:
            if arg_name in pipeline_args:
                pipeline_args.pop(arg_name)

    second_model_prompt_weighter = self._get_second_model_prompt_weighter(user_args)

    self._set_prompt_weighter_extra_supported_args(
        pipeline_args=pipeline_args,
        prompt_weighter=second_model_prompt_weighter,
        diffusion_args=user_args,
        second_model=True
    )

    # Refiner specific prompts
    self._set_non_universal_pipeline_arg(
        self._sdxl_refiner_pipeline,
        pipeline_args, user_args,
        'prompt', 'second_model_prompt',
        '--second-model-prompts',
        transform=lambda p: p.positive)

    self._set_non_universal_pipeline_arg(
        self._sdxl_refiner_pipeline,
        pipeline_args, user_args,
        'negative_prompt', 'second_model_prompt',
        '--second-model-prompts',
        transform=lambda p: p.negative)

    self._set_non_universal_pipeline_arg(
        self._sdxl_refiner_pipeline,
        pipeline_args, user_args,
        'prompt_2', 'second_model_second_prompt',
        '--second-model-second-prompts',
        transform=lambda p: p.positive)

    self._set_non_universal_pipeline_arg(
        self._sdxl_refiner_pipeline,
        pipeline_args, user_args,
        'negative_prompt_2', 'second_model_second_prompt',
        '--second-model-second-prompts',
        transform=lambda p: p.negative)

    self._get_sdxl_conditioning_args(
        self._sdxl_refiner_pipeline,
        pipeline_args, user_args,
        user_prefix='refiner')

    self._set_non_universal_pipeline_arg(
        self._sdxl_refiner_pipeline,
        pipeline_args, user_args,
        'guidance_rescale', 'sdxl_refiner_guidance_rescale',
        '--sdxl-refiner-guidance-rescales')

    # Refiner specific overrides, each applied only when the user set a value
    if user_args.second_model_inference_steps is not None:
        pipeline_args['num_inference_steps'] = user_args.second_model_inference_steps

    if user_args.sdxl_refiner_pag_scale is not None:
        pipeline_args['pag_scale'] = user_args.sdxl_refiner_pag_scale

    if user_args.sdxl_refiner_pag_adaptive_scale is not None:
        pipeline_args['pag_adaptive_scale'] = user_args.sdxl_refiner_pag_adaptive_scale

    if user_args.second_model_guidance_scale is not None:
        pipeline_args['guidance_scale'] = user_args.second_model_guidance_scale

    # NOTE(review): guidance_rescale was already routed through
    # _set_non_universal_pipeline_arg above, this assigns it a second time
    if user_args.sdxl_refiner_guidance_rescale is not None:
        pipeline_args['guidance_rescale'] = user_args.sdxl_refiner_guidance_rescale

    if user_args.sdxl_refiner_clip_skip is not None:
        pipeline_args['clip_skip'] = user_args.sdxl_refiner_clip_skip

    if sd_edit:
        # Decimal arithmetic on the string form avoids binary float error in (1.0 - fraction)
        strength = float(decimal.Decimal('1.0') - decimal.Decimal(str(high_noise_fraction)))

        if strength <= 0.0:
            strength = 0.2
            _messages.warning(
                f'Refiner edit mode image seed strength (1.0 - high-noise-fraction) '
                f'was calculated at <= 0.0, defaulting to {strength}'
            )
        else:
            _messages.log(f'Running refiner in edit mode with '
                          f'refiner image seed strength = {strength}, IE: (1.0 - high-noise-fraction)')

        # assumes 'num_inference_steps' is present in pipeline_args here,
        # .get() would return None otherwise -- TODO confirm
        inference_steps = pipeline_args.get('num_inference_steps')

        # Raise the strength so that strength * steps is at least 1
        if (strength * inference_steps) < 1.0:
            strength = 1.0 / inference_steps
            _messages.warning(
                f'Refiner edit mode image seed strength (1.0 - high-noise-fraction) * inference-steps '
                f'was calculated at < 1, defaulting to (1.0 / inference-steps): {strength}'
            )

        pipeline_args['strength'] = strength

    if isinstance(self._sdxl_refiner_pipeline.scheduler, diffusers.LCMScheduler):
        # This will error out catastrophically if we let it happen.

        original_steps = self._sdxl_refiner_pipeline.scheduler.config['original_inference_steps']
        inference_steps = pipeline_args.get('num_inference_steps')

        if sd_edit:
            # in edit mode the step limit scales with the strength
            float_limit = strength * original_steps
            limit = int(math.floor(float_limit))
            if limit < inference_steps:
                _messages.warning(
                    f'Refiner inference-steps is being reduced to {limit} '
                    f'due to LCMScheduler requirements. "LCMScheduler;original-inference-steps={original_steps}" and '
                    f'refiner inference-steps must less than or equal to "strength" (inverse high-noise-fraction) * original-inference-steps. '
                    f'i.e. refiner inference-steps <= ({strength} * {original_steps} = {float_limit}).'
                )
        else:
            limit = original_steps
            if limit < inference_steps:
                _messages.warning(
                    f'Refiner inference-steps is being reduced to {limit} '
                    f'due to LCMScheduler requirements. "LCMScheduler;original-inference-steps={original_steps}" and '
                    f'refiner inference-steps must less than or equal to that.'
                )

        # NOTE(review): placed here this assignment runs for both branches,
        # including when limit >= inference_steps -- confirm that is intended
        pipeline_args['num_inference_steps'] = limit

    # Refiner sigmas are evaluated against the (possibly adjusted) refiner step count
    self._set_non_universal_pipeline_arg(
        self._sdxl_refiner_pipeline,
        pipeline_args, user_args,
        'sigmas', 'sdxl_refiner_sigmas',
        '--sdxl-refiner-sigmas',
        transform=functools.partial(
            self._sigmas_eval,
            'refiner',
            self._sdxl_refiner_pipeline,
            pipeline_args.get('num_inference_steps', _constants.DEFAULT_INFERENCE_STEPS)
        )
    )

    # NOTE(review): the refiner deep cache context reads the main model's
    # deep_cache_interval / deep_cache_branch_id user values and only the
    # fallbacks are refiner specific -- confirm this is intended
    with _freeu(self._sdxl_refiner_pipeline, user_args.sdxl_refiner_freeu_params), \
            _deep_cache_context(self._sdxl_refiner_pipeline,
                                cache_interval=_types.default(
                                    user_args.deep_cache_interval,
                                    _constants.DEFAULT_SDXL_REFINER_DEEP_CACHE_INTERVAL),
                                cache_branch_id=_types.default(
                                    user_args.deep_cache_branch_id,
                                    _constants.DEFAULT_SDXL_REFINER_DEEP_CACHE_BRANCH_ID),
                                enabled=user_args.sdxl_refiner_deep_cache):
        output_type = 'latent' if user_args.output_latents else 'pil'

        pipeline_output = _pipelines.call_pipeline(
            pipeline=self._sdxl_refiner_pipeline,
            device=self._device,
            prompt_weighter=self._get_second_model_prompt_weighter(user_args),
            output_type=output_type,
            **pipeline_args,
            **i_start
        )

        return self._create_pipeline_result(
            pipeline_output, output_type, user_args, pipeline_args
        )
def _get_sd3_ras_args(self, user_args) -> _RASArgs | None:
    """
    Build the RAS argument object for the main pipeline.

    Unset ``ras_*`` user values fall back to the matching ``DEFAULT_RAS_*``
    constant, except for the scheduler end step, which falls back to
    ``user_args.inference_steps``.

    :param user_args: diffusion arguments supplied by the user
    :return: a populated ``_RASArgs`` when ``user_args.ras`` is truthy, otherwise ``None``
    """
    if not user_args.ras:
        return None

    # find_spec() takes the importable module name. The distribution is
    # published as "flash-attn" but imports as "flash_attn"; a hyphenated
    # name is never found, which previously left flash attention disabled
    # even when it was installed.
    flash_attn_available = importlib.util.find_spec('flash_attn') is not None

    return _RASArgs(
        num_inference_steps=user_args.inference_steps,
        patch_size=self._pipeline.transformer.config.patch_size,
        sample_ratio=_types.default(user_args.ras_sample_ratio, _constants.DEFAULT_RAS_SAMPLE_RATIO),
        high_ratio=_types.default(user_args.ras_high_ratio, _constants.DEFAULT_RAS_HIGH_RATIO),
        starvation_scale=_types.default(user_args.ras_starvation_scale,
                                        _constants.DEFAULT_RAS_STARVATION_SCALE),
        error_reset_steps=_types.default(user_args.ras_error_reset_steps,
                                         _constants.DEFAULT_RAS_ERROR_RESET_STEPS),
        width=self._inference_width,
        height=self._inference_height,
        enable_index_fusion=user_args.ras_index_fusion,
        metric=_types.default(user_args.ras_metric, _constants.DEFAULT_RAS_METRIC),
        scheduler_start_step=_types.default(user_args.ras_start_step, _constants.DEFAULT_RAS_START_STEP),
        scheduler_end_step=_types.default(user_args.ras_end_step, user_args.inference_steps),
        skip_num_step=_types.default(
            user_args.ras_skip_num_step, _constants.DEFAULT_RAS_SKIP_NUM_STEP),
        skip_num_step_length=_types.default(
            user_args.ras_skip_num_step_length, _constants.DEFAULT_RAS_SKIP_NUM_STEP_LENGTH),
        replace_with_flash_attn=flash_attn_available
    )
def _get_sada_args(self, user_args: DiffusionArguments) -> dict:
    """
    Collect the SADA keyword arguments, filling gaps with the model type's defaults.

    For every key the user value is read from ``user_args.sada_<key>``.
    ``lagrange_int`` and ``lagrange_step`` fall back to the model default on
    any falsy user value; all other keys go through ``_types.default``.

    :param user_args: diffusion arguments supplied by the user
    :return: dict of SADA keyword arguments
    """
    defaults = _util.get_sada_model_defaults(self.model_type)

    sada_args = {}

    for key in ('max_downsample', 'sx', 'sy', 'acc_range', 'lagrange_term'):
        sada_args[key] = _types.default(getattr(user_args, 'sada_' + key), defaults[key])

    # falsy user values (not just None) select the model default for these two
    for key in ('lagrange_int', 'lagrange_step'):
        sada_args[key] = getattr(user_args, 'sada_' + key) or defaults[key]

    for key in ('max_fix', 'max_interval'):
        sada_args[key] = _types.default(getattr(user_args, 'sada_' + key), defaults[key])

    return sada_args
[docs]
def recall_main_pipeline(self) -> _pipelines.PipelineCreationResult:
    """
    Return the creation result of the most recently used main pipeline.

    The pipeline may be recreated in the process if it is no longer held in
    the in memory cache. When no main pipeline has been created so far, for
    instance because no image has been generated yet, :py:exc:`RuntimeError`
    is raised.

    :raises RuntimeError:

    :return: :py:class:`dgenerate.pipelinewrapper.PipelineCreationResult`
    """
    recall = self._recall_main_pipeline

    if recall is not None:
        return recall()

    raise RuntimeError('Cannot recall main pipeline as one has not been created.')
[docs]
def recall_secondary_pipeline(self) -> _pipelines.PipelineCreationResult:
    """
    Return the creation result of the most recently used refiner / stable
    cascade decoder pipeline.

    The pipeline may be recreated in the process if it is no longer held in
    the in memory cache. When no refiner / decoder pipeline has been created
    so far, either because no image has been generated yet or because no
    refiner / decoder model was specified, :py:exc:`RuntimeError` is raised.

    :raises RuntimeError:

    :return: :py:class:`dgenerate.pipelinewrapper.PipelineCreationResult`
    """
    recall = self._recall_secondary_pipeline

    if recall is not None:
        return recall()

    raise RuntimeError('Cannot recall refiner pipeline as one has not been created.')
def _lazy_init_pipeline(self, args: DiffusionArguments):
    """
    Create the main pipeline, and the SDXL refiner / Stable Cascade decoder
    pipeline when one is configured, unless a pipeline created for the
    required pipeline type already exists.

    The pipeline type that ``args`` resolves to is compared against the type
    recorded for the existing pipeline, when they match nothing is rebuilt.

    When PAG arguments are in use, any PAG scale value left unset on ``args``
    is filled in with its default value as a side effect.

    :param args: diffusion arguments for the upcoming generation

    :raises dgenerate.pipelinewrapper.UnsupportedPipelineConfigError: If PAG is
        requested for an unsupported model type or together with T2I adapters,
        if refiner PAG arguments are given without an SDXL refiner, or if
        Stable Cascade is used without a decoder model.

    :return: ``True`` if pipelines were created, ``False`` if the existing
        pipeline was reused
    """
    pag = args.pag_scale is not None or args.pag_adaptive_scale is not None
    sdxl_refiner_pag = args.sdxl_refiner_pag_scale is not None or args.sdxl_refiner_pag_adaptive_scale is not None
    pipeline_type = args.determine_pipeline_type()

    # reuse the existing pipeline when it was built for the same pipeline type
    if self._pipeline is not None:
        if self._pipeline_type == pipeline_type:
            return False

    if pag:
        if self.model_type not in (_enums.ModelType.SD,
                                   _enums.ModelType.SDXL,
                                   _enums.ModelType.SD3,
                                   _enums.ModelType.KOLORS):
            raise _pipelines.UnsupportedPipelineConfigError(
                'Perturbed attention guidance (pag arguments) are only supported with '
                '--model-type sd, sdxl, kolors (txt2img), and sd3.')

        if self.t2i_adapter_uris:
            raise _pipelines.UnsupportedPipelineConfigError(
                'Perturbed attention guidance (pag* arguments) are not supported '
                'with --t2i-adapters.')

        # specifying either PAG value enables PAG, fill in the other with its default
        args.pag_scale = _types.default(
            args.pag_scale, _constants.DEFAULT_PAG_SCALE)
        args.pag_adaptive_scale = _types.default(
            args.pag_adaptive_scale, _constants.DEFAULT_PAG_ADAPTIVE_SCALE)

    if sdxl_refiner_pag:
        if not self._sdxl_refiner_uri:
            raise _pipelines.UnsupportedPipelineConfigError(
                'sdxl_refiner_pag* arguments are not supported when '
                'an SDXL refiner is not specified.')

        args.sdxl_refiner_pag_scale = _types.default(
            args.sdxl_refiner_pag_scale, _constants.DEFAULT_SDXL_REFINER_PAG_SCALE)
        args.sdxl_refiner_pag_adaptive_scale = _types.default(
            args.sdxl_refiner_pag_adaptive_scale, _constants.DEFAULT_SDXL_REFINER_PAG_ADAPTIVE_SCALE)

    # record the type derived from the arguments before any override below,
    # this is the value the reuse check above compares against on the next call
    self._pipeline_type = pipeline_type

    self._recall_main_pipeline = None
    self._recall_secondary_pipeline = None

    if self._parsed_adetailer_detector_uris:
        # an inpaint pipeline is always created when adetailer detectors are configured
        pipeline_type = _enums.PipelineType.INPAINT

    if self._model_type == _enums.ModelType.S_CASCADE:

        if self._s_cascade_decoder_uri is None:
            raise _pipelines.UnsupportedPipelineConfigError(
                'Stable Cascade must be used with a decoder model.')

        self._recall_main_pipeline = _pipelines.PipelineFactory(
            model_path=self._model_path,
            model_type=self._model_type,
            pipeline_type=pipeline_type,
            subfolder=self._subfolder,
            revision=self._revision,
            variant=self._variant,
            dtype=self._dtype,
            original_config=self._original_config,
            unet_uri=self._unet_uri,
            vae_uri=self._vae_uri,
            lora_uris=self._lora_uris,
            lora_fuse_scale=self._lora_fuse_scale,
            quantizer_uri=self._quantizer_uri,
            quantizer_map=self._quantizer_map,
            safety_checker=self._safety_checker,
            auth_token=self._auth_token,
            device=self._device,
            sequential_cpu_offload=self._model_sequential_offload,
            model_cpu_offload=self._model_cpu_offload,
            local_files_only=self._local_files_only,
            extra_modules=self._model_extra_modules
        )

        creation_result = self._recall_main_pipeline()
        self._pipeline = creation_result.pipeline

        # the decoder falls back to the main model variant / dtype
        # when its own URI does not specify them
        self._recall_secondary_pipeline = _pipelines.PipelineFactory(
            model_path=self._parsed_s_cascade_decoder_uri.model,
            model_type=_enums.ModelType.S_CASCADE_DECODER,
            pipeline_type=_enums.PipelineType.TXT2IMG,
            subfolder=self._parsed_s_cascade_decoder_uri.subfolder,
            revision=self._parsed_s_cascade_decoder_uri.revision,
            unet_uri=self._second_model_unet_uri,
            text_encoder_uris=self._second_model_text_encoder_uris,
            quantizer_uri=self._second_model_quantizer_uri,
            quantizer_map=self._second_model_quantizer_map,
            variant=self._parsed_s_cascade_decoder_uri.variant if
            self._parsed_s_cascade_decoder_uri.variant is not None else self._variant,
            dtype=self._parsed_s_cascade_decoder_uri.dtype if
            self._parsed_s_cascade_decoder_uri.dtype is not None else self._dtype,
            original_config=self._second_model_original_config,
            safety_checker=self._safety_checker,
            extra_modules=self._second_model_extra_modules,
            auth_token=self._auth_token,
            device=self._device,
            local_files_only=self._local_files_only,
            model_cpu_offload=self._second_model_cpu_offload,
            sequential_cpu_offload=self._second_model_sequential_offload)

        creation_result = self._recall_secondary_pipeline()
        self._s_cascade_decoder_pipeline = creation_result.pipeline

    elif self._sdxl_refiner_uri is not None:

        self._recall_main_pipeline = _pipelines.PipelineFactory(
            model_path=self._model_path,
            model_type=self._model_type,
            pipeline_type=pipeline_type,
            subfolder=self._subfolder,
            revision=self._revision,
            variant=self._variant,
            dtype=self._dtype,
            original_config=self._original_config,
            unet_uri=self._unet_uri,
            vae_uri=self._vae_uri,
            lora_uris=self._lora_uris,
            lora_fuse_scale=self._lora_fuse_scale,
            image_encoder_uri=self._image_encoder_uri,
            ip_adapter_uris=self._ip_adapter_uris,
            textual_inversion_uris=self._textual_inversion_uris,
            text_encoder_uris=self._text_encoder_uris,
            controlnet_uris=self._controlnet_uris,
            t2i_adapter_uris=self._t2i_adapter_uris,
            quantizer_uri=self._quantizer_uri,
            quantizer_map=self._quantizer_map,
            pag=pag,
            safety_checker=self._safety_checker,
            auth_token=self._auth_token,
            device=self._device,
            local_files_only=self._local_files_only,
            extra_modules=self._model_extra_modules,
            model_cpu_offload=self._model_cpu_offload,
            sequential_cpu_offload=self._model_sequential_offload)

        creation_result = self._recall_main_pipeline()
        self._pipeline = creation_result.pipeline
        self._parsed_controlnet_uris = creation_result.parsed_controlnet_uris
        self._parsed_t2i_adapter_uris = creation_result.parsed_t2i_adapter_uris

        # the refiner is created as img2img for txt2img / adetailer runs,
        # otherwise it uses the same pipeline type as the main pipeline
        if pipeline_type is _enums.PipelineType.TXT2IMG or self._parsed_adetailer_detector_uris:
            refiner_pipeline_type = _enums.PipelineType.IMG2IMG
        else:
            refiner_pipeline_type = pipeline_type

        if self._pipeline is not None:
            # hand modules of the main pipeline to the refiner factory,
            # user supplied second model modules take precedence
            if _enums.model_type_is_sdxl(self.model_type):
                refiner_extra_modules = {'vae': self._pipeline.vae,
                                         'text_encoder_2': self._pipeline.text_encoder_2}
            else:
                refiner_extra_modules = {'vae': self._pipeline.vae}

            if self._second_model_extra_modules is not None:
                refiner_extra_modules.update(self._second_model_extra_modules)
        else:
            refiner_extra_modules = self._second_model_extra_modules

        # the refiner falls back to the main model variant / dtype
        # when its own URI does not specify them
        self._recall_secondary_pipeline = _pipelines.PipelineFactory(
            model_path=self._parsed_sdxl_refiner_uri.model,
            model_type=_enums.ModelType.SDXL,
            pipeline_type=refiner_pipeline_type,
            subfolder=self._parsed_sdxl_refiner_uri.subfolder,
            revision=self._parsed_sdxl_refiner_uri.revision,
            unet_uri=self._second_model_unet_uri,
            text_encoder_uris=self._second_model_text_encoder_uris,
            quantizer_uri=self._second_model_quantizer_uri,
            quantizer_map=self._second_model_quantizer_map,
            variant=self._parsed_sdxl_refiner_uri.variant if
            self._parsed_sdxl_refiner_uri.variant is not None else self._variant,
            dtype=self._parsed_sdxl_refiner_uri.dtype if
            self._parsed_sdxl_refiner_uri.dtype is not None else self._dtype,
            original_config=self._second_model_original_config,
            pag=sdxl_refiner_pag,
            safety_checker=self._safety_checker,
            auth_token=self._auth_token,
            device=self._device,
            extra_modules=refiner_extra_modules,
            local_files_only=self._local_files_only,
            model_cpu_offload=self._second_model_cpu_offload,
            sequential_cpu_offload=self._second_model_sequential_offload
        )

        self._sdxl_refiner_pipeline = self._recall_secondary_pipeline().pipeline

    else:
        self._recall_main_pipeline = _pipelines.PipelineFactory(
            model_path=self._model_path,
            model_type=self._model_type,
            pipeline_type=pipeline_type,
            subfolder=self._subfolder,
            revision=self._revision,
            variant=self._variant,
            dtype=self._dtype,
            original_config=self._original_config,
            unet_uri=self._unet_uri,
            transformer_uri=self._transformer_uri,
            vae_uri=self._vae_uri,
            lora_uris=self._lora_uris,
            lora_fuse_scale=self._lora_fuse_scale,
            image_encoder_uri=self._image_encoder_uri,
            ip_adapter_uris=self._ip_adapter_uris,
            textual_inversion_uris=self._textual_inversion_uris,
            text_encoder_uris=self._text_encoder_uris,
            quantizer_uri=self._quantizer_uri,
            quantizer_map=self._quantizer_map,
            controlnet_uris=self._controlnet_uris,
            t2i_adapter_uris=self._t2i_adapter_uris,
            pag=pag,
            safety_checker=self._safety_checker,
            auth_token=self._auth_token,
            device=self._device,
            sequential_cpu_offload=self._model_sequential_offload,
            model_cpu_offload=self._model_cpu_offload,
            local_files_only=self._local_files_only,
            extra_modules=self._model_extra_modules,
        )

        creation_result = self._recall_main_pipeline()
        self._pipeline = creation_result.pipeline
        self._parsed_controlnet_uris = creation_result.parsed_controlnet_uris
        self._parsed_t2i_adapter_uris = creation_result.parsed_t2i_adapter_uris

    return True
def _load_prompt_weighter(
        self,
        uri: str,
        model_type: _enums.ModelType,
        dtype: _enums.DataType,
        device: str | None = None
):
    """
    Load a prompt weighter through the prompt weighter loader of this wrapper.

    :param uri: prompt weighter URI
    :param model_type: model type passed on to the loader
    :param dtype: data type passed on to the loader
    :param device: device passed on to the loader, may be ``None``
    :return: whatever the loader returns for ``uri``
    """
    load_options = {
        'model_type': model_type,
        'dtype': dtype,
        'device': device,
        # the local files only setting of this wrapper always applies
        'local_files_only': self.local_files_only,
    }

    return self._prompt_weighter_loader.load(uri, **load_options)
def _default_prompt_weighter(self, *sources):
    """
    Load a prompt weighter from the first source that names one.

    Each source may be a URI string, an object with a ``weighter`` attribute,
    or ``None``. Sources are examined in the order given, objects with a falsy
    ``weighter`` and ``None`` values are skipped.

    :param sources: candidate sources, highest priority first
    :return: the loaded prompt weighter, or ``None`` if no source names one
    """
    for candidate in sources:
        if isinstance(candidate, str):
            # a plain string is the weighter URI itself
            weighter_uri = candidate
        elif candidate is not None and candidate.weighter:
            # an object carrying its own weighter URI
            weighter_uri = candidate.weighter
        else:
            continue

        return self._load_prompt_weighter(
            weighter_uri,
            model_type=self.model_type,
            dtype=self._dtype,
            device=self._device
        )

    return None
def _get_prompt_weighter(self, args: DiffusionArguments):
    """
    Load the prompt weighter for the main model.

    The prompt is consulted first, then the prompt weighter URI argument.

    :param args: diffusion arguments
    :return: result of :py:meth:`_default_prompt_weighter` for those sources
    """
    # highest priority first
    sources = (args.prompt, args.prompt_weighter_uri)
    return self._default_prompt_weighter(*sources)
def _get_second_model_prompt_weighter(self, args: DiffusionArguments):
    """
    Load the prompt weighter for the second model (refiner / decoder).

    The second model prompt and second model prompt weighter URI are consulted
    first, then the main prompt and main prompt weighter URI as fallbacks.

    :param args: diffusion arguments
    :return: result of :py:meth:`_default_prompt_weighter` for those sources
    """
    # highest priority first
    sources = (
        args.second_model_prompt,
        args.second_model_prompt_weighter_uri,
        args.prompt,
        args.prompt_weighter_uri,
    )
    return self._default_prompt_weighter(*sources)
def _load_latents_processors_with_batching(self, processors):
    """
    Load latents processors from a flat URI list that is split into groups
    by the latents processor separator constant.

    Every separator starts a new (possibly empty) group, each group is passed
    to the latents processor loader as one list of URIs.

    :param processors: processor URIs with optional separator entries
    :return: ``None`` when ``processors`` is empty or ``None``, otherwise a list
        holding the loader result for each group, in order
    """
    if not processors:
        return None

    uri_groups = [[]]

    for uri in processors:
        if uri == _constants.LATENTS_PROCESSOR_SEP:
            # the separator closes the current group and opens the next one
            uri_groups.append([])
            continue
        uri_groups[-1].append(uri)

    loaded = []
    for group in uri_groups:
        loaded.append(
            self._latents_processor_loader.load(
                group,
                device=self._device,
                model_type=self._model_type,
                local_files_only=self._local_files_only
            )
        )
    return loaded
def _process_input_latents(self,
                           title: str, latents: _types.Tensors,
                           processor_uris: _types.OptionalUris
                           ) -> list[torch.Tensor]:
    """
    Run input latents through the configured latents input processors.

    Three dimensional tensors are given a leading batch dimension. The loaded
    processors are matched to the latents by position, a latent without a
    processor at its position is passed through unprocessed.

    :param title: Title used in the debug log message
    :param latents: Input latents tensors
    :param processor_uris: Processor URIs to apply, may be empty or ``None``
    :return: List of tensors, one per input latent
    """
    if not processor_uris:
        # nothing to run, only normalize the batch dimension
        return [x.unsqueeze(0) if x.dim() == 3 else x for x in latents]

    processors = self._load_latents_processors_with_batching(processor_uris)

    _messages.debug_log(f'Processing {title} input latents with processors: {processor_uris}')

    if processors is None:
        return []

    processed = []

    for index, tensor in enumerate(latents):
        if tensor.dim() == 3:
            tensor = tensor.unsqueeze(0)

        processor = processors[index] if index < len(processors) else None

        if processor is None:
            processed.append(tensor)
        else:
            processed.append(processor.process(self._pipeline, tensor))

    self._validate_latent_channels(processed)

    return processed
def _process_output_latents(self, latents: torch.Tensor, processor_uris: _types.OptionalUris) -> torch.Tensor:
    """
    Run output latents through the configured latents output processors.

    :param latents: Output latents tensor in unpacked format
    :param processor_uris: Processor URIs to apply, may be empty or ``None``
    :return: ``latents`` unchanged when no processor URIs are given, otherwise
        the latents with a batch dimension, processed if a processor was loaded
    """
    if not processor_uris:
        return latents

    processor = self._latents_processor_loader.load(
        processor_uris,
        model_type=self.model_type,
        device=self.device,
        local_files_only=self.local_files_only
    )

    _messages.debug_log(f'Processing output latents with processors: {processor_uris}')

    # processing, and the output of this branch, always use a batch dimension
    batched = latents.unsqueeze(0) if latents.ndim == 3 else latents

    if processor is None:
        return batched

    return processor.process(self._pipeline, batched)
def _process_decoded_latents_images(
        self,
        images: _types.Images,
        processor_uris: _types.OptionalUris,
        user_args: DiffusionArguments) -> list[PIL.Image.Image]:
    """
    Process images decoded from latents using the configured image processors.

    When no processor is configured or loaded, the images are only resized
    to the user dimensions. Otherwise every image is passed to the processor
    together with the resize target (omitted when it equals the image size)
    and an alignment of 8, then converted to RGB. For Stable Cascade model
    types only the alignment is passed.

    :param images: Images decoded from latents
    :param processor_uris: Processor URIs to apply, may be empty or ``None``
    :param user_args: User arguments containing the target dimensions
    :return: Processed images
    """
    if not processor_uris:
        # no processors configured, still need to resize to user dimensions
        return self._resize_images_to_user_dimensions(images, user_args)

    processor = self._decoded_latents_image_processor_loader.load(
        processor_uris,
        device=self.device,
        local_files_only=self.local_files_only
    )

    _messages.debug_log(f'Processing decoded latents images with processors: {processor_uris}')

    if processor is None:
        # the loader produced nothing, fall back to a simple resize
        return self._resize_images_to_user_dimensions(images, user_args)

    results = []

    for image in images:
        if _enums.model_type_is_s_cascade(self._model_type):
            # just align to 8
            image = processor.process(image, align=8)
        else:
            target_size = self._calc_image_target_size(image, user_args)

            # only request a resize when the target differs from the current size
            resize_to = None
            if target_size != image.size:
                resize_to = target_size

            image = processor.process(
                image,
                resize_resolution=resize_to,
                aspect_correct=user_args.aspect_correct,
                align=8
            )

        # ensure images are in RGB format
        results.append(image.convert('RGB'))

    return results
@staticmethod
def _calc_image_target_size(image: PIL.Image.Image, user_args: DiffusionArguments):
if user_args.width is not None and user_args.height is not None:
target_size = (user_args.width, user_args.height)
elif user_args.width is not None:
target_size = (user_args.width, image.height)
elif user_args.height is not None:
target_size = (image.width, user_args.height)
else:
target_size = image.size
return target_size
def _create_pipeline_result(self,
                            pipeline_output,
                            output_type: str = 'pil',
                            user_args: DiffusionArguments = None,
                            pipeline_kwargs: dict = None) -> PipelineWrapperResult:
    """
    Build a :py:class:`PipelineWrapperResult` from the output of a pipeline call.

    For ``output_type='latent'`` the latents are moved to the CPU as torch tensors,
    unpacked for Flux model types, and run through the latents post processors of
    ``user_args`` when any are set. For any other output type the images of the
    pipeline output are used, and pasted back onto the original images when inpaint
    crop information is pending on this wrapper.

    :param pipeline_output: The output from a diffusers pipeline call
    :param output_type: The output type that was used (``'pil'`` or ``'latent'``)
    :param user_args: :py:class:`DiffusionArguments` providing the latents post
        processor URIs and fallback dimensions
    :param pipeline_kwargs: Pipeline keyword arguments, their ``height`` / ``width``
        take priority over those of ``user_args``
    :raises TypeError: If a latent is neither a ``numpy.ndarray`` nor a ``torch.Tensor``
    :return: :py:class:`PipelineWrapperResult` holding either images or latents
    """
    images = None
    latents = None

    if output_type != 'latent':
        images = getattr(pipeline_output, 'images', None)
    else:
        # latents are read from ``images`` when present, ``latents`` is the fallback
        if hasattr(pipeline_output, 'images'):
            raw_latents = pipeline_output.images
        else:
            raw_latents = getattr(pipeline_output, 'latents', None)

        if raw_latents is not None:
            # normalize every latent to a torch tensor on the CPU
            cpu_latents = []
            for item in raw_latents:
                if isinstance(item, torch.Tensor):
                    cpu_latents.append(item.cpu())
                elif isinstance(item, numpy.ndarray):
                    cpu_latents.append(torch.from_numpy(item).cpu())
                else:
                    raise TypeError(
                        f"Unexpected latent type: {type(item)}. Expected numpy.ndarray or torch.Tensor"
                    )
            latents = cpu_latents

    if latents is not None:
        if _enums.model_type_is_flux(self._model_type):
            # dimension priority: pipeline_kwargs > user_args > None
            height = pipeline_kwargs.get('height') if pipeline_kwargs else None
            width = pipeline_kwargs.get('width') if pipeline_kwargs else None

            if height is None and user_args:
                height = user_args.height
            if width is None and user_args:
                width = user_args.width

            latents = [self._unpack_flux_latents(packed, height, width) for packed in latents]

        if user_args and user_args.latents_post_processors:
            post_processors = user_args.latents_post_processors

            if len(latents) == 1:
                latents = [self._process_output_latents(latents[0], post_processors)]
            else:
                # process all latents as one batch, every tensor
                # needs a batch dimension before concatenation
                batch = torch.cat(
                    [x.unsqueeze(0) if x.ndim == 3 else x for x in latents],
                    dim=0
                )

                processed_batch = self._process_output_latents(batch, post_processors)

                # split the batch back up so each piece matches
                # the rank and batch size of its source tensor
                pieces = []
                offset = 0
                for source in latents:
                    unbatched = source.ndim == 3
                    count = 1 if unbatched else source.shape[0]

                    piece = processed_batch[offset:offset + count]
                    pieces.append(piece.squeeze(0) if unbatched else piece)

                    offset += count

                latents = pieces

    crop_info = self._inpaint_crop_info
    if images is not None and crop_info is not None:
        # paste the generated images back onto the uncropped originals
        images = self._paste_inpaint_result(
            original_images=crop_info.original_images,
            generated_images=images,
            crop_bounds=crop_info.crop_bounds,
            masks=crop_info.original_masks if crop_info.use_masked else None,
            feather=crop_info.feather
        )

        # the crop information is consumed once it has been applied
        self._inpaint_crop_info = None

    return PipelineWrapperResult(images=images, latents=latents)
def _argument_help_check(self, args: DiffusionArguments):
    """
    Build help text for every scheduler / text encoder argument that is a help request.

    Help for the main model and help for the second model (SDXL refiner or
    Stable Cascade decoder) are handled independently, each pipeline class
    is only resolved when help involving that model was requested.

    :param args: diffusion arguments providing the scheduler URIs and PAG values
    :return: the help sections joined by blank lines, an empty
        string when no help was requested
    """
    scheduler_help = _help.scheduler_is_help(args.scheduler_uri)
    second_model_scheduler_help = _help.scheduler_is_help(args.second_model_scheduler_uri)
    text_encoder_help = _help.text_encoder_is_help(self.text_encoder_uris)
    second_model_text_encoder_help = _help.text_encoder_is_help(self.second_model_text_encoder_uris)

    help_text = []
    model_path = self.model_path

    # The main pipeline class is needed for main model scheduler help and for
    # main model text encoder help, so both must be part of this guard.
    if scheduler_help or text_encoder_help:
        pipe_class = _pipelines.get_pipeline_class(
            model_type=self.model_type,
            pipeline_type=args.determine_pipeline_type(),
            unet_uri=self.unet_uri,
            transformer_uri=self.transformer_uri,
            vae_uri=self.vae_uri,
            lora_uris=self.lora_uris,
            image_encoder_uri=self.image_encoder_uri,
            ip_adapter_uris=self.ip_adapter_uris,
            textual_inversion_uris=self.textual_inversion_uris,
            controlnet_uris=self.controlnet_uris,
            t2i_adapter_uris=self.t2i_adapter_uris,
            pag=args.pag_scale is not None or args.pag_adaptive_scale is not None,
            help_mode=True
        )

        if scheduler_help:
            help_text.append(
                f'Schedulers compatible with: {model_path}\n\n' +
                _help.get_scheduler_help(
                    pipe_class,
                    help_args=_help.scheduler_is_help_args(
                        args.scheduler_uri),
                    indent=4
                ))

        if text_encoder_help:
            help_text.append(
                f'Text encoders compatible with: {model_path}\n\n' +
                _help.text_encoder_help(
                    pipe_class,
                    indent=4
                ))

    if second_model_scheduler_help or second_model_text_encoder_help:
        # NOTE(review): the main model PAG arguments are used for the second
        # model pipeline class here, not the sdxl_refiner_pag* arguments.
        # Confirm this is intended.
        second_pipe_class = _pipelines.get_pipeline_class(
            model_type=_enums.ModelType.SDXL if
            self.sdxl_refiner_uri else _enums.ModelType.S_CASCADE_DECODER,
            pipeline_type=_enums.PipelineType.IMG2IMG,
            unet_uri=self.second_model_unet_uri,
            vae_uri=self.vae_uri,
            pag=args.pag_scale is not None or args.pag_adaptive_scale is not None,
            help_mode=True
        )

        second_model_path = self.sdxl_refiner_uri or self.s_cascade_decoder_uri

        if second_model_scheduler_help:
            help_text.append(
                f'Schedulers compatible with: {second_model_path}\n\n' +
                _help.get_scheduler_help(
                    second_pipe_class,
                    help_args=_help.scheduler_is_help_args(
                        args.second_model_scheduler_uri),
                    indent=4
                ))

        if second_model_text_encoder_help:
            help_text.append(
                f'Text encoders compatible with: {second_model_path}\n\n' +
                _help.text_encoder_help(
                    second_pipe_class,
                    indent=4
                ))

    return '\n\n'.join(help_text)
def _set_scheduler_and_vae_settings(self, args):
    """
    Load schedulers into the existing pipelines and apply the
    VAE tiling / slicing settings to the main pipeline.

    The SDXL refiner and Stable Cascade decoder pipelines, when present, use
    the second model scheduler URI, falling back to the main scheduler URI.

    :param args: diffusion arguments providing ``scheduler_uri``,
        ``second_model_scheduler_uri``, ``vae_tiling`` and ``vae_slicing``
    """
    secondary_scheduler_uri = _types.default(
        args.second_model_scheduler_uri,
        args.scheduler_uri
    )

    _schedulers.load_scheduler(
        pipeline=self._pipeline,
        scheduler_uri=args.scheduler_uri
    )

    for attribute in ('_sdxl_refiner_pipeline', '_s_cascade_decoder_pipeline'):
        secondary_pipeline = getattr(self, attribute)
        if secondary_pipeline:
            _schedulers.load_scheduler(
                pipeline=secondary_pipeline,
                scheduler_uri=secondary_scheduler_uri
            )

    _pipelines.set_vae_tiling_and_slicing(
        pipeline=self._pipeline,
        tiling=args.vae_tiling,
        slicing=args.vae_slicing
    )
def _auto_denoise_range_check(self, args: DiffusionArguments):
    """
    Validate usage of the denoising start argument for SDXL model types.

    A non-zero denoising start requires latents as image inputs, and is
    rejected together with inpaint masks or with non-latent image inputs.
    Nothing is checked for other model types.

    :param args: diffusion arguments
    :raises dgenerate.pipelinewrapper.UnsupportedPipelineConfigError: On unsupported usage.
    """
    if not _enums.model_type_is_sdxl(self._model_type):
        return

    inputs = args.images

    have_latent_input = any(_torchutil.is_tensor(i) for i in inputs) if inputs else False
    have_image_input = any(isinstance(i, PIL.Image.Image) for i in inputs) if inputs else False

    denoising_start = args.denoising_start

    if denoising_start is None or denoising_start == 0.0:
        # denoising start is not in use
        return

    if args.mask_images and have_latent_input:
        raise _pipelines.UnsupportedPipelineConfigError(
            'Denoising start parameter is not supported for SDXL models '
            'with latent input and inpaint mask images defined. In order '
            'to refine an inpainted image, just pass in the generated image '
            'and use normal inpainting mode on it.'
        )

    if not have_latent_input:
        raise _pipelines.UnsupportedPipelineConfigError(
            'Denoising start parameter is not supported for SDXL models '
            'without latents being passed as image inputs.'
        )

    if have_image_input:
        raise _pipelines.UnsupportedPipelineConfigError(
            'Denoising start parameter is not supported for SDXL models '
            'with image inputs for img2img, it can only accept latents.'
        )
def _auto_latents_check(self, args: DiffusionArguments):
    """
    Validate arguments related to latents output and latents input.

    :param args: diffusion arguments
    :raises dgenerate.pipelinewrapper.UnsupportedPipelineConfigError: If latents output is
        requested with adetailer or a Deep Floyd model type, if latents post processors
        are given without latents output, or if latents are given as input to
        a Deep Floyd or Stable Cascade model type.
    """
    if not args.output_latents:
        if args.latents_post_processors:
            raise _pipelines.UnsupportedPipelineConfigError(
                'Cannot use latents post processors when not outputting to latents.'
            )
    else:
        if self.adetailer_detector_uris:
            raise _pipelines.UnsupportedPipelineConfigError(
                'Adetailer does not support outputting to latents.'
            )

        if _enums.model_type_is_floyd(self.model_type):
            raise _pipelines.UnsupportedPipelineConfigError(
                'Deep Floyd model types do not support outputting to latents.'
            )

    latents_given = args.images and any(_torchutil.is_tensor(i) for i in args.images)

    if not latents_given:
        return

    # validation that input type is not mixed happens in _get_pipeline_defaults
    if _enums.model_type_is_floyd(self.model_type):
        raise _pipelines.UnsupportedPipelineConfigError(
            'Deep Floyd model types do not support accepting latents as input.'
        )

    if _enums.model_type_is_s_cascade(self.model_type):
        raise _pipelines.UnsupportedPipelineConfigError(
            'Stable Cascade does not support accepting latents as input.'
        )
def _auto_ras_check(self, args: DiffusionArguments):
    """
    Enable RAS when any ``ras_*`` argument is set, then validate its usage.

    ``args.ras`` is set to ``True`` as soon as any attribute of ``args`` whose
    name starts with ``ras_`` holds a value other than ``None``.

    :param args: diffusion arguments, ``args.ras`` may be modified
    :raises dgenerate.pipelinewrapper.UnsupportedPipelineConfigError: If RAS is enabled
        in an unsupported configuration.
    """
    # any specified (non None) ras_* value enables RAS, this includes False
    if any(getattr(args, name) is not None
           for name in args.__dict__ if name.startswith('ras_')):
        args.ras = True

    if not args.ras:
        return

    if not _enums.model_type_is_sd3(self.model_type):
        raise _pipelines.UnsupportedPipelineConfigError(
            'RAS is only supported for SD3.')

    if args.ras_index_fusion and self._pipeline.transformer.config.qk_norm == 'rms_norm':
        raise _pipelines.UnsupportedPipelineConfigError(
            'RAS index fusion not supported with SD3.5, only SD3.'
        )

    if args.ras_index_fusion and not importlib.util.find_spec('triton'):
        raise _pipelines.UnsupportedPipelineConfigError(
            'RAS index fusion is only supported with triton / triton-windows installed.')

    if self.model_cpu_offload:
        raise _pipelines.UnsupportedPipelineConfigError(
            'RAS does not support model CPU offloading.')

    if args.ras_index_fusion and self.model_sequential_offload:
        raise _pipelines.UnsupportedPipelineConfigError(
            'Index fusion is not supported for RAS when sequential offloading is enabled.')

    if args.ras_index_fusion and (
            self.quantizer_uri or (self._unet_uri and _uris.UNetUri.parse(self._unet_uri).quantizer)
    ):
        raise _pipelines.UnsupportedPipelineConfigError(
            'Index fusion is not supported for RAS when UNet quantization is enabled, '
            'quantize the text encoders individually.')

    first_step = _types.default(args.ras_start_step, _constants.DEFAULT_RAS_START_STEP)
    last_step = _types.default(args.ras_end_step, args.inference_steps)

    if first_step > last_step:
        raise _pipelines.UnsupportedPipelineConfigError(
            'RAS start step must be less than or equal to end step.')
def _auto_deep_cache_check(self, args: DiffusionArguments):
    """
    Enable DeepCache for the main model and / or the SDXL refiner when any of
    their DeepCache arguments are set, and validate the main model type.

    ``args.deep_cache`` is set to ``True`` when any ``deep_cache_*`` attribute of
    ``args`` is not ``None``, ``args.sdxl_refiner_deep_cache`` is set to ``True``
    when any ``sdxl_refiner_deep_cache_*`` attribute is not ``None``.

    :param args: diffusion arguments, may be modified as described
    :raises dgenerate.pipelinewrapper.UnsupportedPipelineConfigError: If DeepCache is
        enabled for an unsupported main model type.
    """

    def any_set(prefix):
        # True if any attribute with the given name prefix has a non None value
        return any(getattr(args, name) is not None
                   for name in args.__dict__ if name.startswith(prefix))

    if any_set('deep_cache_'):
        args.deep_cache = True

    if args.deep_cache:
        supported_model_types = (
            _enums.ModelType.SDXL,
            _enums.ModelType.SDXL_PIX2PIX,
            _enums.ModelType.KOLORS,
            _enums.ModelType.SD,
            _enums.ModelType.PIX2PIX,
            _enums.ModelType.UPSCALER_X4,
        )

        if self.model_type not in supported_model_types:
            raise _pipelines.UnsupportedPipelineConfigError(
                f'DeepCache is only supported with Stable Diffusion, Stable Diffusion XL, '
                f'Stable Diffusion Upscaler X4, Kolors, and Pix2Pix variants.'
            )

    if any_set('sdxl_refiner_deep_cache_'):
        args.sdxl_refiner_deep_cache = True
def _auto_hi_diffusion_check(self, args: DiffusionArguments):
    """
    Validate usage of HiDiffusion and of its dependent options.

    :param args: diffusion arguments
    :raises dgenerate.pipelinewrapper.UnsupportedPipelineConfigError: If HiDiffusion is
        enabled for an unsupported model type or with T2I adapters, or if one of its
        options is specified while HiDiffusion is not enabled.
    """
    if not args.hi_diffusion:
        # the options are meaningless without HiDiffusion,
        # any specified value is rejected, including False
        if args.hi_diffusion_no_raunet is not None:
            raise _pipelines.UnsupportedPipelineConfigError(
                'HiDiffusion no-raunet option is only supported when HiDiffusion is enabled.'
            )

        if args.hi_diffusion_no_win_attn is not None:
            raise _pipelines.UnsupportedPipelineConfigError(
                'HiDiffusion no-window-attention option is only supported when HiDiffusion is enabled.'
            )
        return

    supported_model_types = (
        _enums.ModelType.SDXL,
        _enums.ModelType.KOLORS,
        _enums.ModelType.SD,
    )

    if self.model_type not in supported_model_types:
        raise _pipelines.UnsupportedPipelineConfigError(
            'HiDiffusion is only supported for '
            '--model-type sd, sdxl, and kolors'
        )

    if self.t2i_adapter_uris:
        raise _pipelines.UnsupportedPipelineConfigError(
            'HiDiffusion is not supported with T2I Adapters'
        )
def _auto_sada_check(self, args: DiffusionArguments):
    """
    Enable SADA when any ``sada_*`` argument is set, then validate its usage.

    ``args.sada`` is set to ``True`` as soon as any attribute of ``args`` whose
    name starts with ``sada_`` holds a value other than ``None``.

    :param args: diffusion arguments, ``args.sada`` may be modified
    :raises dgenerate.pipelinewrapper.UnsupportedPipelineConfigError: If SADA is enabled for
        an unsupported model type, together with TeaCache, DeepCache or HiDiffusion,
        or with inconsistent Lagrangian interpolation parameters.
    """
    if any(getattr(args, name) is not None
           for name in args.__dict__ if name.startswith('sada_')):
        args.sada = True

    if not args.sada:
        return

    # SADA supports SD, SDXL/Kolors, and Flux
    unet_model_types = (
        _enums.ModelType.SD,
        _enums.ModelType.SDXL,
        _enums.ModelType.KOLORS,
    )

    if not (self.model_type in unet_model_types or
            _enums.model_type_is_flux(self.model_type)):
        raise _pipelines.UnsupportedPipelineConfigError(
            'SADA is only supported for '
            '--model-type sd, sdxl, kolors, and flux*'
        )

    # conflicts with other acceleration methods
    if args.tea_cache:
        raise _pipelines.UnsupportedPipelineConfigError(
            'SADA cannot be used simultaneously with TeaCache'
        )

    if args.deep_cache:
        raise _pipelines.UnsupportedPipelineConfigError(
            'SADA cannot be used simultaneously with DeepCache'
        )

    if args.hi_diffusion:
        raise _pipelines.UnsupportedPipelineConfigError(
            'SADA cannot be used simultaneously with HiDiffusion'
        )

    # the remaining checks only concern Lagrangian interpolation
    sada_args = self._get_sada_args(args)

    if sada_args['lagrange_term'] == 0:
        return

    lagrange_int = sada_args['lagrange_int']
    lagrange_step = sada_args['lagrange_step']

    if lagrange_int is None or lagrange_step is None:
        raise _pipelines.UnsupportedPipelineConfigError(
            'When using SADA Lagrangian interpolation (lagrange_term != 0), '
            'both lagrange_int and lagrange_step must be specified'
        )

    if lagrange_step % lagrange_int != 0:
        raise _pipelines.UnsupportedPipelineConfigError(
            'SADA lagrange_step must be divisible by lagrange_int'
        )
def _auto_tea_cache_check(self, args: DiffusionArguments):
    """
    Enable TeaCache when any ``tea_cache_*`` argument is set, then validate its usage.

    ``args.tea_cache`` is set to ``True`` as soon as any attribute of ``args`` whose
    name starts with ``tea_cache_`` holds a value other than ``None``.

    :param args: diffusion arguments, ``args.tea_cache`` may be modified
    :raises dgenerate.pipelinewrapper.UnsupportedPipelineConfigError: If TeaCache is enabled
        for a model type that is not Flux, or together with model CPU offloading.
    """
    if any(getattr(args, name) is not None
           for name in args.__dict__ if name.startswith('tea_cache_')):
        args.tea_cache = True

    if not args.tea_cache:
        return

    if not _enums.model_type_is_flux(self.model_type):
        raise _pipelines.UnsupportedPipelineConfigError(
            'TeaCache is only supported for Flux.'
        )

    if self.model_cpu_offload:
        raise _pipelines.UnsupportedPipelineConfigError(
            'TeaCache does not support model CPU offloading.'
        )
def _auto_freeu_check(self, args: DiffusionArguments):
    """
    Validate usage of FreeU parameters for the main model and the SDXL refiner.

    :param args: diffusion arguments
    :raises dgenerate.pipelinewrapper.UnsupportedPipelineConfigError: If FreeU parameters are
        given for a main model type outside of the supported set, or if refiner
        FreeU parameters are given while no SDXL refiner is in use.
    """
    # model types that accept FreeU parameters
    supported_model_types = frozenset((
        _enums.ModelType.SD,
        _enums.ModelType.SDXL,
        _enums.ModelType.KOLORS,
        _enums.ModelType.PIX2PIX,
        _enums.ModelType.SDXL_PIX2PIX,
        _enums.ModelType.UPSCALER_X2,
        _enums.ModelType.UPSCALER_X4,
    ))

    if args.freeu_params is not None and self._model_type not in supported_model_types:
        raise _pipelines.UnsupportedPipelineConfigError(
            'Current primary model does not utilize a UNet, and therefore does not support FreeU parameters.'
        )

    if args.sdxl_refiner_freeu_params is not None and self._sdxl_refiner_uri is None:
        raise _pipelines.UnsupportedPipelineConfigError(
            'SDXL refiner is not in use, so cannot supply FreeU parameters to it.'
        )
[docs]
def get_decoded_latents_size(self, latents: torch.Tensor) -> _types.Size:
    """
    Given a latent tensor, return the expected decoded image size in pixels.

    The size is the spatial size of the latents multiplied by a fixed factor of 8.

    :param latents: Latent tensor of shape [B, C, H, W] or [C, H, W].
    :raises dgenerate.pipelinewrapper.UnsupportedPipelineConfigError: If no pipeline
        has been initialized, or the initialized pipeline has no VAE.
    :return: width, height
    """
    pipeline = self._pipeline

    if pipeline is None:
        raise _pipelines.UnsupportedPipelineConfigError(
            'Cannot decode latents as a pipeline has not been initialized, you must perform a generation first.'
        )

    if getattr(pipeline, 'vae', None) is None:
        raise _pipelines.UnsupportedPipelineConfigError(
            'Cannot decode latents as the initialized pipeline does not have a VAE.'
        )

    shape = latents.shape

    if len(shape) == 4:
        _, _, latent_height, latent_width = shape
    else:
        _, latent_height, latent_width = shape

    # The factor is hard coded and not read from the VAE. The original author
    # notes that it matches SD1.5, SDXL and most other stable diffusion VAEs.
    scale_factor = 8

    return latent_width * scale_factor, latent_height * scale_factor
[docs]
@torch.inference_mode()
def decode_latents(
        self,
        latents: _types.TensorsOrTensor,
) -> list[PIL.Image.Image]:
    """
    Decode latents using the main pipeline's VAE.

    A generation must have occurred at least once for this method to be usable.

    You must be using a model type that utilizes a VAE, Stable Cascade and Deep Floyd model types
    are not supported by this method.

    :param latents: Latents to decode, can be a sequence of tensors (batched), or a single tensor.
        A single tensor with a batch dimension [B, C, H, W] will be assumed to be a batch of latents
        and batched if the batch dimension is > 1, [C, H, W] will be assumed to be a single latent tensor.
        For Flux models, latents should be in unpacked format [B, C, H, W] where C=16.

    :raise dgenerate.pipelinewrapper.UnsupportedPipelineConfigError: If decoding the latents is not supported.
    :raise ValueError: If no latents are given, or the given tensors are not 3 or 4 dimensional.

    :return: The decoded images, as produced by the pipeline image processor's ``postprocess``.
    """
    if self._pipeline is None:
        raise _pipelines.UnsupportedPipelineConfigError(
            'Cannot decode latents as a pipeline has not been initialized, you must perform a generation first.'
        )

    if not hasattr(self._pipeline, 'vae') or self._pipeline.vae is None:
        raise _pipelines.UnsupportedPipelineConfigError(
            'Cannot decode latents as the initialized pipeline does not have a VAE.'
        )

    # Normalize the input into a single [B, C, H, W] tensor, rejecting
    # inputs that previously fell through and failed later with an
    # unrelated AttributeError or an error from inside the VAE.
    if isinstance(latents, torch.Tensor):
        if latents.ndim == 3:
            latents = latents.unsqueeze(0)
        elif latents.ndim != 4:
            raise ValueError(
                f'Latents tensor must have shape [B, C, H, W] or [C, H, W], '
                f'got shape: {tuple(latents.shape)}'
            )
    else:
        tensors = [] if latents is None else list(latents)

        if not tensors:
            raise ValueError('No latents were provided to decode.')

        first_ndim = tensors[0].ndim

        if first_ndim == 4:
            # List of [B, C, H, W] tensors - concatenate along batch dimension
            latents = torch.cat(tensors, dim=0)  # [total_B, C, H, W]
        elif first_ndim == 3:
            # List of [C, H, W] tensors - stack to create batch dimension
            latents = torch.stack(tensors, dim=0)  # [num_tensors, C, H, W]
        else:
            raise ValueError(
                f'Latents in a sequence must have shape [B, C, H, W] or [C, H, W], '
                f'got shape: {tuple(tensors[0].shape)}'
            )

    vae = self._pipeline.vae
    vae_og_dtype = vae.dtype

    # float16 VAEs whose config sets force_upcast are temporarily run in float32
    needs_upcasting = vae.dtype == torch.float16 and getattr(vae.config, 'force_upcast', False)

    if needs_upcasting:
        try:
            vae.to(self._device, dtype=torch.float32)
        except NotImplementedError:
            # NOTE(review): presumably raised when the weights cannot be moved
            # (e.g. offloaded / meta tensors), only the dtype is changed then - confirm
            vae.to(dtype=torch.float32)

    try:
        if latents.dtype != vae.dtype:
            latents = latents.to(dtype=vae.dtype)

        if latents.device != vae.device:
            # inputs are moved to the wrapper's device, not to vae.device
            latents = latents.to(self._device)

        if _enums.model_type_is_sdxl(self.model_type) or _enums.model_type_is_kolors(self.model_type):
            # SDXL and Kolors
            has_latents_mean = \
                hasattr(vae.config, "latents_mean") and \
                vae.config.latents_mean is not None
            has_latents_std = \
                hasattr(vae.config, "latents_std") and \
                vae.config.latents_std is not None

            if has_latents_mean and has_latents_std:
                # de-normalize with the per-channel statistics stored in the VAE config
                latents_mean = (
                    torch.tensor(vae.config.latents_mean).reshape(1, 4, 1, 1).expand(latents.shape[0], -1, 1, 1).to(
                        latents.device, latents.dtype
                    )
                )
                latents_std = (
                    torch.tensor(vae.config.latents_std).reshape(1, 4, 1, 1).expand(latents.shape[0], -1, 1, 1).to(
                        latents.device, latents.dtype
                    )
                )
                latents = latents * latents_std / vae.config.scaling_factor + latents_mean
            else:
                latents = latents / vae.config.scaling_factor
        elif _enums.model_type_is_sd15(self.model_type) or _enums.model_type_is_sd2(self.model_type):
            # SD15 and SD2
            latents = latents / vae.config.scaling_factor
        elif _enums.model_type_is_sd3(self.model_type):
            # SD3
            latents = (latents / vae.config.scaling_factor) + vae.config.shift_factor
        elif _enums.model_type_is_flux(self.model_type):
            # Flux - latents are already in unpacked format [B, C, H, W]
            # Apply VAE scaling and shift
            latents = (latents / vae.config.scaling_factor) + vae.config.shift_factor
        else:
            raise _pipelines.UnsupportedPipelineConfigError(
                f'Unable to decode latents for unsupported model type: {_enums.get_model_type_string(self.model_type)}'
            )

        decoded_images = vae.decode(latents).sample
    finally:
        # always restore the original VAE dtype, even if decoding failed
        if needs_upcasting:
            vae.to(dtype=vae_og_dtype)

    return self._pipeline.image_processor.postprocess(decoded_images)
[docs]
def __call__(self, args: DiffusionArguments | None = None, **kwargs) -> PipelineWrapperResult:
    """
    Run the wrapped pipeline and return the generation result.

    :param args: Optional :py:class:`.DiffusionArguments`
    :param kwargs: See :py:meth:`.DiffusionArguments.get_pipeline_wrapper_kwargs`,
        any keyword arguments given here will override values derived from the
        :py:class:`.DiffusionArguments` object given to the *args* parameter.

    :raises InvalidModelFileError:
    :raises InvalidModelUriError:
    :raises InvalidSchedulerNameError:
    :raises dgenerate.OutOfMemoryError:
    :raises UnsupportedPipelineConfigError:
    :raises DiffusionArgumentsHelpException: When the argument help check produces help text.

    :return: :py:class:`.PipelineWrapperResult`
    """
    # inpaint crop state never carries over from a previous call
    self._inpaint_crop_info = None

    # work on a private copy, keyword arguments are applied
    # last so they take precedence over values from ``args``
    call_args = DiffusionArguments()
    if args is not None:
        call_args.set_from(args)
    call_args.set_from(kwargs, missing_value_throws=False)

    # argument validation that runs before the pipeline is initialized
    pre_init_checks = (
        self._auto_freeu_check,
        self._auto_tea_cache_check,
        self._auto_deep_cache_check,
        self._auto_hi_diffusion_check,
        self._auto_sada_check,
        self._auto_latents_check,
        self._auto_denoise_range_check,
    )
    for check in pre_init_checks:
        check(call_args)

    help_text = self._argument_help_check(call_args)
    if help_text:
        raise DiffusionArgumentsHelpException(help_text)

    _messages.debug_log(f'Calling Pipeline Wrapper: "{self}"')
    _messages.debug_log(f'Pipeline Wrapper Args: ',
                        lambda: _textprocessing.debug_format_args(
                            call_args.get_pipeline_wrapper_kwargs()))

    self._lazy_init_pipeline(call_args)

    # scheduler and vae tiling / slicing settings may differ
    # between calls, so they are applied every time, even
    # when a cached pipeline was loaded
    self._set_scheduler_and_vae_settings(call_args)

    pipeline_args = self._get_pipeline_defaults(user_args=call_args)

    # this check requires an initialized pipeline
    self._auto_ras_check(call_args)

    try:
        # select the model family specific call implementation
        if self.model_type == _enums.ModelType.S_CASCADE:
            invoke = self._call_torch_s_cascade
        elif _enums.model_type_is_flux(self.model_type):
            invoke = self._call_torch_flux
        else:
            invoke = self._call_torch

        result = invoke(pipeline_args=pipeline_args,
                        user_args=call_args)
    except _DenoiseRangeError as e:
        raise _pipelines.UnsupportedPipelineConfigError(e) from e

    # record this wrapper's pipeline recall attributes at class level
    DiffusionPipelineWrapper.__LAST_RECALL_PIPELINE = self._recall_main_pipeline
    DiffusionPipelineWrapper.__LAST_RECALL_SECONDARY_PIPELINE = self._recall_secondary_pipeline

    return result
def __str__(self):
    """
    Return the class name followed by the parenthesized string form
    of ``_types.get_public_attributes(self)``.
    """
    class_name = self.__class__.__name__
    public_attributes = _types.get_public_attributes(self)
    return f'{class_name}({str(public_attributes)})'
def __repr__(self):
    """
    Return the same text as ``str(self)``.
    """
    text = str(self)
    return text
# Export list of this module, produced by the ``_types.module_all()`` helper
# (presumably it gathers the public names defined above - confirm in its definition).
__all__ = _types.module_all()