Source code for dgenerate.batchprocess.batchprocessor

# Copyright (c) 2023, Teriks
#
# dgenerate is distributed under the following BSD 3-Clause License
#
# Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in
#    the documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived
#    from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import collections.abc
import inspect
import io
import os
import re
import typing

import asteval
import jinja2

import dgenerate.arguments as _arguments
import dgenerate.files as _files
import dgenerate.messages as _messages
import dgenerate.textprocessing as _textprocessing
import dgenerate.types as _types


[docs] class BatchProcessError(Exception): """ Thrown by :py:meth:`.BatchProcessor.run_file` and :py:meth:`.BatchProcessor.run_string` when an error in a batch processing script is encountered. """ pass
[docs] class BatchProcessor: """ Implements dgenerates batch processing scripts in a generified manner. This is the bare-bones implementation of the shell with nothing implemented for you except the ``\\print``, ``\\set``, ``\\setp``, and `\\unset`` directives. If you wish to create this object to run a dgenerate configuration, use :py:class:`dgenerate.batchprocess.ConfigRunner` """ invoker: typing.Callable[[collections.abc.Sequence[str]], int] """ Invoker function, responsible for executing lines recognized as shell commands. """ name: _types.Name """ Name of this batch processor, currently used in the hash bang version check directive and messages. """ version: _types.Version """ Version tuple for the version check hash bang directive. """ template_variables: dict[str, typing.Any] """ Live template variables. """ reserved_template_variables: set[str] """ These template variables cannot be set with the ``\\set`` or ``\\setp`` directive, or un-defined with the ``\\unset`` directive. """ template_functions: dict[str, typing.Callable[[typing.Any], typing.Any]] """ Functions available when templating is occurring. """ builtins: dict[str, typing.Callable[[typing.Any], typing.Any]] """ Safe python builtins that are always available as template functions and also usable with ``\\setp`` They may be overridden by functions defined in :py:attr:`dgenerate.batchprocess.BatchProcessor.template_functions` """ directives: dict[str, typing.Optional[typing.Callable[[collections.abc.Sequence[str]], int]]] """ Batch process directives, shell commands starting with a backslash. Dictionary of callable(list) -> int. The function should return a return code, 0 for success, anything else for failure. """ injected_args: collections.abc.Sequence[str] """ Shell arguments to inject at the end of every invocation. """ expand_vars: typing.Callable[[str], str] """ A function for expanding environmental variables, defaults to :py:func:`os.path.expandvars` """
[docs] def __init__(self, invoker: typing.Callable[[collections.abc.Sequence[str]], int], name: _types.Name, version: typing.Union[_types.Version, str], template_variables: typing.Optional[dict[str, typing.Any]] = None, reserved_template_variables: typing.Optional[set[str]] = None, template_functions: typing.Optional[ dict[str, typing.Callable[[typing.Any], typing.Any]]] = None, directives: dict[str, typing.Optional[typing.Callable[[list], None]]] = None, builtins: typing.Optional[dict[str, typing.Callable[[typing.Any], typing.Any]]] = None, injected_args: typing.Optional[collections.abc.Sequence[str]] = None): """ :param invoker: A function for invoking lines recognized as shell commands, should return a return code. :param name: The name of this batch processor, currently used in the version check directive and messages :param version: Version for version check hash bang directive. :param template_variables: Live template variables, the initial environment, this dictionary will be modified during runtime. :param reserved_template_variables: These template variable names cannot be set with the ``\\set`` or ``\\setp`` directive, or un-defined with the ``\\unset`` directive. :param template_functions: Functions available to Jinja2 :param directives: batch processing directive handlers, for: ``\\directives``. This is a dictionary of names to functions which accept a single parameter, a list of directive arguments, and return a return code. :param builtins: builtin functions available as template functions and ``\\setp`` functions. A safe default collection of functions is used if this is not specified. Builtins may be overridden by functions defined in ``template_functions`` :param injected_args: Arguments to be injected at the end of user specified arguments for every shell invocation. If ``-v/--verbose`` is present in ``injected_args`` debugging output will be enabled globally while the config runs, and not just for invocations. Passing ``-v/--verbose`` also disables handling of unhandled non :py:exc:`SystemExit` exceptions raised by config directive implementations, a stack trace will be printed when these exceptions are encountered. """ self._template_functions = None self.invoker = invoker self.name = name self.template_variables = template_variables if template_variables else dict() self.reserved_template_variables = reserved_template_variables if reserved_template_variables else set() self.template_functions = template_functions if template_functions else dict() self.directives = directives if directives else dict() self._directive_exceptions = False self.injected_args = injected_args if injected_args else [] self._current_line = 0 self._executing_text = None self._running_template_continuation = False if isinstance(version, str): self.version = _textprocessing.parse_version(version) else: self.version: tuple[int, int, int] = tuple(version) if len(self.version) != 3: raise ValueError( f'version tuple expected to contain three components: (major, minor, patch). received: {self.version}') self.expand_vars = os.path.expandvars if builtins is None: self.builtins = { 'abs': abs, 'all': all, 'any': any, 'ascii': ascii, 'bin': bin, 'bool': bool, 'bytearray': bytearray, 'bytes': bytes, 'callable': callable, 'chr': chr, 'complex': complex, 'dict': dict, 'divmod': divmod, 'enumerate': enumerate, 'filter': filter, 'float': float, 'format': format, 'frozenset': frozenset, 'getattr': getattr, 'hasattr': hasattr, 'hash': hash, 'hex': hex, 'int': int, 'iter': iter, 'len': len, 'list': list, 'map': map, 'max': max, 'min': min, 'next': next, 'object': object, 'oct': oct, 'ord': ord, 'pow': pow, 'range': range, 'repr': repr, 'reversed': reversed, 'round': round, 'set': set, 'slice': slice, 'sorted': sorted, 'str': str, 'sum': sum, 'tuple': tuple, 'type': type, 'zip': zip, } else: self.builtins = builtins
@property def directives_builtins_help(self): """ Returns a dictionary of help strings for directives that are built into the :py:class:`BatchProcessor` base class. """ return { 'set': 'Sets a template variable, accepts two arguments, the variable name and the value. ' 'Attempting to set a reserved template variable such as those pre-defined by dgenerate ' 'will result in an error. The second argument is accepted as a raw value, it is not shell ' 'parsed in any way, only stripped of leading and trailing whitespace.', 'sete': 'Sets a template variable to an array of shell arguments using shell parsing and expansion. ' 'For example, this could be utilized for convenient shell globbing: ' '\\setp my_files my_directory1/* my_directory2/*', 'setp': 'Sets a template variable to a (safely) evaluated Python expression, accepts two arguments, ' 'the variable name and the value. Attempting to set a reserved template variable such ' 'as those pre-defined by dgenerate will result in an error. Template variables can be ' 'referred to by name within a definition, EG: \\setp my_list [1, 2, my_var, 4]. Template ' 'functions are also available, EG: \\setp working_dir cwd(). Python unary and binary ' 'expression operators, python list slicing, and comprehensions are supported. ' 'This functionality is provided by the asteval package.', 'unset': 'Undefines a template variable previously set with \\set or \\setp, accepts one argument, ' 'the variable name. Attempting to unset a reserved variable such as those ' 'pre-defined by dgenerate will result in an error.', 'print': 'Prints all content to the right to stdout, no shell parsing of the argument occurs.', 'echo': 'Echo shell arguments with shell parsing and expansion.' } @property def current_line(self) -> int: """ The current line number in the file being processed. """ return self._current_line @property def executing_text(self) -> typing.Union[None, str]: """ The text / command line that is currently being executed, or that was last executed. """ return self._executing_text
[docs] def render_template(self, string: str, stream: bool = False) -> typing.Union[str, typing.Iterator[str]]: """ Render a template from a string :param string: the string containing the Jinja2 template. :param stream: Stream the results of generating this template line by line? :return: rendered string """ jinja_env = jinja2.Environment() for name, func in self.builtins.items(): if name in jinja_env.globals: continue jinja_env.globals[name] = func for name, func in self.template_functions.items(): jinja_env.globals[name] = func if len(inspect.signature(func).parameters) > 0: jinja_env.filters[name] = func if stream: def stream_generator(): buffer = '' try: for piece in jinja_env.from_string(string). \ stream(**self.template_variables): buffer += piece while '\n' in buffer: line, buffer = buffer.split('\n', 1) yield os.path.expandvars(line) if buffer: yield os.path.expandvars(buffer) except Exception as e: raise BatchProcessError(f'Template Render Error: {str(e).strip()}') return stream_generator() else: try: return self.expand_vars( jinja_env.from_string(string). render(**self.template_variables)) except Exception as e: raise BatchProcessError(f'Template Render Error: {str(e).strip()}')
def _look_for_version_mismatch(self, line_idx, line): versioning = re.match(r'#!\s+' + self.name + r'\s+([0-9]+\.[0-9]+\.[0-9]+)', line) if versioning: config_file_version = versioning.group(1) config_file_version_parts = config_file_version.split('.') cur_major_version = self.version[0] config_major_version = int(config_file_version_parts[0]) cur_minor_version = self.version[1] config_minor_version = int(config_file_version_parts[1]) version_str = '.'.join(str(i) for i in self.version) if cur_major_version != config_major_version: _messages.log( f'Failed version check (major version missmatch) on line {line_idx}, ' f'running an incompatible version of {self.name}! You are running version {version_str} ' f'and the config file specifies the required version: {config_file_version}' , underline=True, level=_messages.WARNING) elif cur_minor_version < config_minor_version: _messages.log( f'Failed version check (current minor version less than requested) ' f'on line {line_idx}, running an incompatible version of {self.name}! ' f'You are running version {version_str} and the config file specifies ' f'the required version: {".".join(config_file_version)}' , underline=True, level=_messages.WARNING) def _jinja_user_define(self, name, value): if not name.isidentifier(): raise BatchProcessError( f'Cannot define template variable "{name}" on line {self.current_line}, ' f'invalid identifier/name token, must be a valid python variable name / identifier.') if name in self.template_functions: raise BatchProcessError( f'Cannot define template variable "{name}" on line {self.current_line}, ' f'as that name is taken by a template function.') if name in self.reserved_template_variables: raise BatchProcessError( f'Cannot define template variable "{name}" on line {self.current_line}, ' f'as that name is a reserved variable name.') if name in self.builtins: raise BatchProcessError( f'Cannot define template variable "{name}" on line {self.current_line}, ' f'as that name is the name of a builtin function.') self.template_variables[name] = value def _jinja_user_undefine(self, name): if not name.isidentifier(): raise BatchProcessError( f'Cannot un-define template variable "{name}" on line {self.current_line}, ' f'invalid identifier/name token, must be a valid python variable name / identifier.') if name in self.template_functions: raise BatchProcessError( f'Cannot un-define template variable "{name}" on line {self.current_line}, ' f'as that name is taken by a template function.') if name in self.reserved_template_variables: raise BatchProcessError( f'Cannot un-define template variable "{name}" on line {self.current_line}, ' f'as that name is a reserved variable name.') if name in self.builtins: raise BatchProcessError( f'Cannot un-define template variable "{name}" on line {self.current_line}, ' f'as that name is the name of a builtin function.') try: self.template_variables.pop(name) except KeyError: raise BatchProcessError( f'Cannot un-define template variable "{name}" on line {self.current_line}, ' f'variable does not exist.') def _intepret_setp_value(self, value): interpreter = asteval.Interpreter( minimal=True, with_listcomp=True, with_dictcomp=True, with_setcomp=True, symtable=self.template_variables.copy()) if 'print' in interpreter.symtable: del interpreter.symtable['print'] interpreter.symtable.update(self.builtins) interpreter.symtable.update(self.template_functions) try: return interpreter.eval(value, show_errors=False, raise_errors=True) except Exception as e: raise BatchProcessError(f'\\setp eval error: {e}') def _directive_handlers(self, line): if line.startswith('\\setp'): directive_args = line.split(' ', 2) if len(directive_args) == 3: self._jinja_user_define( directive_args[1].strip(), self._intepret_setp_value( self.render_template(directive_args[2].strip()))) return True else: raise BatchProcessError( f'\\setp directive received less than 2 arguments, ' f'syntax is: \\setp name value') elif line.startswith('\\sete'): directive_args = line.split(' ', 2) if len(directive_args) == 3: try: self._jinja_user_define( directive_args[1].strip(), _textprocessing.shell_parse( self.render_template(directive_args[2].strip()))) except _textprocessing.ShellParseSyntaxError as e: raise BatchProcessError(e) return True else: raise BatchProcessError( f'\\sete directive received less than 2 arguments, ' f'syntax is: \\sete name args...') elif line.startswith('\\set'): directive_args = line.split(' ', 2) if len(directive_args) == 3: self._jinja_user_define( directive_args[1].strip(), self.render_template(directive_args[2].strip())) return True else: raise BatchProcessError( f'\\set directive received less than 2 arguments, ' f'syntax is: \\set name value') elif line.startswith('\\unset'): directive_args = line.split(' ', 1) if len(directive_args) == 2: self._jinja_user_undefine(directive_args[1].strip()) return True else: raise BatchProcessError( f'\\unset directive received less than 1 arguments, ' f'syntax is: \\unset name') elif line.startswith('\\print'): directive_args = line.split(' ', 1) if len(directive_args) == 2: _messages.log(self.render_template(directive_args[1].strip())) return True else: raise BatchProcessError( f'\\print directive received no arguments, ' f'syntax is: \\print value') elif line.startswith('\\echo'): directive_args = line.split(' ', 1) if len(directive_args) == 2: try: _messages.log(*_textprocessing.shell_parse( self.render_template(directive_args[1].strip()), expand_vars=False)) except _textprocessing.ShellParseSyntaxError as e: raise BatchProcessError(e) return True else: raise BatchProcessError( f'\\echo directive received no arguments, ' f'syntax is: \\echo args...') if line.startswith('{'): try: self._running_template_continuation = True self.run_file(self.render_template(line, stream=True)) finally: self._running_template_continuation = False return True elif line.startswith('\\'): directive_args = line.split(' ', 1) directive = directive_args[0].lstrip('\\') impl = self.directives.get(directive) if impl is None: raise BatchProcessError(f'Unknown directive "\\{directive}".') directive_args = directive_args[1:] try: if directive_args: return_code = impl( _textprocessing.shell_parse( self.render_template(directive_args[0].strip()), expand_vars=False)) else: return_code = impl([]) if return_code != 0: raise BatchProcessError( f'Directive error return code: {return_code}') except Exception as e: if self._directive_exceptions: raise e raise BatchProcessError(e) return True return False def _lex_and_run_invocation(self, invocation_string): raw_templated_string = self.render_template(invocation_string) try: shell_lexed = _textprocessing.shell_parse( raw_templated_string, expand_vars=False) except _textprocessing.ShellParseSyntaxError as e: raise BatchProcessError(e) for arg in self.injected_args: shell_lexed.append(arg) raw_injected_args = ' '.join(str(a) for a in self.injected_args) if raw_injected_args: cmd_info = raw_templated_string + ' ' + raw_injected_args else: cmd_info = raw_templated_string header = 'Processing Arguments: ' args_wrapped = \ _textprocessing.wrap( cmd_info, width=_textprocessing.long_text_wrap_width() - len(header), subsequent_indent=' ' * len(header)) _messages.log(header + args_wrapped, underline=True) return_code = self.invoker(shell_lexed) if return_code != 0: raise BatchProcessError( f'Invocation error return code: {return_code}') def _run_file(self, stream: typing.Iterator[str]): continuation = '' template_continuation = False normal_continuation = False def run_continuation(cur_line): nonlocal continuation, template_continuation, normal_continuation if not template_continuation: completed_continuation = (continuation + ' ' + cur_line).strip() else: completed_continuation = (continuation + cur_line).strip() template_continuation = False normal_continuation = False continuation = '' self._executing_text = completed_continuation if self._directive_handlers(completed_continuation): return self._lex_and_run_invocation(completed_continuation) last_line = None for line_idx, line_and_next in enumerate(_files.PeekReader(stream)): line: str next_line: typing.Optional[str] line, next_line = line_and_next line_strip = _textprocessing.remove_tail_comments(line)[1].strip() if not self._running_template_continuation: self._current_line = line_idx if line_strip == '' and not template_continuation: if continuation and last_line is not None: if last_line.startswith('-') and \ not last_line.endswith('\\'): run_continuation('') elif line_strip.startswith('#') and not template_continuation: self._look_for_version_mismatch(line_idx, line) elif line_strip.startswith('{') and not template_continuation and not normal_continuation: continuation += line template_continuation = True elif not template_continuation and (line_strip.endswith('\\') or next_line and next_line.lstrip().startswith('-')): continuation += ' ' + line_strip.strip().removesuffix('\\').strip() normal_continuation = True elif template_continuation: line_rstrip = _textprocessing.remove_tail_comments(line)[1].rstrip() if line_rstrip.endswith('!END'): run_continuation(line_rstrip.removesuffix('!END')) else: continuation += line else: run_continuation(line_strip) last_line = line_strip if continuation: run_continuation('')
[docs] def run_file(self, stream: typing.Iterator[str]): """ Process a batch processing script from a file stream. Technically, from an iterator over lines of text. :raise BatchProcessError: :param stream: A filestream in text read mode """ try: parsed, _ = _arguments.parse_known_args( self.injected_args, log_error=False ) except _arguments.DgenerateUsageError as e: raise BatchProcessError(f'Error parsing injected arguments: {str(e).strip()}') directive_exceptions_last = self._directive_exceptions if parsed.verbose: _messages.push_level(_messages.DEBUG) self._directive_exceptions = True try: self._run_file(stream) except BatchProcessError as e: raise BatchProcessError(f'Error on line {self.current_line}: {str(e).strip()}') finally: _messages.pop_level() self._directive_exceptions = directive_exceptions_last
[docs] def run_string(self, string: str): """ Process a batch processing script from a string :raise BatchProcessError: :param string: a string containing the script """ self.run_file(io.StringIO(string))
__all__ = _types.module_all()