Source code for dgenerate.memory

# Copyright (c) 2023, Teriks
#
# dgenerate is distributed under the following BSD 3-Clause License
#
# Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in
#    the documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived
#    from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import ast
import collections.abc
import os
import typing

import psutil
import torch

import dgenerate.messages as _messages
import dgenerate.textprocessing as _textprocessing
import dgenerate.types as _types
import dgenerate.memoize as _memoize
import dgenerate.eval as _eval
import dgenerate.torchutil as _torchutil

# Module documentation. It is assigned to ``__doc__`` explicitly because a bare
# string literal placed after the import statements would not be treated as the
# module docstring.
__doc__ = """
System memory information and memory constraint expressions.
"""


class MemoryConstraintSyntaxError(Exception):
    """
    Raised by :py:func:`.memory_constraints` when an expression contains a syntax
    error, or when an expression evaluates to a non-boolean value.
    """
def memory_constraint_syntax_check(expression: str):
    """
    Syntax check an expression given to :py:func:`memory_constraints`.

    The expression must be at most 128 characters long and must consist of
    exactly one boolean operation (``and`` / ``or``) or comparison.
    An empty expression is accepted without further validation.

    :param expression: the expression string

    :raises MemoryConstraintSyntaxError: on syntax errors, if the expression is
        too long, if it contains more than one statement, or if it is not a
        boolean operation / comparison.
    """
    if len(expression) > 128:
        raise MemoryConstraintSyntaxError(
            f'Given expression "{expression[:24]} ..." is too long.')

    try:
        tree = ast.parse(expression)
    except (SyntaxError, ValueError) as e:
        # ValueError is raised by some Python versions for source containing
        # null bytes, report it the same way as any other malformed input.
        raise MemoryConstraintSyntaxError(
            f'Syntax error in expression "{expression}": {str(e).strip()}') from e

    if not tree.body:
        # Nothing to validate (empty or whitespace / comment only input).
        return

    # Every statement must be accounted for, not only the first one, otherwise
    # input such as "a > 1; something_else" would slip through unchecked.
    if len(tree.body) > 1 or not isinstance(tree.body[0], ast.Expr):
        raise MemoryConstraintSyntaxError(
            f'Expression "{expression}" is invalid. '
            f'Only simple expressions accepted, no control statements, etc.')

    # noinspection PyUnresolvedReferences
    if not isinstance(tree.body[0].value, (ast.BoolOp, ast.Compare)):
        raise MemoryConstraintSyntaxError(
            f'Expression "{expression}" is invalid. '
            'Only expressions returning boolean values accepted.')
[docs] def memory_constraints(expressions: collections.abc.Iterable[str], extra_vars: dict[str, int | float] | None = None, mode=any, pid: int | None = None) -> bool: """ Evaluate a user boolean expression involving the processes used memory in bytes, used memory percent, and available system memory in bytes. Available functions are: * kb(bytes to kilobytes) * mb(bytes to megabytes) * gb(bytes to gigabytes) * kib(bytes to kibibytes) * mib(bytes to mebibytes) * gib(bytes to gibibytes) Available values are: * used / u (memory currently used by the process in bytes) * used_total_percent / utp (memory used by the process, as percent of total system memory, example: 25.4) * used_percent / up (memory used by the process, as a percent of used + available memory, example 75.4) * available / a (available memory remaining on the system in bytes that can be used without going to the swap) * total / t (total memory on the system in bytes) Example expressions: * ``used > gb(1)`` (when the process has used more than 1GB of memory) * ``used_total_percent > 25`` (when the process has used more than 25 percent of system memory) * ``used_percent > 25`` (when the process has used more than 25 percent of virtual memory available to it) * ``available < gb(2)`` (when the available memory on the system is less than 2GB) Expressions may not be longer than 128 characters. However, multiple expressions may be provided. :raise ValueError: if extra_vars overwrites a reserved variable name :raise MemoryConstraintSyntaxError: on syntax errors or if the return value of an expression is not a boolean value. :param expressions: a list of expressions, if expressions is ``None`` or empty this function will return ``False``. :param extra_vars: extra integer or float variables :param mode: the standard library function 'any' (equating to OR all expressions) or the standard library function 'all' (equating to AND all expressions). The default is 'any' which ORs all expressions. 
:param pid: PID of the process from which to acquire the 'used' and 'used_percent' variable values from, defaults to the current process. :return: Boolean result of the expression """ if not expressions: return False for expr in expressions: memory_constraint_syntax_check(expr) if pid is None: pid = os.getpid() p_info = psutil.Process(pid) used = p_info.memory_info().rss used_total_percent = p_info.memory_percent() mem_info = psutil.virtual_memory() available = mem_info.available total = mem_info.total used_percent = (used / (used + available)) * 100.0 functions = { 'gb': lambda x: x * 1000 ** 3, 'mb': lambda x: x * 1000 ** 2, 'kb': lambda x: x * 1000, 'gib': lambda x: x * 1024 ** 3, 'mib': lambda x: x * 1024 ** 2, 'kib': lambda x: x * 1024 } variables = { 'used': used, 'u': used, 'used_percent': used_percent, 'up': used_percent, 'used_total_percent': used_total_percent, 'utp': used_total_percent, 'available': available, 'a': available, 'total': total, 't': total } if extra_vars: for key, value in extra_vars.items(): if key in variables or key in functions: raise ValueError( f'extra_vars cannot redefine reserved attribute: {key}') variables[key] = value interpreter = _eval.standard_interpreter( symtable=_eval.safe_builtins() | variables.copy() ) interpreter.symtable.update(functions) _messages.debug_log( f'CPU MEMORY CONSTRAINT TEST: {_types.fullname(memory_constraints)} constraint = ' f'[{", ".join(_textprocessing.quote_spaces(expressions))}], ' f'vars = {str(variables)}, mode={mode.__name__}') try: value = mode(interpreter( e, raise_errors=True, show_errors=False) for e in expressions) if not isinstance(value, bool): raise MemoryConstraintSyntaxError('Memory constraint must return a boolean value.') _messages.debug_log(f'CPU MEMORY CONSTRAINT RESULT: {value}') return value except (Exception, NameError) as e: raise MemoryConstraintSyntaxError( f'Memory constraint syntax error: {e}')
_MEM_FACTORS = { 'b': 1, 'kb': 1000, 'mb': 1000 ** 2, 'gb': 1000 ** 3, 'kib': 1024, 'mib': 1024 ** 2, 'gib': 1024 ** 3, }
def get_used_memory(unit: str = 'b', pid: int | None = None):
    """
    Get the resident memory (RSS) used by a process, in a selectable unit.

    :param unit: one of (case insensitive): b (bytes), kb (kilobytes),
        mb (megabytes), gb (gigabytes), kib (kibibytes),
        mib (mebibytes), gib (gibibytes)

    :param pid: The process PID to retrieve this information from,
        defaults to the current process.

    :return: Requested value.
    """
    target_pid = os.getpid() if pid is None else pid
    rss_bytes = psutil.Process(target_pid).memory_info().rss
    return rss_bytes / _MEM_FACTORS[unit.strip().lower()]
def get_used_total_memory_percent(pid: int | None = None) -> float:
    """
    Get the memory used by a process as a percentage of total system memory.

    :param pid: PID of the process, defaults to the current process.

    :return: A whole percentage, for example: 25.4
    """
    target_pid = os.getpid() if pid is None else pid
    process = psutil.Process(target_pid)
    return process.memory_percent()
def get_used_memory_percent(pid: int | None = None) -> float:
    """
    Get the memory used by a process as a percentage of the memory it has
    already used plus the memory still available on the system.

    :param pid: PID of the process, defaults to the current process.

    :return: A whole percentage, for example: 25.4
    """
    target_pid = os.getpid() if pid is None else pid

    rss_bytes = psutil.Process(target_pid).memory_info().rss
    available_bytes = psutil.virtual_memory().available

    fraction = rss_bytes / (rss_bytes + available_bytes)
    return fraction * 100.0
def get_available_memory(unit: str = 'b'):
    """
    Get the available memory remaining on the system, in a selectable unit.

    :param unit: one of (case insensitive): b (bytes), kb (kilobytes),
        mb (megabytes), gb (gigabytes), kib (kibibytes),
        mib (mebibytes), gib (gibibytes)

    :return: Requested value.
    """
    available_bytes = psutil.virtual_memory().available
    return available_bytes / _MEM_FACTORS[unit.strip().lower()]
def get_total_memory(unit: str = 'b'):
    """
    Get the total physical memory on the system, in a selectable unit.

    :param unit: one of (case insensitive): b (bytes), kb (kilobytes),
        mb (megabytes), gb (gigabytes), kib (kibibytes),
        mib (mebibytes), gib (gibibytes)

    :return: Requested value.
    """
    total_bytes = psutil.virtual_memory().total
    return total_bytes / _MEM_FACTORS[unit.strip().lower()]
[docs] def is_supported_gpu_device(device: str | torch.device) -> bool: """ Check if a device is a supported GPU device (CUDA or XPU) that can be used with the GPU memory functions in this module. MPS statistics are unsupported due to using a unified memory model. :param device: The device to check (string like 'cuda:0', 'xpu:1' or torch.device object) :return: True if the device is a supported GPU device, False otherwise """ if isinstance(device, str): device = device.strip() return device.startswith('cuda') or device.startswith('xpu') elif isinstance(device, torch.device): return device.type in ('cuda', 'xpu') else: return False
def _parse_gpu_device(device: str | torch.device) -> tuple[str, int]: """ Parse a GPU device identifier and return the device type and index. :param device: The device to parse (string like 'cuda:0', 'xpu:1' or torch.device object) :return: Tuple of (device_type, device_index) :raises ValueError: If device is not a valid GPU device identifier """ if isinstance(device, str): device = device.strip() device_type = device.split(':')[0] if ':' in device: device_index = int(device.split(':')[1].strip()) else: device_index = 0 # default to device 0 if no index is specified elif isinstance(device, torch.device): device_type = device.type device_index = device.index if device.index is not None else 0 else: raise ValueError('device must be a str or torch.device object.') return device_type, device_index
def get_gpu_total_memory(device: str | torch.device, unit: str = 'b'):
    """
    Return the total memory possessed by a GPU device.

    Non GPU devices always return 0.

    :param device: The device.
    :param unit: one of (case insensitive): b (bytes), kb (kilobytes),
        mb (megabytes), gb (gigabytes), kib (kibibytes),
        mib (mebibytes), gib (gibibytes)

    :return: Requested value.
    """
    if not is_supported_gpu_device(device):
        return 0

    device_type, device_index = _parse_gpu_device(device)

    if device_type == 'cuda':
        backend = torch.cuda
    elif device_type == 'xpu':
        backend = torch.xpu
    else:
        # Unrecognized device type: honor the documented "non GPU devices
        # return 0" contract instead of implicitly returning None.
        return 0

    total_bytes = backend.get_device_properties(device_index).total_memory
    return total_bytes / _MEM_FACTORS[unit.strip().lower()]
def get_gpu_allocated_memory(device: str | torch.device, unit: str = 'b'):
    """
    Return the total memory allocated on a GPU device, as reported by
    the torch backend's ``memory_allocated()``.

    Non GPU devices always return 0.

    :param device: The device.
    :param unit: one of (case insensitive): b (bytes), kb (kilobytes),
        mb (megabytes), gb (gigabytes), kib (kibibytes),
        mib (mebibytes), gib (gibibytes)

    :return: Requested value.
    """
    if not is_supported_gpu_device(device):
        return 0

    device_type, device_index = _parse_gpu_device(device)

    if device_type == 'cuda':
        backend = torch.cuda
    elif device_type == 'xpu':
        backend = torch.xpu
    else:
        # Unrecognized device type: honor the documented "non GPU devices
        # return 0" contract instead of implicitly returning None.
        return 0

    # The statistic is queried with the requested device made current.
    with backend.device(device_index):
        return backend.memory_allocated() / _MEM_FACTORS[unit.strip().lower()]
def get_gpu_free_memory(device: str | torch.device, unit: str = 'b'):
    """
    Return the amount of free memory available on a GPU device.

    This is computed as the device's total memory minus the value
    reported by the torch backend's ``memory_reserved()``.

    NOTE(review): memory held on the device by other processes is presumably
    not reflected in this figure, confirm against the PyTorch documentation.

    Non GPU devices always return 0.

    :param device: The device.
    :param unit: one of (case insensitive): b (bytes), kb (kilobytes),
        mb (megabytes), gb (gigabytes), kib (kibibytes),
        mib (mebibytes), gib (gibibytes)

    :return: Requested value.
    """
    if not is_supported_gpu_device(device):
        return 0

    device_type, device_index = _parse_gpu_device(device)

    if device_type == 'cuda':
        backend = torch.cuda
    elif device_type == 'xpu':
        backend = torch.xpu
    else:
        # Unrecognized device type: honor the documented "non GPU devices
        # return 0" contract instead of implicitly returning None.
        return 0

    total_bytes = backend.get_device_properties(device_index).total_memory

    # The statistic is queried with the requested device made current.
    with backend.device(device_index):
        reserved_bytes = backend.memory_reserved()

    return (total_bytes - reserved_bytes) / _MEM_FACTORS[unit.strip().lower()]
def get_gpu_reserved_memory(device: str | torch.device, unit: str = 'b'):
    """
    Return the amount of reserved memory on a GPU device, as reported
    by the torch backend's ``memory_reserved()``.

    Non GPU devices always return 0.

    :param device: The device.
    :param unit: one of (case insensitive): b (bytes), kb (kilobytes),
        mb (megabytes), gb (gigabytes), kib (kibibytes),
        mib (mebibytes), gib (gibibytes)

    :return: Requested value.
    """
    if not is_supported_gpu_device(device):
        return 0

    device_type, device_index = _parse_gpu_device(device)

    if device_type == 'cuda':
        backend = torch.cuda
    elif device_type == 'xpu':
        backend = torch.xpu
    else:
        # Unrecognized device type: honor the documented "non GPU devices
        # return 0" contract instead of implicitly returning None.
        return 0

    # The statistic is queried with the requested device made current.
    with backend.device(device_index):
        return backend.memory_reserved() / _MEM_FACTORS[unit.strip().lower()]
def bytes_best_human_unit(byte_count: int, delimiter='') -> str:
    """
    Return a string for humans from a byte count using an appropriate
    decimal unit, e.g. 1.0KB, 1.0MB, 1.0GB etc.

    The largest unit in which the value is at least 1 is chosen, and the
    value is rounded to two decimal places. Counts below 1000 bytes are
    rendered unscaled with the unit ``B``.

    :param delimiter: add this string between the value and the unit
    :param byte_count: the byte count
    :return: formatted string
    """
    units = (
        ('GB', 1000 ** 3),
        ('MB', 1000 ** 2),
        ('KB', 1000),
    )

    for suffix, factor in units:
        scaled = byte_count / factor
        # Inclusive comparison so that exactly one unit (e.g. 1000 bytes)
        # is reported as 1.0KB rather than falling to the smaller unit.
        if scaled >= 1:
            return f'{round(scaled, 2)}{delimiter}{suffix}'

    return f'{byte_count}{delimiter}B'
def memory_use_debug_string(pid=None):
    """
    Return a debug string describing the memory consumption of a process,
    along with the available and total system memory.

    Example:
        "Used Memory (CPU Side): 465.25MB, Available Memory: 50.94GB, Used Percent: 0.91%,
        Total Memory: 68.64GB, Used Total Percent: 0.68%"

    Where:

        * Used Memory (CPU Side) = :py:func:`.get_used_memory`
        * Available Memory = :py:func:`.get_available_memory`
        * Used Percent = :py:func:`.get_used_memory_percent`
        * Total Memory = :py:func:`.get_total_memory`
        * Used Total Percent = :py:func:`.get_used_total_memory_percent`

    :param pid: PID of the process to describe, defaults to the current process.
    :return: formatted string
    """
    if pid is None:
        pid = os.getpid()

    used = bytes_best_human_unit(get_used_memory(pid=pid))
    available = bytes_best_human_unit(get_available_memory())
    used_percent = round(get_used_memory_percent(pid=pid), 2)
    total = bytes_best_human_unit(get_total_memory())
    used_total_percent = round(get_used_total_memory_percent(pid=pid), 2)

    return (f'Used Memory (CPU Side): {used}, '
            f'Available Memory: {available}, '
            f'Used Percent: {used_percent}%, '
            f'Total Memory: {total}, '
            f'Used Total Percent: {used_total_percent}%')
def calculate_chunk_size(file_size):
    """
    Calculate the chunk size for downloading / copying a file,
    based on the file size and the currently available system memory.

    :param file_size: The size of the file to be downloaded / copied.
    :return: The calculated chunk size.
    """
    # Available (not total) virtual memory, in bytes.
    available = psutil.virtual_memory().available
    one_percent = available * 0.01

    # Files no larger than 1% of available memory go in a single chunk.
    if file_size <= one_percent:
        return file_size

    # Files up to 10% of available memory use chunks of 1% of available memory.
    if file_size <= available * 0.1:
        return int(one_percent)

    # Anything larger uses chunks of 0.1% of available memory.
    return int(available * 0.001)
[docs] def gpu_memory_constraints(expressions: collections.abc.Iterable[str], extra_vars: dict[str, int | float] | None = None, mode=any, device: str | torch.device = 'cuda:0') -> bool: """ Evaluate a user boolean expression involving a GPU device's memory in bytes, used memory percent, and available VRAM memory in bytes. If you pass a non GPU device identifier to this method, it will always return ``False`` Available functions are: * kb(bytes to kilobytes) * mb(bytes to megabytes) * gb(bytes to gigabytes) * kib(bytes to kibibytes) * mib(bytes to mebibytes) * gib(bytes to gibibytes) Available values are: * used / u (memory currently used by the GPU device in bytes) * used_total_percent / utp (memory used by the GPU device, as percent of total VRAM memory, example: 25.4) * available / a (available memory remaining on the GPU device in bytes that can be used) * total / t (total memory on the GPU device in bytes) Example expressions: * ``used > gb(1)`` (when the device has used more than 1GB of memory) * ``used_total_percent > 25`` (when the device has used more than 25 percent of VRAM memory) * ``available < gb(2)`` (when the available memory on the device is less than 2GB) Expressions may not be longer than 128 characters. However, multiple expressions may be provided. :raise ValueError: if extra_vars overwrites a reserved variable name, or if ``device`` is not a ``str`` or ``torch.device`` object. :raise MemoryConstraintSyntaxError: on syntax errors or if the return value of an expression is not a boolean value. :param expressions: a list of expressions, if expressions is ``None`` or empty this function will return ``False``. :param extra_vars: extra integer or float variables :param mode: the standard library function 'any' (equating to OR all expressions) or the standard library function 'all' (equating to AND all expressions). The default is 'any' which ORs all expressions. :param device: GPU device string or torch.device object, defaults to 'cuda:0'. 
Can be CUDA (e.g., 'cuda:0') or XPU (e.g., 'xpu:0') devices. :return: Boolean result of the expression """ if not expressions: return False for expr in expressions: memory_constraint_syntax_check(expr) if not is_supported_gpu_device(device): return False device_type, device_index = _parse_gpu_device(device) if device_type == 'cuda': total_memory = torch.cuda.get_device_properties(device_index).total_memory with torch.cuda.device(device_index): reserved_memory = torch.cuda.memory_reserved() allocated_memory = torch.cuda.memory_allocated() free_memory = total_memory - reserved_memory elif device_type == 'xpu': total_memory = torch.xpu.get_device_properties(device_index).total_memory with torch.xpu.device(device_index): reserved_memory = torch.xpu.memory_reserved() allocated_memory = torch.xpu.memory_allocated() free_memory = total_memory - reserved_memory used = allocated_memory used_total_percent = (used / total_memory) * 100.0 available = free_memory total = total_memory functions = { 'gb': lambda x: x * 1000 ** 3, 'mb': lambda x: x * 1000 ** 2, 'kb': lambda x: x * 1000, 'gib': lambda x: x * 1024 ** 3, 'mib': lambda x: x * 1024 ** 2, 'kib': lambda x: x * 1024 } variables = { 'used': used, 'u': used, 'used_total_percent': used_total_percent, 'utp': used_total_percent, 'available': available, 'a': available, 'total': total, 't': total } if extra_vars: for key, value in extra_vars.items(): if key in variables or key in functions: raise ValueError( f'extra_vars cannot redefine reserved attribute: {key}') variables[key] = value interpreter = _eval.standard_interpreter( symtable=_eval.safe_builtins() | variables.copy() ) interpreter.symtable.update(functions) _messages.debug_log( f'GPU MEMORY CONSTRAINT TEST: {_types.fullname(gpu_memory_constraints)} constraint = ' f'[{", ".join(_textprocessing.quote_spaces(expressions))}], ' f'vars = {str(variables)}, mode={mode.__name__}') try: value = mode(interpreter( e, raise_errors=True, show_errors=False) for e in expressions) if 
not isinstance(value, bool): raise MemoryConstraintSyntaxError('Memory constraint must return a boolean value.') _messages.debug_log(f'GPU MEMORY CONSTRAINT TEST RESULT: {value}') return value except (Exception, NameError) as e: raise MemoryConstraintSyntaxError( f'Memory constraint syntax error: {e}')
class SizedConstrainedObjectCache(_memoize.ObjectCache):
    """
    An object cache that can track cache memory use via the cached objects returned metadata.

    Your memoized function should return at least:

    ``object, dgenerate.memoize.CachedObjectMetadata(size=the_size)``

    You must return a metadata object with the attribute ``size`` at the minimum.

    You may attach other metadata to the object as needed.
    """

    def __init__(self, name):
        """
        :param name: cache name, forwarded to the base class constructor.
        """
        super().__init__(name)

        # running total of the ``size`` metadata of every cached object
        self._size = 0

        # keep the running total in sync with cache events
        self.register_on_un_cache(self._on_un_cache)
        self.register_on_cache(self._on_cache)
        self.register_on_clear(self._on_clear)

    @property
    def size(self):
        """
        Return the current cache size.
        """
        return self._size

    @size.setter
    def size(self, value):
        """
        Set the current cache size, negative values are clamped to zero.
        """
        self._size = max(value, 0)

    def enforce_cpu_mem_constraints(
            self,
            constraints: typing.Iterable[str],
            size_var: str,
            new_object_size: int,
            mode: typing.Callable[[typing.Iterable], bool] = any
    ):
        """
        Clear the cache if these CPU side memory constraints are met.

        See: :py:func:`memory_constraints`

        The constraint variable ``cache_size`` equates to the current cache size.

        :param constraints: Memory constraint expressions to evaluate.
        :param size_var: Memory constraint expression variable
            name containing the ``new_object_size`` value.
        :param new_object_size: Size of the new object.
        :param mode: Logical and/or function on constraint expressions, ``any`` for or, ``all`` for and.
        :return: ``True`` if the cache was cleared, ``False`` otherwise
        """
        _messages.debug_log(
            f'Object Cache: "{self.name}", enforcing CPU side memory constraints: {constraints}, mode={mode.__name__}')

        extra_vars = {size_var: new_object_size, 'cache_size': self.size}

        if not memory_constraints(constraints, extra_vars, mode=mode):
            return False

        _messages.debug_log(
            f'Object Cache: "{self.name}", cleared due to CPU side memory constraints being met.')

        self.clear()
        return True

    def enforce_gpu_mem_constraints(
            self,
            constraints: typing.Iterable[str],
            size_var: str,
            new_object_size: int,
            device: str | torch.device,
            mode: typing.Callable[[typing.Iterable], bool] = any
    ):
        """
        Clear the cache if these GPU side memory constraints are met.

        See: :py:func:`gpu_memory_constraints`

        The constraint variable ``cache_size`` equates to the current cache size.

        :param constraints: Memory constraint expressions to evaluate.
        :param size_var: Memory constraint expression variable
            name containing the ``new_object_size`` value.
        :param new_object_size: Size of the new object.
        :param device: Device to check
        :param mode: Logical and/or function on constraint expressions, ``any`` for or, ``all`` for and.
        :return: ``True`` if the cache was cleared, ``False`` otherwise
        """
        _messages.debug_log(
            f'Object Cache: "{self.name}", enforcing GPU side memory constraints: {constraints}, mode={mode.__name__}')

        extra_vars = {size_var: new_object_size, 'cache_size': self.size}

        if not gpu_memory_constraints(constraints, extra_vars, mode=mode, device=device):
            return False

        _messages.debug_log(
            f'Object Cache: "{self.name}", cleared due to GPU side memory constraints being met.')

        self.clear()
        return True

    def _on_un_cache(self, cache, cached_object):
        # an object left the cache, subtract its recorded size
        self.size = self.size - self.get_metadata(cached_object).size

    def _on_clear(self, cache):
        # the cache was emptied, so nothing is tracked anymore
        self.size = 0

    def _on_cache(self, cache, cached_object):
        # an object entered the cache, add its recorded size
        self.size = self.size + self.get_metadata(cached_object).size
def torch_gc():
    """
    Release cached accelerator memory held by torch.

    Calls ``torch.cuda.empty_cache()`` and ``torch.cuda.ipc_collect()`` when CUDA
    is available, and ``torch.xpu.empty_cache()`` when XPU is available.
    """
    if _torchutil.is_cuda_available():
        torch.cuda.empty_cache()
        torch.cuda.ipc_collect()

    # Note: torch.xpu does not have an ipc_collect() equivalent,
    # so only the cache is emptied for XPU.
    if _torchutil.is_xpu_available():
        torch.xpu.empty_cache()