"""Main module."""
import hashlib
import json
import operator
import os
import warnings
import ruamel.yaml
import ruamel.yaml.constructor
import ruamel.yaml.error
from jinja2 import (
Environment,
FileSystemLoader,
StrictUndefined,
Template,
select_autoescape,
)
from ruamel.yaml import MappingNode, SafeConstructor, ScalarNode, SequenceNode
from modularyze.construct import from_yaml_constructor, from_yaml_multi_constructor
from modularyze.utils import is_class, is_local, is_public
[docs]class ConfBuilder:
"""This configuration builder works with a jinja2 templated yaml
file. It first renders the template, then parses the yaml file.
It can be used to directly instantiate almost any object, provided
that it's type has been registered first.
"""
def __init__(self, yaml=None, attr_sep="."):
self.yaml = yaml if yaml else ruamel.yaml.YAML(typ="safe")
self.attr_sep = attr_sep
self.registered_callables = {}
self.registered_multi_callables = {}
@staticmethod
def _get_template(spec, root_path=None):
if isinstance(spec, str):
if os.path.isfile(os.path.join(root_path or "", spec)):
return ConfBuilder._template_from_file(spec, root_path=root_path)
return ConfBuilder._template_from_document(spec, root_path=root_path)
@staticmethod
def _template_from_file(conf_file, root_path=None):
if root_path is None:
root_path = os.path.dirname(conf_file)
conf_file = os.path.basename(conf_file)
env = Environment(
loader=FileSystemLoader(root_path),
autoescape=select_autoescape(["html", "xml"]),
trim_blocks=True,
lstrip_blocks=True,
undefined=StrictUndefined,
)
template = env.get_template(conf_file)
return template
@staticmethod
def _template_from_document(document, root_path=None):
if root_path:
env = Environment(
loader=FileSystemLoader(root_path),
autoescape=select_autoescape(["html", "xml"]),
trim_blocks=True,
lstrip_blocks=True,
undefined=StrictUndefined,
)
template = env.from_string(document)
return template
return Template(document)
@staticmethod
def _vanilla_yaml_constructors():
"""The default parser created by `ruamel.yaml.YAML` inherits any
constructors that were added since the last kernel restart. This is
due to the odd fact that these constructors are a class method instead
of an instance method.
This method returns the default constructors added to the SafeConstructor
class when first initialized. Return a dict that maps the tag to it's constructor.
"""
default_types = [
"null",
"bool",
"int",
"float",
"binary",
"timestamp",
"omap",
"pairs",
"set",
"str",
"seq",
"map",
]
default_constructors = {
f"tag:yaml.org,2002:{t}": getattr(SafeConstructor, f"construct_yaml_{t}")
for t in default_types
}
default_constructors.update({None: SafeConstructor.construct_undefined})
return default_constructors
@staticmethod
def _vanilla_yaml_multi_constructors():
"""This method returns the default multi-constructors added to the SafeConstructor
class when first initialized (there are none).
See :meth:`_vanilla_yaml_constructors` for more."""
return {}
def _reset_yaml_constructors(self):
self.yaml.Constructor.yaml_constructors = self._vanilla_yaml_constructors()
self.yaml.Constructor.yaml_multi_constructors = (
self._vanilla_yaml_multi_constructors()
)
@property
def constructors(self):
"""Return registered constructors and multi-constructors"""
return {
"constructors": self.yaml.Constructor.yaml_constructors,
"multi_constructors": self.yaml.Constructor.yaml_multi_constructors,
}
[docs] @staticmethod
def default_tag_name(obj):
"""The default tag name for an object is simply the
object's name prefixed by an exclamation point"""
return f"!{obj.__name__}"
[docs] @staticmethod
def get_data(loader, node):
"""Convert node to it's builtin representation
ScalarNode -> one of bool, int, float, etc...
SequenceNode -> list, tuple, ...
MappingNode -> dictionary, omap, ...
"""
if isinstance(node, ScalarNode):
return loader.construct_scalar(node)
if isinstance(node, SequenceNode):
return loader.construct_sequence(node, deep=True)
if isinstance(node, MappingNode):
return loader.construct_mapping(node, deep=True)
raise ruamel.yaml.constructor.ConstructorError(
f"Type of node not understood. Received {type(node)} but expected "
f"`ScalarNode`, `SequenceNode`, `MappingNode` or subtype"
)
[docs] def vanilla_load(self, document, ignore_unknown=False):
"""Reset the yaml loader by temporarily clearing it's constructors, load the
document, then add back the constructors.
If `ignore_unknown`, unrecognized tags will be replaced by None, otherwise they will
be converted to their text representation.
*Note:* This relies on monkey-patching the yaml parser, so use with care. See code for more."""
# Temporarily reset the constructors to their defaults, add a
# wildcard constructor in order to not actually instantiate foreign objects
# Capture custom constructors
registered_constructors = self.yaml.Constructor.yaml_constructors
registered_multi_constructors = self.yaml.Constructor.yaml_multi_constructors
# Reset them to their defaults
self._reset_yaml_constructors()
# Add wildcard constructor to ignore unknown tags or turn them to strings
def wildcard_constructor(loader, node):
if not ignore_unknown:
return {node.tag: self.get_data(loader, node)}
self.yaml.Constructor.add_constructor(None, wildcard_constructor)
# Parse document to get an un-instantiated/normalized one
normalized_conf = self.yaml.load(document)
# Restore all constructors
self.yaml.Constructor.yaml_constructors = registered_constructors
self.yaml.Constructor.yaml_multi_constructors = registered_multi_constructors
return normalized_conf
@staticmethod
def _warn_override(new_callables, old_callables, warning_msg):
updated_tags_from_named = set(new_callables.keys()) & set(old_callables.keys())
for tag in updated_tags_from_named:
src, dst = old_callables[tag], new_callables[tag]
if src != dst:
msg = warning_msg.format(tag=tag, src=src, dst=dst)
warnings.warn(msg, RuntimeWarning)
def _validate_callables(self, *callables, warning_msg=None, **named_callables):
"""Name all callables with unspecified tags, merge with name callables, and
warn if any un-named callables clobber the named ones."""
default_callables = {self.default_tag_name(cls): cls for cls in callables}
all_callables = {}
all_callables.update(default_callables)
all_callables.update(
{k.rstrip(self.attr_sep): v for k, v in named_callables.items()}
)
if warning_msg is None:
warning_msg = (
"Constructor for YAML tag '{tag}' overridden from {src} to {dst}"
)
# Check for overrides of callables by named_callables
self._warn_override(named_callables, default_callables, warning_msg)
# Check for overrides of existing callables by new callables
self._warn_override(all_callables, self.registered_callables, warning_msg)
# Check for overrides of existing multi-callables by new callables
self._warn_override(all_callables, self.registered_multi_callables, warning_msg)
return all_callables
[docs] def register_constructors(self, *callables, **named_callables):
"""For an object to be instantiated, or a function to be called,
when loading the configuration from yaml, it needs to be first
registered by the builder. This registration creates a default
constructor for that callable that yaml will use when it parses
that callable's tag.
A tag can either be specified, or if left out, a default tag will
be used (see :meth:`default_tag_name`).
Args:
callables: classes or functions to register_constructors with a default tag
named_callables: other callables with specified tag names
"""
all_callables = self._validate_callables(*callables, **named_callables)
self.registered_callables.update(all_callables)
# Register callables
for tag_name, cls in all_callables.items():
if getattr(cls, "from_yaml", None) is None:
self.yaml.Constructor.add_constructor(
tag_name, from_yaml_constructor(cls)
)
else:
self.yaml.register_class(cls)
[docs] def register_constructors_from_modules(
self, *modules, filter_funcs=None, **named_modules
):
"""It's often helpful to `register_constructors` from a module.
This method registers all attributes of a given module that
satisfy the filter functions, prepending a given prefix to
the autogenerated tag name.
This is similar to using `from package import *` as it will clutter
the namespace and probably over-`register_constructors`. It is often
best to use `register_multi_constructors_from_modules`.
*Note:* This might fail if the `__name__` attribute is not set on either an
unnamed module, or any sub-module/callable.
Args:
modules: List of modules to register implicitly. These modules will be
registered using an inferred tag-prefix of '!<module name>'.
named_modules: Dictionary of tag-prefixes -> modules to register explicitly.
The modules passed as kwargs will be registered with the key as their tag.
filter_funcs:
these provide a way to limit registration by filtering
out any callables that don't meet a requirement. By
default any private attribute or non-class attribute
is not registered.
"""
all_callables = self._validate_callables(*modules, **named_modules)
filter_funcs = filter_funcs if filter_funcs else [is_class, is_public, is_local]
for prefix, module in all_callables.items():
prefix = prefix.rstrip(self.attr_sep)
attrs = [
(attr_name, getattr(module, attr_name)) for attr_name in dir(module)
]
named_callables = {
f"{prefix}{self.attr_sep}{attr.__name__}": attr
for attr_name, attr in attrs
if all(f(attr_name, attr, module) for f in filter_funcs)
}
self.register_constructors(**named_callables)
[docs] def register_multi_constructors(self, *callables, **named_callables):
"""This registration creates a default multi-constructor for the given callables.
A tag prefix can either be specified, or if left out, a default tag prefix will
be used (see :meth:`default_tag_name`).
Args:
callables: classes or functions to register a multi-constructors with a default tag prefix
named_callables: other callables with specified tag prefix
"""
warning_msg = (
"Multi-constructor for YAML tag '{tag}' overridden from {src} to {dst}"
)
all_callables = self._validate_callables(
*callables, warning_msg=warning_msg, **named_callables
)
self.registered_multi_callables.update(all_callables)
# Register multi-callables
for prefix, module in all_callables.items():
self.yaml.Constructor.add_multi_constructor(
prefix, from_yaml_multi_constructor(module, attr_sep=self.attr_sep)
)
[docs] def register_multi_constructors_from_modules(self, *modules, **named_modules):
"""This method registers modules as multi-constructors enabling
submodules to be accessed dynamically and lazy loaded.
This is favored over `register_constructors_from_modules` as it
only adds one multi-constructor per module instead of numerous single
use constructors per module.
Args:
modules: List of modules to register implicitly. These modules will be
registered using an inferred tag-prefix of '!<module name>'.
Note that this might fail if the `__name__` attribute is not set.
named_modules: Dictionary of tag-prefixes -> modules to register explicitly.
The modules passed as kwargs will be registered with the key as their tag.
"""
self.register_multi_constructors(*modules, **named_modules)
[docs] def render(self, spec, context=None, root_path=None):
"""Method responsible for rendering the config template. If context
is a valid (simple) yaml file then it will be used as context for
jinja's rendering, otherwise context should be a dictionary or object
that can be unpacked using the double-star operator (**).
Args:
spec: config path or string
context: context to pass to templating engine
root_path: path of config directory (only needed if config is multi-file)
Returns:
A rendered template, which is the fully formed YAML config file.
"""
if isinstance(context, str):
if os.path.isfile(context):
with open(context) as f:
context = f.read()
context = self.vanilla_load(context, ignore_unknown=True)
template = self._get_template(spec, root_path=root_path)
return template.render(**({} if context is None else context))
[docs] def normalize(self, spec, sort_keys=False, raw=False, context=None, root_path=None):
"""Get the config pre-initialization of members as a normalized representation.
Args: Same as :meth:`render`
Returns: Normalized representation of configuration as a string.
"""
document = self.render(spec, context=context, root_path=root_path)
normalized_conf = self.vanilla_load(document, ignore_unknown=False)
if raw:
return normalized_conf
return json.dumps(normalized_conf, indent=2, sort_keys=sort_keys)
[docs] def load(self, document):
"""Load and instantiate the rendered config (full-YAML document).
Args: document: YAML document as a string.
Returns: Config object (dict, list, etc...)
"""
try:
return self.yaml.load(document)
except ruamel.yaml.constructor.ConstructorError:
raise
except ruamel.yaml.error.MarkedYAMLError as e:
line = operator.attrgetter("problem_mark.line")(e)
line = line or operator.attrgetter("context_mark.line")(e)
column = operator.attrgetter("problem_mark.column")(e)
column = column or operator.attrgetter("context_mark.column")(e)
if line and column:
line, column = int(line), int(column)
snippet = document.splitlines()
snippet = snippet[line - 3 : line + 4]
snippet = "\n".join(snippet)
note = getattr(e, "note")
note = note if note else ""
note += f"\n\nError occurred around here:\n\n{snippet}"
e.note = note
raise
[docs] def hash(self, spec, context=None, root_path=None):
"""Hash a config. This hash will depend on the spec (conf string or file)
as well as any context it depends on.
For the hash to be computed, the config is first normalized (see :meth:`normalize`)
as a key-sorted, json-dumped string and then hashed using the SHA3-256 algorithm.
Args:
Same as :meth:`render`
Returns:
The config's unique hash as an int
"""
hash_obj = hashlib.sha3_256(
self.normalize(
spec, sort_keys=True, raw=False, context=context, root_path=root_path
).encode("utf-8")
)
return int.from_bytes(hash_obj.digest(), "big")
[docs] def build(self, spec, context=None, root_path=None):
"""Build the final configuration by first rendering the spec as a template
and then building the resulting YAML document.
Args:
Same as :meth:`render`
Returns: Config object (dict, list, etc...)
"""
document = self.render(spec, context=context, root_path=root_path)
return self.load(document)
[docs] def __call__(self, *args, **kwargs):
"""Alias of :meth:`build`"""
return self.build(*args, **kwargs)