# pylint: disable=too-many-lines
import json
import logging
import os
import shutil
import sys
import zipfile
from pathlib import Path
from tempfile import TemporaryFile
from uuid import uuid4
from botocore.exceptions import ClientError, WaiterError
from jinja2 import Environment, PackageLoader, select_autoescape
from jsonschema import Draft7Validator
from jsonschema.exceptions import ValidationError
from rpdk.core.fragment.generator import TemplateFragment
from rpdk.core.jsonutils.flattener import JsonSchemaFlattener
from . import __version__
from .boto_helpers import create_sdk_session
from .data_loaders import load_hook_spec, load_resource_spec, resource_json
from .exceptions import (
DownstreamError,
FragmentValidationError,
InternalError,
InvalidProjectError,
RPDKBaseException,
SpecValidationError,
)
from .fragment.module_fragment_reader import _get_fragment_file
from .jsonutils.pointer import fragment_decode, fragment_encode
from .jsonutils.utils import traverse
from .plugin_registry import load_plugin
from .type_name_resolver import TypeNameResolver
from .type_schema_loader import TypeSchemaLoader
from .upload import Uploader
LOG = logging.getLogger(__name__)
SETTINGS_FILENAME = ".rpdk-config"
SCHEMA_UPLOAD_FILENAME = "schema.json"
CONFIGURATION_SCHEMA_UPLOAD_FILENAME = "configuration-schema.json"
OVERRIDES_FILENAME = "overrides.json"
TARGET_INFO_FILENAME = "target-info.json"
INPUTS_FOLDER = "inputs"
EXAMPLE_INPUTS_FOLDER = "example_inputs"
TARGET_SCHEMAS_FOLDER = "target-schemas"
HOOK_ROLE_TEMPLATE_FILENAME = "hook-role.yaml"
RESOURCE_ROLE_TEMPLATE_FILENAME = "resource-role.yaml"
TYPE_NAME_RESOURCE_REGEX = "^[a-zA-Z0-9]{2,64}::[a-zA-Z0-9]{2,64}::[a-zA-Z0-9]{2,64}$"
TYPE_NAME_MODULE_REGEX = (
"^[a-zA-Z0-9]{2,64}::[a-zA-Z0-9]{2,64}::[a-zA-Z0-9]{2,64}::MODULE$"
)
TYPE_NAME_HOOK_REGEX = "^[a-zA-Z0-9]{2,64}::[a-zA-Z0-9]{2,64}::[a-zA-Z0-9]{2,64}$"
ARTIFACT_TYPE_RESOURCE = "RESOURCE"
ARTIFACT_TYPE_MODULE = "MODULE"
ARTIFACT_TYPE_HOOK = "HOOK"
DEFAULT_ROLE_TIMEOUT_MINUTES = 120 # 2 hours
# min and max are according to CreateRole API restrictions
# https://docs.aws.amazon.com/IAM/latest/APIReference/API_CreateRole.html
MIN_ROLE_TIMEOUT_SECONDS = 3600 # 1 hour
MAX_ROLE_TIMEOUT_SECONDS = 43200 # 12 hours
CFN_METADATA_FILENAME = ".cfn_metadata.json"
SETTINGS_VALIDATOR = Draft7Validator(
{
"properties": {
"artifact_type": {"type": "string"},
"language": {"type": "string"},
"typeName": {"type": "string", "pattern": TYPE_NAME_RESOURCE_REGEX},
"runtime": {"type": "string"},
"entrypoint": {"type": ["string", "null"]},
"testEntrypoint": {"type": ["string", "null"]},
"executableEntrypoint": {"type": ["string", "null"]},
"settings": {"type": "object"},
},
"required": ["language", "typeName", "runtime", "entrypoint"],
}
)
MODULE_SETTINGS_VALIDATOR = Draft7Validator(
{
"properties": {
"artifact_type": {"type": "string"},
"typeName": {"type": "string", "pattern": TYPE_NAME_MODULE_REGEX},
"settings": {"type": "object"},
},
"required": ["artifact_type", "typeName"],
}
)
HOOK_SETTINGS_VALIDATOR = Draft7Validator(
{
"properties": {
"artifact_type": {"type": "string"},
"language": {"type": "string"},
"typeName": {"type": "string", "pattern": TYPE_NAME_HOOK_REGEX},
"runtime": {"type": "string"},
"entrypoint": {"type": ["string", "null"]},
"testEntrypoint": {"type": ["string", "null"]},
"settings": {"type": "object"},
},
"required": ["language", "typeName", "runtime", "entrypoint"],
}
)
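# maps JSON Schema primitive types to the type labels rendered in generated docs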
BASIC_TYPE_MAPPINGS = {
"string": "String",
"number": "Double",
"integer": "Integer",
"boolean": "Boolean",
}
MARKDOWN_RESERVED_CHARACTERS = frozenset({"^", "*", "+", ".", "(", "[", "{", "#"})
def escape_markdown(string):
"""Escapes the reserved Markdown characters."""
if not string:
return string
if string[0] in MARKDOWN_RESERVED_CHARACTERS:
return "\\{}".format(string)
return string
class Project: # pylint: disable=too-many-instance-attributes,too-many-public-methods
def __init__(self, overwrite_enabled=False, root=None):
self.overwrite_enabled = overwrite_enabled
self.root = Path(root) if root else Path.cwd()
self.settings_path = self.root / SETTINGS_FILENAME
self.type_info = None
self.artifact_type = None
self.language = None
self._plugin = None
self.settings = None
self.schema = None
self.configuration_schema = None
self._flattened_schema = None
self._marked_down_properties = {}
self.runtime = "noexec"
self.entrypoint = None
self.test_entrypoint = None
self.executable_entrypoint = None
self.fragment_dir = None
self.target_info = {}
self.env = Environment(
trim_blocks=True,
lstrip_blocks=True,
keep_trailing_newline=True,
loader=PackageLoader(__name__, "templates/"),
autoescape=select_autoescape(["html", "htm", "xml", "md"]),
)
self.env.filters["escape_markdown"] = escape_markdown
LOG.debug("Root directory: %s", self.root)
@property
def type_name(self):
return "::".join(self.type_info)
@type_name.setter
def type_name(self, value):
self.type_info = tuple(value.split("::"))
@property
def hypenated_name(self):
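        """Return the lowercased, hyphenated form of the type name,
        e.g. "AWS::S3::Bucket" -> "aws-s3-bucket".
        """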
return "-".join(self.type_info).lower()
@property
def hyphenated_name_case_sensitive(self):
return "-".join(self.type_info)
@property
def schema_filename(self):
return f"{self.hypenated_name}.json"
@property
def configuration_schema_filename(self):
return "{}-configuration.json".format(self.hypenated_name)
@property
def schema_path(self):
return self.root / self.schema_filename
@property
def overrides_path(self):
return self.root / OVERRIDES_FILENAME
@property
def inputs_path(self):
return self.root / INPUTS_FOLDER
@property
def example_inputs_path(self):
return self.root / EXAMPLE_INPUTS_FOLDER
@property
def target_schemas_path(self):
return self.root / TARGET_SCHEMAS_FOLDER
@property
def target_info_path(self):
return self.root / TARGET_INFO_FILENAME
@staticmethod
def _raise_invalid_project(msg, e):
LOG.debug(msg, exc_info=e)
raise InvalidProjectError(msg) from e
def load_settings(self):
LOG.debug("Loading project file '%s'", self.settings_path)
try:
with self.settings_path.open("r", encoding="utf-8") as f:
raw_settings = json.load(f)
except json.JSONDecodeError as e:
self._raise_invalid_project(
f"Project file '{self.settings_path}' is invalid", e
)
        # backward compatibility: settings files written before
        # artifact_type existed are treated as resource projects
if "artifact_type" not in raw_settings:
raw_settings["artifact_type"] = ARTIFACT_TYPE_RESOURCE
if raw_settings["artifact_type"] == ARTIFACT_TYPE_RESOURCE:
self.validate_and_load_resource_settings(raw_settings)
elif raw_settings["artifact_type"] == ARTIFACT_TYPE_HOOK:
self.validate_and_load_hook_settings(raw_settings)
else:
self.validate_and_load_module_settings(raw_settings)
def validate_and_load_hook_settings(self, raw_settings):
try:
HOOK_SETTINGS_VALIDATOR.validate(raw_settings)
except ValidationError as e:
self._raise_invalid_project(
f"Project file '{self.settings_path}' is invalid", e
)
self.type_name = raw_settings["typeName"]
self.artifact_type = raw_settings["artifact_type"]
self.language = raw_settings["language"]
self.runtime = raw_settings["runtime"]
self.entrypoint = raw_settings["entrypoint"]
self.test_entrypoint = raw_settings["testEntrypoint"]
self.executable_entrypoint = raw_settings.get("executableEntrypoint")
self._plugin = load_plugin(raw_settings["language"])
self.settings = raw_settings.get("settings", {})
def validate_and_load_module_settings(self, raw_settings):
try:
MODULE_SETTINGS_VALIDATOR.validate(raw_settings)
except ValidationError as e:
self._raise_invalid_project(
f"Project file '{self.settings_path}' is invalid", e
)
self.type_name = raw_settings["typeName"]
self.artifact_type = raw_settings["artifact_type"]
self.settings = raw_settings.get("settings", {})
def validate_and_load_resource_settings(self, raw_settings):
try:
SETTINGS_VALIDATOR.validate(raw_settings)
except ValidationError as e:
self._raise_invalid_project(
f"Project file '{self.settings_path}' is invalid", e
)
self.type_name = raw_settings["typeName"]
self.artifact_type = raw_settings["artifact_type"]
self.language = raw_settings["language"]
self.runtime = raw_settings["runtime"]
self.entrypoint = raw_settings["entrypoint"]
self.test_entrypoint = raw_settings["testEntrypoint"]
self.executable_entrypoint = raw_settings.get("executableEntrypoint")
self._plugin = load_plugin(raw_settings["language"])
self.settings = raw_settings.get("settings", {})
def _write_example_schema(self):
self.schema = resource_json(
__name__, "data/examples/resource/initech.tps.report.v1.json"
)
self.schema["typeName"] = self.type_name
def _write(f):
json.dump(self.schema, f, indent=4)
f.write("\n")
self.safewrite(self.schema_path, _write)
def _write_example_hook_schema(self):
self.schema = resource_json(
__name__, "data/examples/hook/sse.verification.v1.json"
)
self.schema["typeName"] = self.type_name
def _write(f):
json.dump(self.schema, f, indent=4)
f.write("\n")
self.safewrite(self.schema_path, _write)
def _write_example_inputs(self):
shutil.rmtree(self.example_inputs_path, ignore_errors=True)
self.example_inputs_path.mkdir(exist_ok=True)
template = self.env.get_template("inputs.json")
properties = list(self.schema["properties"].keys())
for inputs_file in (
"inputs_1_create.json",
"inputs_1_update.json",
"inputs_1_invalid.json",
):
self.safewrite(
self.example_inputs_path / inputs_file,
template.render(
properties=properties[:-1], last_property=properties[-1]
),
)
def write_settings(self):
def _write_resource_settings(f):
executable_entrypoint_dict = (
{"executableEntrypoint": self.executable_entrypoint}
if self.executable_entrypoint
else {}
)
json.dump(
{
"artifact_type": self.artifact_type,
"typeName": self.type_name,
"language": self.language,
"runtime": self.runtime,
"entrypoint": self.entrypoint,
"testEntrypoint": self.test_entrypoint,
"settings": self.settings,
**executable_entrypoint_dict,
},
f,
indent=4,
)
f.write("\n")
def _write_module_settings(f):
json.dump(
{
"artifact_type": self.artifact_type,
"typeName": self.type_name,
"settings": self.settings,
},
f,
indent=4,
)
f.write("\n")
def _write_hook_settings(f):
executable_entrypoint_dict = (
{"executableEntrypoint": self.executable_entrypoint}
if self.executable_entrypoint
else {}
)
json.dump(
{
"artifact_type": self.artifact_type,
"typeName": self.type_name,
"language": self.language,
"runtime": self.runtime,
"entrypoint": self.entrypoint,
"testEntrypoint": self.test_entrypoint,
"settings": self.settings,
**executable_entrypoint_dict,
},
f,
indent=4,
)
f.write("\n")
if self.artifact_type == ARTIFACT_TYPE_RESOURCE:
self.overwrite(self.settings_path, _write_resource_settings)
elif self.artifact_type == ARTIFACT_TYPE_HOOK:
self.overwrite(self.settings_path, _write_hook_settings)
else:
self.overwrite(self.settings_path, _write_module_settings)
def init(self, type_name, language, settings=None):
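        """Initialize a new resource project and write its scaffold.

        A minimal sketch of intended use (the type name and language here
        are illustrative; the language must match an installed plugin):

            project = Project()
            project.init("AWS::Example::Cluster", "python")
        """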
self.artifact_type = ARTIFACT_TYPE_RESOURCE
self.type_name = type_name
self.language = language
self._plugin = load_plugin(language)
self.settings = settings or {}
self._write_example_schema()
self._write_example_inputs()
self._plugin.init(self)
self.write_settings()
def init_module(self, type_name):
self.artifact_type = ARTIFACT_TYPE_MODULE
self.type_name = type_name
self.settings = {}
self.write_settings()
def init_hook(self, type_name, language, settings=None):
self.artifact_type = ARTIFACT_TYPE_HOOK
self.type_name = type_name
self.language = language
self._plugin = load_plugin(language)
self.settings = settings or {}
self._write_example_hook_schema()
self._plugin.init(self)
self.write_settings()
def load_hook_schema(self):
if not self.type_info:
msg = "Internal error (Must load settings first)"
LOG.critical(msg)
raise InternalError(msg)
with self.schema_path.open("r", encoding="utf-8") as f:
self.schema = load_hook_spec(f)
def load_schema(self):
if not self.type_info:
msg = "Internal error (Must load settings first)"
LOG.critical(msg)
raise InternalError(msg)
with self.schema_path.open("r", encoding="utf-8") as f:
self.schema = load_resource_spec(f)
def load_configuration_schema(self):
if not self.schema:
msg = "Internal error (Must load type schema first)"
LOG.critical(msg)
raise InternalError(msg)
if "typeConfiguration" in self.schema:
configuration_schema = self.schema["typeConfiguration"]
configuration_schema["definitions"] = self.schema.get("definitions", {})
configuration_schema["typeName"] = self.type_name
self.configuration_schema = configuration_schema
def write_configuration_schema(self, path):
        LOG.debug(
            "Writing type configuration schema derived from the resource specification: %s",
            path,
        )
def _write(f):
json.dump(self.configuration_schema, f, indent=4)
f.write("\n")
self.overwrite(path, _write)
@staticmethod
def overwrite(path, contents):
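        """Write ``contents`` to ``path``, replacing any existing file.

        ``contents`` may be a string, or a callable that receives the open
        file object, e.g. ``lambda f: json.dump(data, f)``.
        """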
LOG.debug("Overwriting '%s'", path)
with path.open("w", encoding="utf-8") as f:
if callable(contents):
contents(f)
else:
f.write(contents)
def safewrite(self, path, contents):
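        """Like ``overwrite``, but leave an existing file untouched unless
        the project was created with ``overwrite_enabled``.
        """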
if self.overwrite_enabled:
self.overwrite(path, contents)
else:
try:
with path.open("x", encoding="utf-8") as f:
if callable(contents):
contents(f)
else:
f.write(contents)
except FileExistsError:
LOG.info("File already exists, not overwriting '%s'", path)
def generate(
self,
endpoint_url=None,
region_name=None,
local_only=False,
target_schemas=None,
profile_name=None,
): # pylint: disable=too-many-arguments
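        """Generate the provider's execution-role template and the
        language-specific build artifacts (a no-op for modules).
        """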
if self.artifact_type == ARTIFACT_TYPE_MODULE:
return # for Modules, the schema is already generated in cfn validate
# generate template for IAM role assumed by cloudformation
# to provision resources if schema has handlers defined
if "handlers" in self.schema:
handlers = self.schema["handlers"]
permission = "Allow"
if self.artifact_type == ARTIFACT_TYPE_HOOK:
template = self.env.get_template("hook-role.yml")
path = self.root / HOOK_ROLE_TEMPLATE_FILENAME
else:
template = self.env.get_template("resource-role.yml")
path = self.root / RESOURCE_ROLE_TEMPLATE_FILENAME
LOG.debug("Writing Execution Role CloudFormation template: %s", path)
actions = {
action
for handler in handlers.values()
for action in handler.get("permissions", [])
}
# calculate IAM role max session timeout based on highest handler timeout
# with some buffer (70 seconds per minute)
max_handler_timeout = max(
(
handler.get("timeoutInMinutes", DEFAULT_ROLE_TIMEOUT_MINUTES)
                    for handler in handlers.values()
),
default=DEFAULT_ROLE_TIMEOUT_MINUTES,
)
# max role session timeout must be between 1 hour and 12 hours
role_session_timeout = min(
MAX_ROLE_TIMEOUT_SECONDS,
max(MIN_ROLE_TIMEOUT_SECONDS, 70 * max_handler_timeout),
)
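            # e.g. a 45-minute handler timeout yields 70 * 45 = 3150 seconds and
            # is clamped up to the 3600-second floor, while a 720-minute timeout
            # yields 50400 seconds and is clamped down to the 43200-second ceiling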
# gets rid of any empty string actions.
# Empty strings cannot be specified as an action in an IAM statement
actions.discard("")
# Check if handler has actions
if not actions:
actions.add("*")
permission = "Deny"
contents = template.render(
type_name=self.hyphenated_name_case_sensitive,
actions=sorted(actions),
permission=permission,
role_session_timeout=role_session_timeout,
)
self.overwrite(path, contents)
self.target_info = self._load_target_info(
endpoint_url,
region_name,
type_schemas=target_schemas,
local_only=local_only,
profile_name=profile_name,
)
self._plugin.generate(self)
def load(self):
try:
self.load_settings()
except FileNotFoundError as e:
self._raise_invalid_project(
f"Project file {self.settings_path} not found. Have you run 'init' or in a wrong directory?",
e,
)
if self.artifact_type == ARTIFACT_TYPE_MODULE:
self._load_modules_project()
elif self.artifact_type == ARTIFACT_TYPE_HOOK:
self._load_hooks_project()
else:
self._load_resources_project()
def _load_resources_project(self):
LOG.info("Validating your resource specification...")
try:
self.load_schema()
self.load_configuration_schema()
LOG.warning("Resource schema is valid.")
except FileNotFoundError as e:
self._raise_invalid_project("Resource schema not found.", e)
except SpecValidationError as e:
msg = "Resource schema is invalid: " + str(e)
self._raise_invalid_project(msg, e)
LOG.info("Validating your resource schema...")
def _load_modules_project(self):
LOG.info("Validating your module fragments...")
template_fragment = TemplateFragment(self.type_name, self.root)
try:
self._validate_fragments(template_fragment)
except FragmentValidationError as e:
msg = "Invalid template fragment: " + str(e)
self._raise_invalid_project(msg, e)
self.schema = template_fragment.generate_schema()
self.fragment_dir = template_fragment.fragment_dir
def _load_hooks_project(self):
LOG.info("Validating your hook specification...")
try:
self.load_hook_schema()
self.load_configuration_schema()
except FileNotFoundError as e:
self._raise_invalid_project("Hook specification not found.", e)
except SpecValidationError as e:
msg = "Hook specification is invalid: " + str(e)
self._raise_invalid_project(msg, e)
def _add_modules_content_to_zip(self, zip_file):
if not os.path.exists(self.root / SCHEMA_UPLOAD_FILENAME):
msg = "Module schema could not be found"
raise InternalError(msg)
zip_file.write(self.root / SCHEMA_UPLOAD_FILENAME, SCHEMA_UPLOAD_FILENAME)
file = _get_fragment_file(self.fragment_dir)
zip_file.write(
file,
arcname=file.replace(str(self.fragment_dir), "fragments/"),
)
@staticmethod
def _validate_fragments(template_fragment):
template_fragment.validate_fragments()
def submit(
self,
dry_run,
endpoint_url,
region_name,
role_arn,
use_role,
set_default,
profile_name,
): # pylint: disable=too-many-arguments
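        """Package the project as a zip and, unless ``dry_run`` is set,
        upload and register it with CloudFormation.

        Depending on the artifact type, the archive bundles the settings
        file, the schema (plus the configuration schema, if any), inputs,
        overrides, module fragments or hook target info, the plugin's build
        output, and a .cfn_metadata.json version stamp.
        """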
context_mgr = self._create_context_manager(dry_run)
with context_mgr as f:
# the default compression is ZIP_STORED, which helps with the
# file-size check on upload
args = {}
if sys.version_info >= (3, 8):
args = {"strict_timestamps": False}
with zipfile.ZipFile(f, mode="w", **args) as zip_file:
if self.configuration_schema:
with zip_file.open(
CONFIGURATION_SCHEMA_UPLOAD_FILENAME, "w"
) as configuration_file:
configuration_file.write(
json.dumps(self.configuration_schema, indent=4).encode(
"utf-8"
)
)
zip_file.write(self.settings_path, SETTINGS_FILENAME)
if self.artifact_type == ARTIFACT_TYPE_MODULE:
self._add_modules_content_to_zip(zip_file)
elif self.artifact_type == ARTIFACT_TYPE_HOOK:
self._add_hooks_content_to_zip(
zip_file, endpoint_url, region_name, profile_name
)
else:
self._add_resources_content_to_zip(zip_file)
self._add_overrides_file_to_zip(zip_file)
if dry_run:
LOG.error("Dry run complete: %s", self._get_zip_file_path().resolve())
else:
f.seek(0)
self._upload(
f,
endpoint_url,
region_name,
role_arn,
use_role,
set_default,
profile_name,
)
def _add_overrides_file_to_zip(self, zip_file):
try:
zip_file.write(self.overrides_path, OVERRIDES_FILENAME)
LOG.debug("%s found. Writing to package.", OVERRIDES_FILENAME)
except FileNotFoundError:
LOG.debug("%s not found. Not writing to package.", OVERRIDES_FILENAME)
def _add_resources_content_to_zip(self, zip_file):
zip_file.write(self.schema_path, SCHEMA_UPLOAD_FILENAME)
if os.path.isdir(self.inputs_path):
for filename in os.listdir(self.inputs_path):
absolute_path = self.inputs_path / filename
zip_file.write(absolute_path, INPUTS_FOLDER + "/" + filename)
LOG.debug("%s found. Writing to package.", filename)
else:
LOG.debug("%s not found. Not writing to package.", INPUTS_FOLDER)
self._plugin.package(self, zip_file)
cli_metadata = {}
try:
cli_metadata = self._plugin.get_plugin_information(self)
except AttributeError:
            LOG.debug(
                "Plugin version info not available; only the CLI version will be written to the metadata file"
            )
cli_metadata["cli-version"] = __version__
zip_file.writestr(CFN_METADATA_FILENAME, json.dumps(cli_metadata))
def _add_hooks_content_to_zip(
self, zip_file, endpoint_url=None, region_name=None, profile_name=None
):
zip_file.write(self.schema_path, SCHEMA_UPLOAD_FILENAME)
if os.path.isdir(self.inputs_path):
for filename in os.listdir(self.inputs_path):
absolute_path = self.inputs_path / filename
zip_file.write(absolute_path, INPUTS_FOLDER + "/" + filename)
LOG.debug("%s found. Writing to package.", filename)
else:
LOG.debug("%s not found. Not writing to package.", INPUTS_FOLDER)
target_info = {}
try:
target_info = (
self.target_info
if self.target_info
else self._load_target_info(
endpoint_url, region_name, profile_name=profile_name
)
)
except RPDKBaseException as e:
LOG.warning("Failed to load target info, attempting local...", exc_info=e)
try:
target_info = self._load_target_info(None, None, local_only=True)
except RPDKBaseException as ex:
LOG.warning("Failed to load target info, ignoring...", exc_info=ex)
if target_info:
zip_file.writestr(TARGET_INFO_FILENAME, json.dumps(target_info, indent=4))
for target_name, info in target_info.items():
filename = "{}.json".format(
"-".join(s.lower() for s in target_name.split("::"))
)
content = json.dumps(info.get("Schema", {}), indent=4).encode("utf-8")
zip_file.writestr(TARGET_SCHEMAS_FOLDER + "/" + filename, content)
LOG.debug("%s found. Writing to package.", filename)
self._plugin.package(self, zip_file)
cli_metadata = {}
try:
cli_metadata = self._plugin.get_plugin_information(self)
except AttributeError:
            LOG.debug(
                "Plugin version info not available; only the CLI version will be written to the metadata file"
            )
cli_metadata["cli-version"] = __version__
zip_file.writestr(CFN_METADATA_FILENAME, json.dumps(cli_metadata))
# pylint: disable=R1732
def _create_context_manager(self, dry_run):
        # if it's a dry run, keep the file; otherwise it can be deleted after upload
if dry_run:
return self._get_zip_file_path().open("wb")
return TemporaryFile("w+b")
def _get_zip_file_path(self):
return Path(f"{self.hypenated_name}.zip")
def generate_docs(self):
if self.artifact_type == ARTIFACT_TYPE_MODULE:
return
# generate the docs folder that contains documentation based on the schema
docs_path = self.root / "docs"
docs_attribute = (
self.configuration_schema
if self.artifact_type == ARTIFACT_TYPE_HOOK
else self.schema
)
if (
not self.type_info
or not docs_attribute
or "properties" not in docs_attribute
):
LOG.warning(
"Could not generate schema docs due to missing type info or schema"
)
return
target_names = (
self.target_info.keys()
if self.target_info
else {
target_name
for handler in self.schema.get("handlers", {}).values()
for target_name in handler.get("targetNames", [])
}
if self.artifact_type == ARTIFACT_TYPE_HOOK
else []
)
LOG.debug("Removing generated docs: %s", docs_path)
shutil.rmtree(docs_path, ignore_errors=True)
docs_path.mkdir(exist_ok=True)
LOG.debug("Writing generated docs")
# take care not to modify the master schema
docs_schema = json.loads(json.dumps(docs_attribute))
self._flattened_schema = JsonSchemaFlattener(
json.loads(json.dumps(docs_attribute))
).flatten_schema()
docs_schema["properties"] = {
name: self._set_docs_properties(name, value, (name,))
for name, value in self._flattened_schema[()]["properties"].items()
}
LOG.debug("Finished documenting nested properties")
ref = self._get_docs_primary_identifier(docs_schema)
getatt = self._get_docs_gettable_atts(docs_schema)
readme_path = docs_path / "README.md"
LOG.debug("Writing docs README: %s", readme_path)
readme_template = (
"hook-docs-readme.md"
if self.artifact_type == ARTIFACT_TYPE_HOOK
else "docs-readme.md"
)
template = self.env.get_template(readme_template)
contents = template.render(
type_name=self.type_name,
schema=docs_schema,
ref=ref,
getatt=getatt,
target_names=sorted(target_names),
)
self.safewrite(readme_path, contents)
def generate_image_build_config(self):
if not hasattr(self._plugin, "generate_image_build_config"):
raise InvalidProjectError(
f"Plugin for the {self.runtime} runtime does not support building an image"
)
return self._plugin.generate_image_build_config(self)
@staticmethod
def _get_docs_primary_identifier(docs_schema):
try:
primary_id = docs_schema["primaryIdentifier"]
if len(primary_id) == 1:
# drop /properties
primary_id_path = fragment_decode(primary_id[0], prefix="")[1:]
# at some point, someone might use a nested primary ID
if len(primary_id_path) == 1:
return primary_id_path[0]
LOG.warning("Nested primaryIdentifier found")
except (KeyError, ValueError):
pass
return None
@staticmethod
def _get_docs_gettable_atts(docs_schema):
def _get_property_description(prop):
path = fragment_decode(prop, prefix="")
name = path[-1]
try:
desc, _resolved_path, _parent = traverse(
docs_schema, path + ("description",)
)
except (KeyError, IndexError, ValueError):
desc = f"Returns the {name}
value."
return {"name": name, "description": desc}
return [
_get_property_description(prop)
for prop in docs_schema.get("readOnlyProperties", [])
]
def _set_docs_properties( # noqa: C901
self, propname, prop, proppath
): # pylint: disable=too-many-locals,too-many-statements
"""method sets markdown for each property;
1. Supports multiple types per property - done via flattened schema so `allOf`,
`anyOf`, `oneOf` combined into a collection then method iterates to reapply
itself to each type
2. Supports circular reference - done via pre calculating hypothetical .md file
path and name
which is reused once property is hit more than once
Args:
propname ([str]): property name
prop ([dict]): all the sub propeties
proppath ([tuple]): path of the property
Returns:
[dict]: modified sub dictionary with attached markdown
"""
types = ("jsontype", "yamltype", "longformtype")
jsontype, yamltype, longformtype = types
# reattach prop from reference
if "$ref" in prop:
ref = self._flattened_schema[prop["$ref"]]
propname = prop["$ref"][1]
# this is to tie object to a definition and not to a property
proppath = (propname,)
prop = ref
# this means method is traversing already visited property
if propname in self._marked_down_properties:
return {
property_item: markdown
for property_item, markdown in self._marked_down_properties[
propname
].items()
if property_item in types
} # returning already set markdown
proppath_ptr = fragment_encode(("properties",) + proppath, prefix="")
if (
"createOnlyProperties" in self.schema
and proppath_ptr in self.schema["createOnlyProperties"]
):
prop["createonly"] = True
if (
"conditionalCreateOnlyProperties" in self.schema
and proppath_ptr in self.schema["conditionalCreateOnlyProperties"]
):
prop["conditionalCreateOnly"] = True
if (
"readOnlyProperties" in self.schema
and proppath_ptr in self.schema["readOnlyProperties"]
):
prop["readonly"] = True
# join multiple types
def __join(item1, item2):
if not item1 or item2 == item1:
return item2
return "{}, {}".format(item1, item2)
def __set_property_type(prop_type, single_type=True):
nonlocal prop
# mark down formatting of the target value - used for complex objects
# ($ref) and arrays of such objects
markdown_lambda = (
                lambda fname, name: f'<a href="{fname}">{name}</a>'  # noqa: B950, C0301
)
type_json = type_yaml = type_longform = "Unknown"
if prop_type in BASIC_TYPE_MAPPINGS:
# primitives should not occur for circular ref;
type_json = type_yaml = type_longform = BASIC_TYPE_MAPPINGS[prop_type]
elif prop_type == "array":
# lambdas to reuse formatting
markdown_json = (
lambda markdown_value: f"[ {markdown_value}, ... ]"
) # noqa: E731
markdown_yaml = lambda markdown_value: ( # noqa: E731
f"\n - {markdown_value}"
if single_type
else f"[ {markdown_value}, ... ]"
)
markdown_long = (
lambda markdown_value: f"List of {markdown_value}"
) # noqa: E731
# potential circular ref
                # setting up the markdown before recursing so it can be reused
if "$ref" in prop["items"]:
sub_propname = prop["items"]["$ref"][1] # get target sub property
sub_proppath = (sub_propname,)
# calculating hypothetical markdown property before
# actually traversing it - this way it could be reused -
# same .md doc could be attached to both instances
hypothetical = markdown_lambda(
"-".join(sub_proppath).lower() + ".md", sub_propname
)
                    # assign the hypothetical .md reference before the circular
                    # property reaches the recursion boundary, so the same
                    # document is reused once the boundary is hit
self._marked_down_properties[propname] = {
jsontype: markdown_json(hypothetical),
yamltype: markdown_yaml(hypothetical),
longformtype: markdown_long(hypothetical),
}
# traverse nested propertes
prop["arrayitems"] = arrayitems = self._set_docs_properties(
propname, prop["items"], proppath
)
                # arrayitems should mirror the markdown of the hypothetical
                # target if there is an object ref; they cannot be used as the
                # hypothetical values themselves, since they are not populated
                # until after traversing down
type_json = markdown_json(arrayitems[jsontype])
type_yaml = markdown_yaml(arrayitems[yamltype])
type_longform = markdown_long(arrayitems[longformtype])
elif prop_type == "object":
template = self.env.get_template("docs-subproperty.md")
docs_path = self.root / "docs"
object_properties = (
prop.get("properties") or prop.get("patternProperties") or {}
)
type_json = type_yaml = type_longform = "Map"
if object_properties:
subproperty_name = " ".join(proppath)
subproperty_filename = "-".join(proppath).lower() + ".md"
subproperty_path = docs_path / subproperty_filename
type_json = type_yaml = type_longform = markdown_lambda(
subproperty_filename, propname
)
# potential circular ref
                    # setting up the markdown before recursing so it can be reused
                    self._marked_down_properties[propname] = {
                        jsontype: type_json,
                        yamltype: type_yaml,
                        longformtype: type_longform,
                    }
prop["properties"] = {
name: self._set_docs_properties(name, value, proppath + (name,))
for name, value in object_properties.items()
}
LOG.debug(
"Writing docs %s: %s", subproperty_filename, subproperty_path
)
contents = template.render(
type_name=self.type_name,
subproperty_name=subproperty_name,
schema=prop,
)
self.safewrite(subproperty_path, contents)
prop[jsontype] = __join(prop.get(jsontype), type_json)
prop[yamltype] = __join(prop.get(yamltype), type_yaml)
prop[longformtype] = __join(prop.get(longformtype), type_longform)
if "enum" in prop:
prop["allowedvalues"] = prop["enum"]
prop_type = prop.get("type", "object")
single_item = False
if not isinstance(prop_type, list):
prop_type = [prop_type]
single_item = True
for prop_item in prop_type:
if isinstance(prop_item, tuple): # if tuple, then it's a ref
                # use the docs method to generate the markdown and reassign the ref
resolved = self._set_docs_properties(
propname, {"$ref": prop_item}, proppath
)
prop[jsontype] = __join(prop.get(jsontype), resolved[jsontype])
prop[yamltype] = __join(prop.get(yamltype), resolved[yamltype])
prop[longformtype] = __join(
prop.get(longformtype), resolved[longformtype]
)
else:
__set_property_type(prop_item, single_type=single_item)
return prop
def _upload(
self,
fileobj,
endpoint_url,
region_name,
role_arn,
use_role,
set_default,
profile_name,
): # pylint: disable=too-many-arguments, too-many-locals
LOG.debug("Packaging complete, uploading...")
session = create_sdk_session(region_name, profile_name)
LOG.debug("Uploading to region '%s'", session.region_name)
cfn_client = session.client("cloudformation", endpoint_url=endpoint_url)
s3_client = session.client("s3")
uploader = Uploader(cfn_client, s3_client)
if use_role and not role_arn and "handlers" in self.schema:
LOG.debug("Creating execution role for provider to use")
if self.artifact_type == ARTIFACT_TYPE_HOOK:
role_template_file = HOOK_ROLE_TEMPLATE_FILENAME
else:
role_template_file = RESOURCE_ROLE_TEMPLATE_FILENAME
role_arn = uploader.create_or_update_role(
self.root / role_template_file, self.hypenated_name
)
s3_url = uploader.upload(self.hypenated_name, fileobj)
LOG.debug("Got S3 URL: %s", s3_url)
log_delivery_role = uploader.get_log_delivery_role_arn()
LOG.debug("Got Log Role: %s", log_delivery_role)
kwargs = {
"Type": self.artifact_type,
"TypeName": self.type_name,
"SchemaHandlerPackage": s3_url,
"ClientRequestToken": str(uuid4()),
"LoggingConfig": {
"LogRoleArn": log_delivery_role,
"LogGroupName": f"{self.hypenated_name}-logs",
},
}
if role_arn and use_role:
kwargs["ExecutionRoleArn"] = role_arn
try:
response = cfn_client.register_type(**kwargs)
except ClientError as e:
LOG.debug("Registering type resulted in unknown ClientError", exc_info=e)
raise DownstreamError("Unknown CloudFormation error") from e
else:
self._wait_for_registration(
cfn_client, response["RegistrationToken"], set_default
)
@staticmethod
def _wait_for_registration(cfn_client, registration_token, set_default):
registration_waiter = cfn_client.get_waiter("type_registration_complete")
try:
LOG.warning(
"Successfully submitted type. "
"Waiting for registration with token '%s' to complete.",
registration_token,
)
registration_waiter.wait(RegistrationToken=registration_token)
except WaiterError as e:
LOG.warning(
"Failed to register the type with registration token '%s'.",
registration_token,
)
try:
response = cfn_client.describe_type_registration(
RegistrationToken=registration_token
)
except ClientError as describe_error:
                LOG.debug(
                    "Describing type registration resulted in unknown ClientError",
                    exc_info=describe_error,
                )
raise DownstreamError(
"Error describing type registration"
) from describe_error
LOG.warning(
"Please see response for additional information: '%s'", response
)
raise DownstreamError("Type registration error") from e
LOG.warning("Registration complete.")
response = cfn_client.describe_type_registration(
RegistrationToken=registration_token
)
LOG.warning(response)
if set_default:
arn = response["TypeVersionArn"]
try:
cfn_client.set_type_default_version(Arn=arn)
except ClientError as e:
LOG.debug(
"Setting default version resulted in unknown ClientError",
exc_info=e,
)
raise DownstreamError("Error setting default version") from e
LOG.warning("Set default version to '%s", arn)
# flake8: noqa: C901
# pylint: disable=R0914
def _load_target_info(
self,
endpoint_url,
region_name,
type_schemas=None,
local_only=False,
profile_name=None,
): # pylint: disable=too-many-arguments
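        """Resolve and load schema info for a hook's target types.

        Target names are gathered from the handlers' ``targetNames``;
        schemas come from ``target-schemas/`` files, ``target-info.json``,
        or, unless ``local_only`` is set, the CloudFormation registry.
        """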
if self.artifact_type != ARTIFACT_TYPE_HOOK or not self.schema:
return {}
if type_schemas is None:
type_schemas = []
if os.path.isdir(self.target_schemas_path):
for filename in os.listdir(self.target_schemas_path):
absolute_path = self.target_schemas_path / filename
if absolute_path.is_file() and absolute_path.match("*.json"):
type_schemas.append(str(absolute_path))
local_info = {}
if os.path.isfile(self.target_info_path):
try:
with self.target_info_path.open("r", encoding="utf-8") as f:
local_info = json.load(f)
except json.JSONDecodeError as e:
self._raise_invalid_project(
f"Target info file '{self.target_info_path}' is invalid", e
)
target_names = {
target_name
for handler in self.schema.get("handlers", {}).values()
for target_name in handler.get("targetNames", [])
}
LOG.debug("Hook schema target names: %s", str(target_names))
if local_only:
targets = TypeNameResolver.resolve_type_names_locally(
target_names, local_info
)
loader = TypeSchemaLoader(None, None, local_only=local_only)
else:
session = create_sdk_session(region_name, profile_name)
cfn_client = session.client("cloudformation", endpoint_url=endpoint_url)
s3_client = session.client("s3")
targets = TypeNameResolver(cfn_client).resolve_type_names(target_names)
loader = TypeSchemaLoader(cfn_client, s3_client, local_only=local_only)
LOG.debug("Retrieving info for following target names: %s", str(targets))
type_info = loader.load_type_info(
targets, local_schemas=type_schemas, local_info=local_info
)
missing_targets = {target for target in targets if target not in type_info}
if missing_targets:
raise InvalidProjectError(
f"Type info missing for the following targets: {missing_targets}"
)
return type_info