// Jest Snapshot v1, https://goo.gl/fbAQLP
exports[`OpenAPI Gateway Python With Docs Unit Tests With Docs 1`] = `
Object {
".gitattributes": "# ~~ Generated by projen. To modify, edit .projenrc.py and run \\"npx projen\\".
/.gitattributes linguist-generated
/.github/workflows/pull-request-lint.yml linguist-generated
/.gitignore linguist-generated
/.projen/** linguist-generated
/.projen/deps.json linguist-generated
/.projen/files.json linguist-generated
/.projen/tasks.json linguist-generated
/generated/README.md linguist-generated
/MANIFEST.in linguist-generated
/requirements-dev.txt linguist-generated
/requirements.txt linguist-generated
/setup.py linguist-generated",
".github/workflows/pull-request-lint.yml": "# ~~ Generated by projen. To modify, edit .projenrc.py and run \\"npx projen\\".
name: pull-request-lint
on:
pull_request_target:
types:
- labeled
- opened
- synchronize
- reopened
- ready_for_review
- edited
jobs:
validate:
name: Validate PR title
runs-on: ubuntu-latest
permissions:
pull-requests: write
steps:
- uses: amannn/action-semantic-pull-request@v5.0.2
env:
GITHUB_TOKEN: \${{ secrets.GITHUB_TOKEN }}
with:
types: |-
feat
fix
chore
requireScope: false
",
".gitignore": "# ~~ Generated by projen. To modify, edit .projenrc.py and run \\"npx projen\\".
node_modules/
!/.gitattributes
!/.projen/tasks.json
!/.projen/deps.json
!/.projen/files.json
!/.github/workflows/pull-request-lint.yml
/.env
!/requirements.txt
!/requirements-dev.txt
!/setup.py
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
*.manifest
*.spec
pip-log.txt
pip-delete-this-directory.txt
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
*.mo
*.pot
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
instance/
.webassets-cache
.scrapy
docs/_build/
.pybuilder/
target/
.ipynb_checkpoints
profile_default/
ipython_config.py
__pypackages__/
celerybeat-schedule
celerybeat.pid
*.sage.py
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
.spyderproject
.spyproject
.ropeproject
/site
.mypy_cache/
.dmypy.json
dmypy.json
.pyre/
.pytype/
cython_debug/
!/generated/README.md
!/MANIFEST.in
",
".projen/deps.json": Object {
"//": "~~ Generated by projen. To modify, edit .projenrc.py and run \\"npx projen\\".",
"dependencies": Array [
Object {
"name": "projen",
"type": "devenv",
"version": "99.99.99",
},
Object {
"name": "twine",
"type": "devenv",
"version": "3.3.0",
},
Object {
"name": "wheel",
"type": "devenv",
"version": "0.36.2",
},
Object {
"name": "aws_prototyping_sdk.open_api_gateway",
"type": "runtime",
},
Object {
"name": "aws-cdk-lib",
"type": "runtime",
},
Object {
"name": "cdk-nag",
"type": "runtime",
},
Object {
"name": "constructs",
"type": "runtime",
},
Object {
"name": "my_api_python",
"type": "runtime",
},
],
},
".projen/files.json": Object {
"//": "~~ Generated by projen. To modify, edit .projenrc.py and run \\"npx projen\\".",
"files": Array [
".gitattributes",
".github/workflows/pull-request-lint.yml",
".gitignore",
".projen/deps.json",
".projen/files.json",
".projen/tasks.json",
"generated/README.md",
"MANIFEST.in",
"requirements-dev.txt",
"requirements.txt",
"setup.py",
],
},
".projen/tasks.json": Object {
"//": "~~ Generated by projen. To modify, edit .projenrc.py and run \\"npx projen\\".",
"env": Object {
"PATH": "$(echo $PWD/.env/bin:$PATH)",
"VIRTUAL_ENV": "$(echo $PWD/.env)",
},
"tasks": Object {
"build": Object {
"description": "Full release build",
"name": "build",
"steps": Array [
Object {
"spawn": "default",
},
Object {
"spawn": "pre-compile",
},
Object {
"spawn": "compile",
},
Object {
"spawn": "post-compile",
},
Object {
"spawn": "test",
},
Object {
"spawn": "package",
},
],
},
"clobber": Object {
"condition": "git diff --exit-code > /dev/null",
"description": "hard resets to HEAD of origin and cleans the local repo",
"env": Object {
"BRANCH": "$(git branch --show-current)",
},
"name": "clobber",
"steps": Array [
Object {
"exec": "git checkout -b scratch",
"name": "save current HEAD in \\"scratch\\" branch",
},
Object {
"exec": "git checkout $BRANCH",
},
Object {
"exec": "git fetch origin",
"name": "fetch latest changes from origin",
},
Object {
"exec": "git reset --hard origin/$BRANCH",
"name": "hard reset to origin commit",
},
Object {
"exec": "git clean -fdx",
"name": "clean all untracked files",
},
Object {
"say": "ready to rock! (unpushed commits are under the \\"scratch\\" branch)",
},
],
},
"compile": Object {
"description": "Only compile",
"name": "compile",
},
"default": Object {
"description": "Synthesize project files",
"name": "default",
"steps": Array [
Object {
"exec": "python .projenrc.py",
},
],
},
"eject": Object {
"description": "Remove projen from the project",
"env": Object {
"PROJEN_EJECTING": "true",
},
"name": "eject",
"steps": Array [
Object {
"spawn": "default",
},
],
},
"install": Object {
"description": "Install and upgrade dependencies",
"name": "install",
"steps": Array [
Object {
"exec": "pip install --upgrade pip",
},
Object {
"exec": "pip install -r requirements.txt",
},
Object {
"exec": "pip install -r requirements-dev.txt",
},
],
},
"package": Object {
"description": "Creates the distribution package",
"name": "package",
"steps": Array [
Object {
"exec": "python setup.py sdist bdist_wheel",
},
],
},
"post-compile": Object {
"description": "Runs after successful compilation",
"name": "post-compile",
},
"pre-compile": Object {
"description": "Prepare the project for compilation",
"name": "pre-compile",
},
"publish": Object {
"description": "Uploads the package against a test PyPI endpoint.",
"name": "publish",
"steps": Array [
Object {
"exec": "twine upload dist/*",
},
],
},
"publish:test": Object {
"description": "Uploads the package against a test PyPI endpoint.",
"name": "publish:test",
"steps": Array [
Object {
"exec": "twine upload --repository-url https://test.pypi.org/legacy/ dist/*",
},
],
},
"test": Object {
"description": "Run tests",
"name": "test",
},
},
},
"MANIFEST.in": "recursive-include generated/python/dist/layer *",
"README.md": "# replace this",
"generated/.client-settings.json": Object {
"//": "~~ Generated by projen. To modify, edit .projenrc.py and run \\"npx projen\\".",
"clientLanguages": Array [
"python",
],
"documentationFormats": Array [
"html2",
"markdown",
"plantuml",
],
},
"generated/README.md": "## Generated Clients
This directory contains generated client code based on your OpenAPI Specification file (spec.yaml).
Like other \`projen\` managed files, this directory should be checked in to source control, but should not be edited manually.",
"generated/documentation/html2/.openapi-generator-ignore": "# OpenAPI Generator Ignore
# Generated by openapi-generator https://github.com/openapitools/openapi-generator
# Use this file to prevent files from being overwritten by the generator.
# The patterns follow closely to .gitignore or .dockerignore.
# As an example, the C# client generator defines ApiClient.cs.
# You can make changes and tell OpenAPI Generator to ignore just this file by uncommenting the following line:
#ApiClient.cs
# You can match any string of characters against a directory, file or extension with a single asterisk (*):
#foo/*/qux
# The above matches foo/bar/qux and foo/baz/qux, but not foo/bar/baz/qux
# You can recursively match patterns against a directory, file or extension with a double asterisk (**):
#foo/**/qux
# This matches foo/bar/qux, foo/baz/qux, and foo/bar/baz/qux
# You can also negate patterns with an exclamation (!).
# For example, you can ignore all files in a docs folder with the file extension .md:
#docs/*.md
# Then explicitly reverse the ignore rule for a single file:
#!docs/README.md
",
"generated/documentation/html2/.openapi-generator/FILES": ".openapi-generator-ignore
index.html
",
"generated/documentation/html2/.openapi-generator/VERSION": "6.3.0",
"generated/documentation/html2/index.html": "
Example API
Default
/hello
Usage and SDK Samples
curl -X GET \\\\
-H \\"Accept: application/json\\" \\\\
\\"http://localhost/hello?name=name_example\\"
import org.openapitools.client.*;
import org.openapitools.client.auth.*;
import org.openapitools.client.model.*;
import org.openapitools.client.api.DefaultApi;
import java.io.File;
import java.util.*;
public class DefaultApiExample {
public static void main(String[] args) {
// Create an instance of the API class
DefaultApi apiInstance = new DefaultApi();
String name = name_example; // String |
try {
SayHelloResponseContent result = apiInstance.sayHello(name);
System.out.println(result);
} catch (ApiException e) {
System.err.println(\\"Exception when calling DefaultApi#sayHello\\");
e.printStackTrace();
}
}
}
import org.openapitools.client.api.DefaultApi;
public class DefaultApiExample {
public static void main(String[] args) {
DefaultApi apiInstance = new DefaultApi();
String name = name_example; // String |
try {
SayHelloResponseContent result = apiInstance.sayHello(name);
System.out.println(result);
} catch (ApiException e) {
System.err.println(\\"Exception when calling DefaultApi#sayHello\\");
e.printStackTrace();
}
}
}
// Create an instance of the API class
DefaultApi *apiInstance = [[DefaultApi alloc] init];
String *name = name_example; // (default to null)
[apiInstance sayHelloWith:name
completionHandler: ^(SayHelloResponseContent output, NSError* error) {
if (output) {
NSLog(@\\"%@\\", output);
}
if (error) {
NSLog(@\\"Error: %@\\", error);
}
}];
var ExampleApi = require('example_api');
// Create an instance of the API class
var api = new ExampleApi.DefaultApi()
var name = name_example; // {String}
var callback = function(error, data, response) {
if (error) {
console.error(error);
} else {
console.log('API called successfully. Returned data: ' + data);
}
};
api.sayHello(name, callback);
using System;
using System.Diagnostics;
using Org.OpenAPITools.Api;
using Org.OpenAPITools.Client;
using Org.OpenAPITools.Model;
namespace Example
{
public class sayHelloExample
{
public void main()
{
// Create an instance of the API class
var apiInstance = new DefaultApi();
var name = name_example; // String | (default to null)
try {
SayHelloResponseContent result = apiInstance.sayHello(name);
Debug.WriteLine(result);
} catch (Exception e) {
Debug.Print(\\"Exception when calling DefaultApi.sayHello: \\" + e.Message );
}
}
}
}
<?php
require_once(__DIR__ . '/vendor/autoload.php');
// Create an instance of the API class
$api_instance = new OpenAPITools\\\\Client\\\\Api\\\\DefaultApi();
$name = name_example; // String |
try {
$result = $api_instance->sayHello($name);
print_r($result);
} catch (Exception $e) {
echo 'Exception when calling DefaultApi->sayHello: ', $e->getMessage(), PHP_EOL;
}
?>
use Data::Dumper;
use WWW::OPenAPIClient::Configuration;
use WWW::OPenAPIClient::DefaultApi;
# Create an instance of the API class
my $api_instance = WWW::OPenAPIClient::DefaultApi->new();
my $name = name_example; # String |
eval {
my $result = $api_instance->sayHello(name => $name);
print Dumper($result);
};
if ($@) {
warn \\"Exception when calling DefaultApi->sayHello: $@\\\\n\\";
}
from __future__ import print_function
import time
import openapi_client
from openapi_client.rest import ApiException
from pprint import pprint
# Create an instance of the API class
api_instance = openapi_client.DefaultApi()
name = name_example # String | (default to null)
try:
api_response = api_instance.say_hello(name)
pprint(api_response)
except ApiException as e:
print(\\"Exception when calling DefaultApi->sayHello: %s\\\\n\\" % e)
extern crate DefaultApi;
pub fn main() {
let name = name_example; // String
let mut context = DefaultApi::Context::default();
let result = client.sayHello(name, &context).wait();
println!(\\"{:?}\\", result);
}
Scopes
Parameters
Query parameters
Responses
",
"generated/documentation/markdown/.openapi-generator-ignore": "# OpenAPI Generator Ignore
# Generated by openapi-generator https://github.com/openapitools/openapi-generator
# Use this file to prevent files from being overwritten by the generator.
# The patterns follow closely to .gitignore or .dockerignore.
# As an example, the C# client generator defines ApiClient.cs.
# You can make changes and tell OpenAPI Generator to ignore just this file by uncommenting the following line:
#ApiClient.cs
# You can match any string of characters against a directory, file or extension with a single asterisk (*):
#foo/*/qux
# The above matches foo/bar/qux and foo/baz/qux, but not foo/bar/baz/qux
# You can recursively match patterns against a directory, file or extension with a double asterisk (**):
#foo/**/qux
# This matches foo/bar/qux, foo/baz/qux, and foo/bar/baz/qux
# You can also negate patterns with an exclamation (!).
# For example, you can ignore all files in a docs folder with the file extension .md:
#docs/*.md
# Then explicitly reverse the ignore rule for a single file:
#!docs/README.md
",
"generated/documentation/markdown/.openapi-generator/FILES": ".openapi-generator-ignore
Apis/DefaultApi.md
Models/ApiErrorResponseContent.md
Models/SayHelloResponseContent.md
README.md
",
"generated/documentation/markdown/.openapi-generator/VERSION": "6.3.0",
"generated/documentation/markdown/Apis/DefaultApi.md": "# DefaultApi
All URIs are relative to *http://localhost*
| Method | HTTP request | Description |
|------------- | ------------- | -------------|
| [**sayHello**](DefaultApi.md#sayHello) | **GET** /hello | |
# **sayHello**
> SayHelloResponseContent sayHello(name)
### Parameters
|Name | Type | Description | Notes |
|------------- | ------------- | ------------- | -------------|
| **name** | **String**| | [default to null] |
### Return type
[**SayHelloResponseContent**](../Models/SayHelloResponseContent.md)
### Authorization
No authorization required
### HTTP request headers
- **Content-Type**: Not defined
- **Accept**: application/json
",
"generated/documentation/markdown/Models/ApiErrorResponseContent.md": "# ApiErrorResponseContent
## Properties
| Name | Type | Description | Notes |
|------------ | ------------- | ------------- | -------------|
| **errorMessage** | **String** | | [default to null] |
[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md)
",
"generated/documentation/markdown/Models/SayHelloResponseContent.md": "# SayHelloResponseContent
## Properties
| Name | Type | Description | Notes |
|------------ | ------------- | ------------- | -------------|
| **message** | **String** | | [default to null] |
[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md)
",
"generated/documentation/markdown/README.md": "# Documentation for Example API
## Documentation for API Endpoints
All URIs are relative to *http://localhost*
| Class | Method | HTTP request | Description |
|------------ | ------------- | ------------- | -------------|
| *DefaultApi* | [**sayHello**](Apis/DefaultApi.md#sayhello) | **GET** /hello | |
## Documentation for Models
- [ApiErrorResponseContent](./Models/ApiErrorResponseContent.md)
- [SayHelloResponseContent](./Models/SayHelloResponseContent.md)
## Documentation for Authorization
All endpoints do not require authorization.
",
"generated/documentation/plantuml/.openapi-generator-ignore": "# OpenAPI Generator Ignore
# Generated by openapi-generator https://github.com/openapitools/openapi-generator
# Use this file to prevent files from being overwritten by the generator.
# The patterns follow closely to .gitignore or .dockerignore.
# As an example, the C# client generator defines ApiClient.cs.
# You can make changes and tell OpenAPI Generator to ignore just this file by uncommenting the following line:
#ApiClient.cs
# You can match any string of characters against a directory, file or extension with a single asterisk (*):
#foo/*/qux
# The above matches foo/bar/qux and foo/baz/qux, but not foo/bar/baz/qux
# You can recursively match patterns against a directory, file or extension with a double asterisk (**):
#foo/**/qux
# This matches foo/bar/qux, foo/baz/qux, and foo/bar/baz/qux
# You can also negate patterns with an exclamation (!).
# For example, you can ignore all files in a docs folder with the file extension .md:
#docs/*.md
# Then explicitly reverse the ignore rule for a single file:
#!docs/README.md
",
"generated/documentation/plantuml/.openapi-generator/FILES": ".openapi-generator-ignore
schemas.plantuml
",
"generated/documentation/plantuml/.openapi-generator/VERSION": "6.3.0",
"generated/documentation/plantuml/schemas.plantuml": "@startuml
title Example API Schemas Diagram
entity ApiErrorResponseContent {
* errorMessage: String
}
entity SayHelloResponseContent {
* message: String
}
@enduml",
"generated/python/.gitattributes": "# ~~ Generated by projen. To modify, edit .projenrc.js and run \\"npx projen\\".
/.gitattributes linguist-generated
/.gitignore linguist-generated
/.openapi-generator-ignore linguist-generated
/.projen/** linguist-generated
/.projen/deps.json linguist-generated
/.projen/files.json linguist-generated
/.projen/tasks.json linguist-generated
/package.json linguist-generated
/requirements-dev.txt linguist-generated
/requirements.txt linguist-generated",
"generated/python/.gitignore": "# ~~ Generated by projen. To modify, edit .projenrc.js and run \\"npx projen\\".
node_modules/
!/.gitattributes
!/.projen/tasks.json
!/.projen/deps.json
!/.projen/files.json
/../../.env
!/requirements.txt
!/requirements-dev.txt
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
*.manifest
*.spec
pip-log.txt
pip-delete-this-directory.txt
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
*.mo
*.pot
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
instance/
.webassets-cache
.scrapy
docs/_build/
.pybuilder/
target/
.ipynb_checkpoints
profile_default/
ipython_config.py
__pypackages__/
celerybeat-schedule
celerybeat.pid
*.sage.py
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
.spyderproject
.spyproject
.ropeproject
/site
.mypy_cache/
.dmypy.json
dmypy.json
.pyre/
.pytype/
cython_debug/
!/package.json
!/.openapi-generator-ignore
",
"generated/python/.gitlab-ci.yml": "# ref: https://docs.gitlab.com/ee/ci/README.html
stages:
- test
.tests:
stage: test
script:
- pip install -r requirements.txt
- pip install -r test-requirements.txt
- pytest --cov=my_api_python
test-3.5:
extends: .tests
image: python:3.5-alpine
test-3.6:
extends: .tests
image: python:3.6-alpine
test-3.7:
extends: .tests
image: python:3.7-alpine
test-3.8:
extends: .tests
image: python:3.8-alpine
",
"generated/python/.openapi-generator-ignore": "# ~~ Generated by projen. To modify, edit .projenrc.js and run \\"npx projen\\".
.gitignore
",
"generated/python/.openapi-generator/FILES": ".gitlab-ci.yml
.travis.yml
README.md
docs/apis/tags/DefaultApi.md
docs/models/ApiErrorResponseContent.md
docs/models/SayHelloResponseContent.md
git_push.sh
my_api_python/__init__.py
my_api_python/api_client.py
my_api_python/apis/__init__.py
my_api_python/apis/tags/default_api.py
my_api_python/apis/tags/default_api_operation_config.py
my_api_python/configuration.py
my_api_python/exceptions.py
my_api_python/model/__init__.py
my_api_python/model/api_error_response_content.py
my_api_python/model/api_error_response_content.pyi
my_api_python/model/say_hello_response_content.py
my_api_python/model/say_hello_response_content.pyi
my_api_python/models/__init__.py
my_api_python/rest.py
my_api_python/schemas.py
requirements.txt
setup.cfg
setup.py
test-requirements.txt
test/__init__.py
test/test_models/__init__.py
test/test_models/test_api_error_response_content.py
test/test_models/test_say_hello_response_content.py
tox.ini
",
"generated/python/.openapi-generator/VERSION": "6.3.0",
"generated/python/.projen/files.json": Object {
"//": "~~ Generated by projen. To modify, edit .projenrc.js and run \\"npx projen\\".",
"files": Array [
".gitattributes",
".gitignore",
".openapi-generator-ignore",
".projen/deps.json",
".projen/files.json",
".projen/tasks.json",
"package.json",
"requirements-dev.txt",
"requirements.txt",
],
},
"generated/python/.projen/tasks.json": Object {
"//": "~~ Generated by projen. To modify, edit .projenrc.js and run \\"npx projen\\".",
"env": Object {
"PATH": "$(echo $PWD/../../.env/bin:$PATH)",
"VIRTUAL_ENV": "$(echo $PWD/../../.env)",
},
"tasks": Object {
"build": Object {
"description": "Full release build",
"name": "build",
"steps": Array [
Object {
"spawn": "pre-compile",
},
Object {
"spawn": "compile",
},
Object {
"spawn": "post-compile",
},
Object {
"spawn": "test",
},
Object {
"spawn": "package",
},
],
},
"compile": Object {
"description": "Only compile",
"name": "compile",
},
"default": Object {
"description": "Synthesize project files",
"name": "default",
"steps": Array [
Object {
"cwd": "../..",
"exec": "npx projen default",
},
],
},
"install": Object {
"description": "Install and upgrade dependencies",
"name": "install",
"steps": Array [
Object {
"exec": "pip install --upgrade pip",
},
Object {
"exec": "pip install -r requirements.txt",
},
Object {
"exec": "pip install -r requirements-dev.txt",
},
Object {
"exec": "pip install --editable .",
},
Object {
"exec": "rm -rf dist/layer/python",
},
Object {
"exec": "pip install . --target dist/layer/python",
},
],
},
"package": Object {
"description": "Creates the distribution package",
"name": "package",
},
"post-compile": Object {
"description": "Runs after successful compilation",
"name": "post-compile",
},
"pre-compile": Object {
"description": "Prepare the project for compilation",
"name": "pre-compile",
},
"test": Object {
"description": "Run tests",
"name": "test",
},
},
},
"generated/python/.travis.yml": "# ref: https://docs.travis-ci.com/user/languages/python
language: python
python:
- \\"3.5\\"
- \\"3.6\\"
- \\"3.7\\"
- \\"3.8\\"
# command to install dependencies
install:
- \\"pip install -r requirements.txt\\"
- \\"pip install -r test-requirements.txt\\"
# command to run tests
script: pytest --cov=my_api_python
",
"generated/python/README.md": "# my-api-python
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
This Python package is automatically generated by the [OpenAPI Generator](https://openapi-generator.tech) project:
- API version: 1.0.0
- Package version: 1.0.0
- Build package: org.openapitools.codegen.languages.PythonClientCodegen
## Requirements.
Python >=3.7
## Migration from other generators like python and python-legacy
### Changes
1. This generator uses spec case for all (object) property names and parameter names.
- So if the spec has a property name like camelCase, it will use camelCase rather than camel_case
- So you will need to update how you input and read properties to use spec case
2. Endpoint parameters are stored in dictionaries to prevent collisions (explanation below)
- So you will need to update how you pass data in to endpoints
3. Endpoint responses now include the original response, the deserialized response body, and (todo)the deserialized headers
- So you will need to update your code to use response.body to access deserialized data
4. All validated data is instantiated in an instance that subclasses all validated Schema classes and Decimal/str/list/tuple/frozendict/NoneClass/BoolClass/bytes/io.FileIO
- This means that you can use isinstance to check if a payload validated against a schema class
- This means that no data will be of type None/True/False
- ingested None will subclass NoneClass
- ingested True will subclass BoolClass
- ingested False will subclass BoolClass
- So if you need to check is True/False/None, instead use instance.is_true_oapg()/.is_false_oapg()/.is_none_oapg()
5. All validated class instances are immutable except for ones based on io.File
- This is because if properties were changed after validation, that validation would no longer apply
- So no changing values or property values after a class has been instantiated
6. String + Number types with formats
- String type data is stored as a string and if you need to access types based on its format like date,
date-time, uuid, number etc then you will need to use accessor functions on the instance
- type string + format: See .as_date_oapg, .as_datetime_oapg, .as_decimal_oapg, .as_uuid_oapg
- type number + format: See .as_float_oapg, .as_int_oapg
- this was done because openapi/json-schema defines constraints. string data may be type string with no format
keyword in one schema, and include a format constraint in another schema
- So if you need to access a string format based type, use as_date_oapg/as_datetime_oapg/as_decimal_oapg/as_uuid_oapg
- So if you need to access a number format based type, use as_int_oapg/as_float_oapg
7. Property access on AnyType(type unset) or object(dict) schemas
- Only required keys with valid python names are properties like .someProp and have type hints
- All optional keys may not exist, so properties are not defined for them
- One can access optional values with dict_instance['optionalProp'] and KeyError will be raised if it does not exist
- Use get_item_oapg if you need a way to always get a value whether or not the key exists
- If the key does not exist, schemas.unset is returned from calling dict_instance.get_item_oapg('optionalProp')
- All required and optional keys have type hints for this method, and @typing.overload is used
- A type hint is also generated for additionalProperties accessed using this method
- So you will need to update your code to use some_instance['optionalProp'] to access optional property
and additionalProperty values
8. The location of the api classes has changed
- Api classes are located in your_package.apis.tags.some_api
- This change was made to eliminate redundant code generation
- Legacy generators generated the same endpoint twice if it had > 1 tag on it
- This generator defines an endpoint in one class, then inherits that class to generate
apis by tags and by paths
- This change reduces code and allows quicker run time if you use the path apis
- path apis are at your_package.apis.paths.some_path
- Those apis will only load their needed models, which is less to load than all of the resources needed in a tag api
- So you will need to update your import paths to the api classes
### Why are Oapg and _oapg used in class and method names?
Classes can have arbitrarily named properties set on them
Endpoints can have arbitrary operationId method names set
For those reasons, I use the prefix Oapg and _oapg to greatly reduce the likelihood of collisions
on protected + public classes/methods.
oapg stands for OpenApi Python Generator.
### Object property spec case
This was done because when payloads are ingested, they can be validated against N number of schemas.
If the input signature used a different property name then that has mutated the payload.
So SchemaA and SchemaB must both see the camelCase spec named variable.
Also it is possible to send in two properties, named camelCase and camel_case in the same payload.
That use case should be supported so spec case is used.
### Parameter spec case
Parameters can be included in different locations including:
- query
- path
- header
- cookie
Any of those parameters could use the same parameter names, so if every parameter
was included as an endpoint parameter in a function signature, they would collide.
For that reason, each of those inputs have been separated out into separate typed dictionaries:
- query_params
- path_params
- header_params
- cookie_params
So when updating your code, you will need to pass endpoint parameters in using those
dictionaries.
### Endpoint responses
Endpoint responses have been enriched to now include more information.
Any response from an endpoint will now include the following properties:
response: urllib3.HTTPResponse
body: typing.Union[Unset, Schema]
headers: typing.Union[Unset, TODO]
Note: response header deserialization has not yet been added
## Installation & Usage
### pip install
If the python package is hosted on a repository, you can install directly using:
\`\`\`sh
pip install git+https://github.com/GIT_USER_ID/GIT_REPO_ID.git
\`\`\`
(you may need to run \`pip\` with root permission: \`sudo pip install git+https://github.com/GIT_USER_ID/GIT_REPO_ID.git\`)
Then import the package:
\`\`\`python
import my_api_python
\`\`\`
### Setuptools
Install via [Setuptools](http://pypi.python.org/pypi/setuptools).
\`\`\`sh
python setup.py install --user
\`\`\`
(or \`sudo python setup.py install\` to install the package for all users)
Then import the package:
\`\`\`python
import my_api_python
\`\`\`
## Getting Started
Please follow the [installation procedure](#installation--usage) and then run the following:
\`\`\`python
import time
import my_api_python
from pprint import pprint
from my_api_python.apis.tags import default_api
from my_api_python.model.api_error_response_content import ApiErrorResponseContent
from my_api_python.model.say_hello_response_content import SayHelloResponseContent
# Defining the host is optional and defaults to http://localhost
# See configuration.py for a list of all supported configuration parameters.
configuration = my_api_python.Configuration(
host = \\"http://localhost\\"
)
# Enter a context with an instance of the API client
with my_api_python.ApiClient(configuration) as api_client:
# Create an instance of the API class
api_instance = default_api.DefaultApi(api_client)
name = \\"name_example\\" # str |
try:
api_response = api_instance.say_hello(name)
pprint(api_response)
except my_api_python.ApiException as e:
print(\\"Exception when calling DefaultApi->say_hello: %s\\\\n\\" % e)
\`\`\`
## Documentation for API Endpoints
All URIs are relative to *http://localhost*
Class | Method | HTTP request | Description
------------ | ------------- | ------------- | -------------
*DefaultApi* | [**say_hello**](docs/apis/tags/DefaultApi.md#say_hello) | **get** /hello |
## Documentation For Models
- [ApiErrorResponseContent](docs/models/ApiErrorResponseContent.md)
- [SayHelloResponseContent](docs/models/SayHelloResponseContent.md)
## Documentation For Authorization
All endpoints do not require authorization.
## Author
## Notes for Large OpenAPI documents
If the OpenAPI document is large, imports in my_api_python.apis and my_api_python.models may fail with a
RecursionError indicating the maximum recursion limit has been exceeded. In that case, there are a couple of solutions:
Solution 1:
Use specific imports for apis and models like:
- \`from my_api_python.apis.default_api import DefaultApi\`
- \`from my_api_python.model.pet import Pet\`
Solution 2:
Before importing the package, adjust the maximum recursion limit as shown below:
\`\`\`
import sys
sys.setrecursionlimit(1500)
import my_api_python
from my_api_python.apis import *
from my_api_python.models import *
\`\`\`
",
"generated/python/docs/apis/tags/DefaultApi.md": "
# my_api_python.apis.tags.default_api.DefaultApi
All URIs are relative to *http://localhost*
Method | HTTP request | Description
------------- | ------------- | -------------
[**say_hello**](#say_hello) | **get** /hello |
# **say_hello**
> SayHelloResponseContent say_hello(name)
### Example
\`\`\`python
import my_api_python
from my_api_python.apis.tags import default_api
from my_api_python.model.say_hello_response_content import SayHelloResponseContent
from my_api_python.model.api_error_response_content import ApiErrorResponseContent
from pprint import pprint
# Defining the host is optional and defaults to http://localhost
# See configuration.py for a list of all supported configuration parameters.
configuration = my_api_python.Configuration(
host = \\"http://localhost\\"
)
# Enter a context with an instance of the API client
with my_api_python.ApiClient(configuration) as api_client:
# Create an instance of the API class
api_instance = default_api.DefaultApi(api_client)
# example passing only required values which don't have defaults set
query_params = {
'name': \\"name_example\\",
}
try:
api_response = api_instance.say_hello(
query_params=query_params,
)
pprint(api_response)
except my_api_python.ApiException as e:
print(\\"Exception when calling DefaultApi->say_hello: %s\\\\n\\" % e)
\`\`\`
### Parameters
Name | Type | Description | Notes
------------- | ------------- | ------------- | -------------
query_params | RequestQueryParams | |
accept_content_types | typing.Tuple[str] | default is ('application/json', ) | Tells the server the content type(s) that are accepted by the client
stream | bool | default is False | if True then the response.content will be streamed and loaded from a file like object. When downloading a file, set this to True to force the code to deserialize the content to a FileSchema file
timeout | typing.Optional[typing.Union[int, typing.Tuple]] | default is None | the timeout used by the rest client
skip_deserialization | bool | default is False | when True, headers and body will be unset and an instance of api_client.ApiResponseWithoutDeserialization will be returned
### query_params
#### RequestQueryParams
Name | Type | Description | Notes
------------- | ------------- | ------------- | -------------
name | NameSchema | |
# NameSchema
## Model Type Info
Input Type | Accessed Type | Description | Notes
------------ | ------------- | ------------- | -------------
str, | str, | |
### Return Types, Responses
Code | Class | Description
------------- | ------------- | -------------
n/a | api_client.ApiResponseWithoutDeserialization | When skip_deserialization is True this response is returned
200 | [ApiResponseFor200](#say_hello.ApiResponseFor200) | Successful response
400 | [ApiResponseFor400](#say_hello.ApiResponseFor400) | Error response
#### say_hello.ApiResponseFor200
Name | Type | Description | Notes
------------- | ------------- | ------------- | -------------
response | urllib3.HTTPResponse | Raw response |
body | typing.Union[SchemaFor200ResponseBodyApplicationJson, ] | |
headers | Unset | headers were not defined |
# SchemaFor200ResponseBodyApplicationJson
Type | Description | Notes
------------- | ------------- | -------------
[**SayHelloResponseContent**](../../models/SayHelloResponseContent.md) | |
#### say_hello.ApiResponseFor400
Name | Type | Description | Notes
------------- | ------------- | ------------- | -------------
response | urllib3.HTTPResponse | Raw response |
body | typing.Union[SchemaFor400ResponseBodyApplicationJson, ] | |
headers | Unset | headers were not defined |
# SchemaFor400ResponseBodyApplicationJson
Type | Description | Notes
------------- | ------------- | -------------
[**ApiErrorResponseContent**](../../models/ApiErrorResponseContent.md) | |
### Authorization
No authorization required
[[Back to top]](#__pageTop) [[Back to API list]](../../../README.md#documentation-for-api-endpoints) [[Back to Model list]](../../../README.md#documentation-for-models) [[Back to README]](../../../README.md)
",
"generated/python/docs/models/ApiErrorResponseContent.md": "# my_api_python.model.api_error_response_content.ApiErrorResponseContent
## Model Type Info
Input Type | Accessed Type | Description | Notes
------------ | ------------- | ------------- | -------------
dict, frozendict.frozendict, | frozendict.frozendict, | |
### Dictionary Keys
Key | Input Type | Accessed Type | Description | Notes
------------ | ------------- | ------------- | ------------- | -------------
**errorMessage** | str, | str, | |
**any_string_name** | dict, frozendict.frozendict, str, date, datetime, int, float, bool, decimal.Decimal, None, list, tuple, bytes, io.FileIO, io.BufferedReader | frozendict.frozendict, str, BoolClass, decimal.Decimal, NoneClass, tuple, bytes, FileIO | any string name can be used but the value must be the correct type | [optional]
[[Back to Model list]](../../README.md#documentation-for-models) [[Back to API list]](../../README.md#documentation-for-api-endpoints) [[Back to README]](../../README.md)
",
"generated/python/docs/models/SayHelloResponseContent.md": "# my_api_python.model.say_hello_response_content.SayHelloResponseContent
## Model Type Info
Input Type | Accessed Type | Description | Notes
------------ | ------------- | ------------- | -------------
dict, frozendict.frozendict, | frozendict.frozendict, | |
### Dictionary Keys
Key | Input Type | Accessed Type | Description | Notes
------------ | ------------- | ------------- | ------------- | -------------
**message** | str, | str, | |
**any_string_name** | dict, frozendict.frozendict, str, date, datetime, int, float, bool, decimal.Decimal, None, list, tuple, bytes, io.FileIO, io.BufferedReader | frozendict.frozendict, str, BoolClass, decimal.Decimal, NoneClass, tuple, bytes, FileIO | any string name can be used but the value must be the correct type | [optional]
[[Back to Model list]](../../README.md#documentation-for-models) [[Back to API list]](../../README.md#documentation-for-api-endpoints) [[Back to README]](../../README.md)
",
"generated/python/git_push.sh": "#!/bin/sh
# ref: https://help.github.com/articles/adding-an-existing-project-to-github-using-the-command-line/
#
# Usage example: /bin/sh ./git_push.sh wing328 openapi-petstore-perl \\"minor update\\" \\"gitlab.com\\"
git_user_id=$1
git_repo_id=$2
release_note=$3
git_host=$4
if [ \\"$git_host\\" = \\"\\" ]; then
git_host=\\"github.com\\"
echo \\"[INFO] No command line input provided. Set \\\\$git_host to $git_host\\"
fi
if [ \\"$git_user_id\\" = \\"\\" ]; then
git_user_id=\\"GIT_USER_ID\\"
echo \\"[INFO] No command line input provided. Set \\\\$git_user_id to $git_user_id\\"
fi
if [ \\"$git_repo_id\\" = \\"\\" ]; then
git_repo_id=\\"GIT_REPO_ID\\"
echo \\"[INFO] No command line input provided. Set \\\\$git_repo_id to $git_repo_id\\"
fi
if [ \\"$release_note\\" = \\"\\" ]; then
release_note=\\"Minor update\\"
echo \\"[INFO] No command line input provided. Set \\\\$release_note to $release_note\\"
fi
# Initialize the local directory as a Git repository
git init
# Adds the files in the local repository and stages them for commit.
git add .
# Commits the tracked changes and prepares them to be pushed to a remote repository.
git commit -m \\"$release_note\\"
# Sets the new remote
git_remote=\`git remote\`
if [ \\"$git_remote\\" = \\"\\" ]; then # git remote not defined
if [ \\"$GIT_TOKEN\\" = \\"\\" ]; then
echo \\"[INFO] \\\\$GIT_TOKEN (environment variable) is not set. Using the git credential in your environment.\\"
git remote add origin https://\${git_host}/\${git_user_id}/\${git_repo_id}.git
else
git remote add origin https://\${git_user_id}:\${GIT_TOKEN}@\${git_host}/\${git_user_id}/\${git_repo_id}.git
fi
fi
git pull origin master
# Pushes (Forces) the changes in the local repository up to the remote repository
echo \\"Git pushing to https://\${git_host}/\${git_user_id}/\${git_repo_id}.git\\"
git push origin master 2>&1 | grep -v 'To https'
",
"generated/python/my_api_python/__init__.py": "# coding: utf-8
# flake8: noqa
\\"\\"\\"
Example API
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) # noqa: E501
The version of the OpenAPI document: 1.0.0
Generated by: https://openapi-generator.tech
\\"\\"\\"
__version__ = \\"1.0.0\\"
# import ApiClient
from my_api_python.api_client import ApiClient
# import Configuration
from my_api_python.configuration import Configuration
# import exceptions
from my_api_python.exceptions import OpenApiException
from my_api_python.exceptions import ApiAttributeError
from my_api_python.exceptions import ApiTypeError
from my_api_python.exceptions import ApiValueError
from my_api_python.exceptions import ApiKeyError
from my_api_python.exceptions import ApiException
",
"generated/python/my_api_python/api_client.py": "# coding: utf-8
\\"\\"\\"
Example API
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) # noqa: E501
The version of the OpenAPI document: 1.0.0
Generated by: https://openapi-generator.tech
\\"\\"\\"
from dataclasses import dataclass
from decimal import Decimal
import enum
import email
import json
import os
import io
import atexit
from multiprocessing.pool import ThreadPool
import re
import tempfile
import typing
import typing_extensions
import urllib3
from urllib3._collections import HTTPHeaderDict
from urllib.parse import urlparse, quote
from urllib3.fields import RequestField as RequestFieldBase
import frozendict
from my_api_python import rest
from my_api_python.configuration import Configuration
from my_api_python.exceptions import ApiTypeError, ApiValueError
from my_api_python.schemas import (
NoneClass,
BoolClass,
Schema,
FileIO,
BinarySchema,
date,
datetime,
none_type,
Unset,
unset,
)
class RequestField(RequestFieldBase):
def __eq__(self, other):
if not isinstance(other, RequestField):
return False
return self.__dict__ == other.__dict__
class JSONEncoder(json.JSONEncoder):
compact_separators = (',', ':')
def default(self, obj):
if isinstance(obj, str):
return str(obj)
elif isinstance(obj, float):
return float(obj)
elif isinstance(obj, int):
return int(obj)
elif isinstance(obj, Decimal):
if obj.as_tuple().exponent >= 0:
return int(obj)
return float(obj)
elif isinstance(obj, NoneClass):
return None
elif isinstance(obj, BoolClass):
return bool(obj)
elif isinstance(obj, (dict, frozendict.frozendict)):
return {key: self.default(val) for key, val in obj.items()}
elif isinstance(obj, (list, tuple)):
return [self.default(item) for item in obj]
raise ApiValueError('Unable to prepare type {} for serialization'.format(obj.__class__.__name__))
class ParameterInType(enum.Enum):
QUERY = 'query'
HEADER = 'header'
PATH = 'path'
COOKIE = 'cookie'
class ParameterStyle(enum.Enum):
MATRIX = 'matrix'
LABEL = 'label'
FORM = 'form'
SIMPLE = 'simple'
SPACE_DELIMITED = 'spaceDelimited'
PIPE_DELIMITED = 'pipeDelimited'
DEEP_OBJECT = 'deepObject'
class PrefixSeparatorIterator:
# A class to store prefixes and separators for rfc6570 expansions
def __init__(self, prefix: str, separator: str):
self.prefix = prefix
self.separator = separator
self.first = True
if separator in {'.', '|', '%20'}:
item_separator = separator
else:
item_separator = ','
self.item_separator = item_separator
def __iter__(self):
return self
def __next__(self):
if self.first:
self.first = False
return self.prefix
return self.separator
class ParameterSerializerBase:
@classmethod
def _get_default_explode(cls, style: ParameterStyle) -> bool:
return False
@staticmethod
def __ref6570_item_value(in_data: typing.Any, percent_encode: bool):
\\"\\"\\"
Get representation if str/float/int/None/items in list/ values in dict
None is returned if an item is undefined, use cases are value=
- None
- []
- {}
- [None, None, None]
- {'a': None, 'b': None}
\\"\\"\\"
if type(in_data) in {str, float, int}:
if percent_encode:
return quote(str(in_data))
return str(in_data)
elif isinstance(in_data, none_type):
# ignored by the expansion process https://datatracker.ietf.org/doc/html/rfc6570#section-3.2.1
return None
elif isinstance(in_data, list) and not in_data:
# ignored by the expansion process https://datatracker.ietf.org/doc/html/rfc6570#section-3.2.1
return None
elif isinstance(in_data, dict) and not in_data:
# ignored by the expansion process https://datatracker.ietf.org/doc/html/rfc6570#section-3.2.1
return None
raise ApiValueError('Unable to generate a ref6570 item representation of {}'.format(in_data))
@staticmethod
def _to_dict(name: str, value: str):
return {name: value}
@classmethod
def __ref6570_str_float_int_expansion(
cls,
variable_name: str,
in_data: typing.Any,
explode: bool,
percent_encode: bool,
prefix_separator_iterator: PrefixSeparatorIterator,
var_name_piece: str,
named_parameter_expansion: bool
) -> str:
item_value = cls.__ref6570_item_value(in_data, percent_encode)
if item_value is None or (item_value == '' and prefix_separator_iterator.separator == ';'):
return next(prefix_separator_iterator) + var_name_piece
value_pair_equals = '=' if named_parameter_expansion else ''
return next(prefix_separator_iterator) + var_name_piece + value_pair_equals + item_value
@classmethod
def __ref6570_list_expansion(
cls,
variable_name: str,
in_data: typing.Any,
explode: bool,
percent_encode: bool,
prefix_separator_iterator: PrefixSeparatorIterator,
var_name_piece: str,
named_parameter_expansion: bool
) -> str:
item_values = [cls.__ref6570_item_value(v, percent_encode) for v in in_data]
item_values = [v for v in item_values if v is not None]
if not item_values:
# ignored by the expansion process https://datatracker.ietf.org/doc/html/rfc6570#section-3.2.1
return \\"\\"
value_pair_equals = '=' if named_parameter_expansion else ''
if not explode:
return (
next(prefix_separator_iterator) +
var_name_piece +
value_pair_equals +
prefix_separator_iterator.item_separator.join(item_values)
)
# exploded
return next(prefix_separator_iterator) + next(prefix_separator_iterator).join(
[var_name_piece + value_pair_equals + val for val in item_values]
)
@classmethod
def __ref6570_dict_expansion(
cls,
variable_name: str,
in_data: typing.Any,
explode: bool,
percent_encode: bool,
prefix_separator_iterator: PrefixSeparatorIterator,
var_name_piece: str,
named_parameter_expansion: bool
) -> str:
in_data_transformed = {key: cls.__ref6570_item_value(val, percent_encode) for key, val in in_data.items()}
in_data_transformed = {key: val for key, val in in_data_transformed.items() if val is not None}
if not in_data_transformed:
# ignored by the expansion process https://datatracker.ietf.org/doc/html/rfc6570#section-3.2.1
return \\"\\"
value_pair_equals = '=' if named_parameter_expansion else ''
if not explode:
return (
next(prefix_separator_iterator) +
var_name_piece + value_pair_equals +
prefix_separator_iterator.item_separator.join(
prefix_separator_iterator.item_separator.join(
item_pair
) for item_pair in in_data_transformed.items()
)
)
# exploded
return next(prefix_separator_iterator) + next(prefix_separator_iterator).join(
[key + '=' + val for key, val in in_data_transformed.items()]
)
@classmethod
def _ref6570_expansion(
cls,
variable_name: str,
in_data: typing.Any,
explode: bool,
percent_encode: bool,
prefix_separator_iterator: PrefixSeparatorIterator
) -> str:
\\"\\"\\"
Separator is for separate variables like dict with explode true, not for array item separation
\\"\\"\\"
named_parameter_expansion = prefix_separator_iterator.separator in {'&', ';'}
var_name_piece = variable_name if named_parameter_expansion else ''
if type(in_data) in {str, float, int}:
return cls.__ref6570_str_float_int_expansion(
variable_name,
in_data,
explode,
percent_encode,
prefix_separator_iterator,
var_name_piece,
named_parameter_expansion
)
elif isinstance(in_data, none_type):
# ignored by the expansion process https://datatracker.ietf.org/doc/html/rfc6570#section-3.2.1
return \\"\\"
elif isinstance(in_data, list):
return cls.__ref6570_list_expansion(
variable_name,
in_data,
explode,
percent_encode,
prefix_separator_iterator,
var_name_piece,
named_parameter_expansion
)
elif isinstance(in_data, dict):
return cls.__ref6570_dict_expansion(
variable_name,
in_data,
explode,
percent_encode,
prefix_separator_iterator,
var_name_piece,
named_parameter_expansion
)
# bool, bytes, etc
raise ApiValueError('Unable to generate a ref6570 representation of {}'.format(in_data))
class StyleFormSerializer(ParameterSerializerBase):
@classmethod
def _get_default_explode(cls, style: ParameterStyle) -> bool:
if style is ParameterStyle.FORM:
return True
return super()._get_default_explode(style)
def _serialize_form(
self,
in_data: typing.Union[None, int, float, str, bool, dict, list],
name: str,
explode: bool,
percent_encode: bool,
prefix_separator_iterator: typing.Optional[PrefixSeparatorIterator] = None
) -> str:
if prefix_separator_iterator is None:
prefix_separator_iterator = PrefixSeparatorIterator('', '&')
return self._ref6570_expansion(
variable_name=name,
in_data=in_data,
explode=explode,
percent_encode=percent_encode,
prefix_separator_iterator=prefix_separator_iterator
)
class StyleSimpleSerializer(ParameterSerializerBase):
def _serialize_simple(
self,
in_data: typing.Union[None, int, float, str, bool, dict, list],
name: str,
explode: bool,
percent_encode: bool
) -> str:
prefix_separator_iterator = PrefixSeparatorIterator('', ',')
return self._ref6570_expansion(
variable_name=name,
in_data=in_data,
explode=explode,
percent_encode=percent_encode,
prefix_separator_iterator=prefix_separator_iterator
)
class JSONDetector:
\\"\\"\\"
Works for:
application/json
application/json; charset=UTF-8
application/json-patch+json
application/geo+json
\\"\\"\\"
__json_content_type_pattern = re.compile(\\"application/[^+]*[+]?(json);?.*\\")
@classmethod
def _content_type_is_json(cls, content_type: str) -> bool:
if cls.__json_content_type_pattern.match(content_type):
return True
return False
@dataclass
class ParameterBase(JSONDetector):
name: str
in_type: ParameterInType
required: bool
style: typing.Optional[ParameterStyle]
explode: typing.Optional[bool]
allow_reserved: typing.Optional[bool]
schema: typing.Optional[typing.Type[Schema]]
content: typing.Optional[typing.Dict[str, typing.Type[Schema]]]
__style_to_in_type = {
ParameterStyle.MATRIX: {ParameterInType.PATH},
ParameterStyle.LABEL: {ParameterInType.PATH},
ParameterStyle.FORM: {ParameterInType.QUERY, ParameterInType.COOKIE},
ParameterStyle.SIMPLE: {ParameterInType.PATH, ParameterInType.HEADER},
ParameterStyle.SPACE_DELIMITED: {ParameterInType.QUERY},
ParameterStyle.PIPE_DELIMITED: {ParameterInType.QUERY},
ParameterStyle.DEEP_OBJECT: {ParameterInType.QUERY},
}
__in_type_to_default_style = {
ParameterInType.QUERY: ParameterStyle.FORM,
ParameterInType.PATH: ParameterStyle.SIMPLE,
ParameterInType.HEADER: ParameterStyle.SIMPLE,
ParameterInType.COOKIE: ParameterStyle.FORM,
}
__disallowed_header_names = {'Accept', 'Content-Type', 'Authorization'}
_json_encoder = JSONEncoder()
@classmethod
def __verify_style_to_in_type(cls, style: typing.Optional[ParameterStyle], in_type: ParameterInType):
if style is None:
return
in_type_set = cls.__style_to_in_type[style]
if in_type not in in_type_set:
raise ValueError(
'Invalid style and in_type combination. For style={} only in_type={} are allowed'.format(
style, in_type_set
)
)
def __init__(
self,
name: str,
in_type: ParameterInType,
required: bool = False,
style: typing.Optional[ParameterStyle] = None,
explode: bool = False,
allow_reserved: typing.Optional[bool] = None,
schema: typing.Optional[typing.Type[Schema]] = None,
content: typing.Optional[typing.Dict[str, typing.Type[Schema]]] = None
):
if schema is None and content is None:
raise ValueError('Value missing; Pass in either schema or content')
if schema and content:
raise ValueError('Too many values provided. Both schema and content were provided. Only one may be input')
if name in self.__disallowed_header_names and in_type is ParameterInType.HEADER:
raise ValueError('Invalid name, name may not be one of {}'.format(self.__disallowed_header_names))
self.__verify_style_to_in_type(style, in_type)
if content is None and style is None:
style = self.__in_type_to_default_style[in_type]
if content is not None and in_type in self.__in_type_to_default_style and len(content) != 1:
raise ValueError('Invalid content length, content length must equal 1')
self.in_type = in_type
self.name = name
self.required = required
self.style = style
self.explode = explode
self.allow_reserved = allow_reserved
self.schema = schema
self.content = content
def _serialize_json(
self,
in_data: typing.Union[None, int, float, str, bool, dict, list],
eliminate_whitespace: bool = False
) -> str:
if eliminate_whitespace:
return json.dumps(in_data, separators=self._json_encoder.compact_separators)
return json.dumps(in_data)
class PathParameter(ParameterBase, StyleSimpleSerializer):
def __init__(
self,
name: str,
required: bool = False,
style: typing.Optional[ParameterStyle] = None,
explode: bool = False,
allow_reserved: typing.Optional[bool] = None,
schema: typing.Optional[typing.Type[Schema]] = None,
content: typing.Optional[typing.Dict[str, typing.Type[Schema]]] = None
):
super().__init__(
name,
in_type=ParameterInType.PATH,
required=required,
style=style,
explode=explode,
allow_reserved=allow_reserved,
schema=schema,
content=content
)
def __serialize_label(
self,
in_data: typing.Union[None, int, float, str, bool, dict, list]
) -> typing.Dict[str, str]:
prefix_separator_iterator = PrefixSeparatorIterator('.', '.')
value = self._ref6570_expansion(
variable_name=self.name,
in_data=in_data,
explode=self.explode,
percent_encode=True,
prefix_separator_iterator=prefix_separator_iterator
)
return self._to_dict(self.name, value)
def __serialize_matrix(
self,
in_data: typing.Union[None, int, float, str, bool, dict, list]
) -> typing.Dict[str, str]:
prefix_separator_iterator = PrefixSeparatorIterator(';', ';')
value = self._ref6570_expansion(
variable_name=self.name,
in_data=in_data,
explode=self.explode,
percent_encode=True,
prefix_separator_iterator=prefix_separator_iterator
)
return self._to_dict(self.name, value)
def __serialize_simple(
self,
in_data: typing.Union[None, int, float, str, bool, dict, list],
) -> typing.Dict[str, str]:
value = self._serialize_simple(
in_data=in_data,
name=self.name,
explode=self.explode,
percent_encode=True
)
return self._to_dict(self.name, value)
def serialize(
self,
in_data: typing.Union[
Schema, Decimal, int, float, str, date, datetime, None, bool, list, tuple, dict, frozendict.frozendict]
) -> typing.Dict[str, str]:
if self.schema:
cast_in_data = self.schema(in_data)
cast_in_data = self._json_encoder.default(cast_in_data)
\\"\\"\\"
simple -> path
path:
returns path_params: dict
label -> path
returns path_params
matrix -> path
returns path_params
\\"\\"\\"
if self.style:
if self.style is ParameterStyle.SIMPLE:
return self.__serialize_simple(cast_in_data)
elif self.style is ParameterStyle.LABEL:
return self.__serialize_label(cast_in_data)
elif self.style is ParameterStyle.MATRIX:
return self.__serialize_matrix(cast_in_data)
# self.content will be length one
for content_type, schema in self.content.items():
cast_in_data = schema(in_data)
cast_in_data = self._json_encoder.default(cast_in_data)
if self._content_type_is_json(content_type):
value = self._serialize_json(cast_in_data)
return self._to_dict(self.name, value)
raise NotImplementedError('Serialization of {} has not yet been implemented'.format(content_type))
class QueryParameter(ParameterBase, StyleFormSerializer):
def __init__(
self,
name: str,
required: bool = False,
style: typing.Optional[ParameterStyle] = None,
explode: typing.Optional[bool] = None,
allow_reserved: typing.Optional[bool] = None,
schema: typing.Optional[typing.Type[Schema]] = None,
content: typing.Optional[typing.Dict[str, typing.Type[Schema]]] = None
):
used_style = ParameterStyle.FORM if style is None else style
used_explode = self._get_default_explode(used_style) if explode is None else explode
super().__init__(
name,
in_type=ParameterInType.QUERY,
required=required,
style=used_style,
explode=used_explode,
allow_reserved=allow_reserved,
schema=schema,
content=content
)
def __serialize_space_delimited(
self,
in_data: typing.Union[None, int, float, str, bool, dict, list],
prefix_separator_iterator: typing.Optional[PrefixSeparatorIterator]
) -> typing.Dict[str, str]:
if prefix_separator_iterator is None:
prefix_separator_iterator = self.get_prefix_separator_iterator()
value = self._ref6570_expansion(
variable_name=self.name,
in_data=in_data,
explode=self.explode,
percent_encode=True,
prefix_separator_iterator=prefix_separator_iterator
)
return self._to_dict(self.name, value)
def __serialize_pipe_delimited(
self,
in_data: typing.Union[None, int, float, str, bool, dict, list],
prefix_separator_iterator: typing.Optional[PrefixSeparatorIterator]
) -> typing.Dict[str, str]:
if prefix_separator_iterator is None:
prefix_separator_iterator = self.get_prefix_separator_iterator()
value = self._ref6570_expansion(
variable_name=self.name,
in_data=in_data,
explode=self.explode,
percent_encode=True,
prefix_separator_iterator=prefix_separator_iterator
)
return self._to_dict(self.name, value)
def __serialize_form(
self,
in_data: typing.Union[None, int, float, str, bool, dict, list],
prefix_separator_iterator: typing.Optional[PrefixSeparatorIterator]
) -> typing.Dict[str, str]:
if prefix_separator_iterator is None:
prefix_separator_iterator = self.get_prefix_separator_iterator()
value = self._serialize_form(
in_data,
name=self.name,
explode=self.explode,
percent_encode=True,
prefix_separator_iterator=prefix_separator_iterator
)
return self._to_dict(self.name, value)
def get_prefix_separator_iterator(self) -> typing.Optional[PrefixSeparatorIterator]:
if self.style is ParameterStyle.FORM:
return PrefixSeparatorIterator('?', '&')
elif self.style is ParameterStyle.SPACE_DELIMITED:
return PrefixSeparatorIterator('', '%20')
elif self.style is ParameterStyle.PIPE_DELIMITED:
return PrefixSeparatorIterator('', '|')
def serialize(
self,
in_data: typing.Union[
Schema, Decimal, int, float, str, date, datetime, None, bool, list, tuple, dict, frozendict.frozendict],
prefix_separator_iterator: typing.Optional[PrefixSeparatorIterator] = None
) -> typing.Dict[str, str]:
if self.schema:
cast_in_data = self.schema(in_data)
cast_in_data = self._json_encoder.default(cast_in_data)
\\"\\"\\"
form -> query
query:
- GET/HEAD/DELETE: could use fields
- PUT/POST: must use urlencode to send parameters
returns fields: tuple
spaceDelimited -> query
returns fields
pipeDelimited -> query
returns fields
deepObject -> query, https://github.com/OAI/OpenAPI-Specification/issues/1706
returns fields
\\"\\"\\"
if self.style:
# TODO update query ones to omit setting values when [] {} or None is input
if self.style is ParameterStyle.FORM:
return self.__serialize_form(cast_in_data, prefix_separator_iterator)
elif self.style is ParameterStyle.SPACE_DELIMITED:
return self.__serialize_space_delimited(cast_in_data, prefix_separator_iterator)
elif self.style is ParameterStyle.PIPE_DELIMITED:
return self.__serialize_pipe_delimited(cast_in_data, prefix_separator_iterator)
# self.content will be length one
if prefix_separator_iterator is None:
prefix_separator_iterator = self.get_prefix_separator_iterator()
for content_type, schema in self.content.items():
cast_in_data = schema(in_data)
cast_in_data = self._json_encoder.default(cast_in_data)
if self._content_type_is_json(content_type):
value = self._serialize_json(cast_in_data, eliminate_whitespace=True)
return self._to_dict(
self.name,
next(prefix_separator_iterator) + self.name + '=' + quote(value)
)
raise NotImplementedError('Serialization of {} has not yet been implemented'.format(content_type))
class CookieParameter(ParameterBase, StyleFormSerializer):
def __init__(
self,
name: str,
required: bool = False,
style: typing.Optional[ParameterStyle] = None,
explode: typing.Optional[bool] = None,
allow_reserved: typing.Optional[bool] = None,
schema: typing.Optional[typing.Type[Schema]] = None,
content: typing.Optional[typing.Dict[str, typing.Type[Schema]]] = None
):
used_style = ParameterStyle.FORM if style is None and content is None and schema else style
used_explode = self._get_default_explode(used_style) if explode is None else explode
super().__init__(
name,
in_type=ParameterInType.COOKIE,
required=required,
style=used_style,
explode=used_explode,
allow_reserved=allow_reserved,
schema=schema,
content=content
)
def serialize(
self,
in_data: typing.Union[
Schema, Decimal, int, float, str, date, datetime, None, bool, list, tuple, dict, frozendict.frozendict]
) -> typing.Dict[str, str]:
if self.schema:
cast_in_data = self.schema(in_data)
cast_in_data = self._json_encoder.default(cast_in_data)
\\"\\"\\"
form -> cookie
returns fields: tuple
\\"\\"\\"
if self.style:
\\"\\"\\"
TODO add escaping of comma, space, equals
or turn encoding on
\\"\\"\\"
value = self._serialize_form(
cast_in_data,
explode=self.explode,
name=self.name,
percent_encode=False,
prefix_separator_iterator=PrefixSeparatorIterator('', '&')
)
return self._to_dict(self.name, value)
# self.content will be length one
for content_type, schema in self.content.items():
cast_in_data = schema(in_data)
cast_in_data = self._json_encoder.default(cast_in_data)
if self._content_type_is_json(content_type):
value = self._serialize_json(cast_in_data)
return self._to_dict(self.name, value)
raise NotImplementedError('Serialization of {} has not yet been implemented'.format(content_type))
class HeaderParameter(ParameterBase, StyleSimpleSerializer):
def __init__(
self,
name: str,
required: bool = False,
style: typing.Optional[ParameterStyle] = None,
explode: bool = False,
allow_reserved: typing.Optional[bool] = None,
schema: typing.Optional[typing.Type[Schema]] = None,
content: typing.Optional[typing.Dict[str, typing.Type[Schema]]] = None
):
super().__init__(
name,
in_type=ParameterInType.HEADER,
required=required,
style=style,
explode=explode,
allow_reserved=allow_reserved,
schema=schema,
content=content
)
@staticmethod
def __to_headers(in_data: typing.Tuple[typing.Tuple[str, str], ...]) -> HTTPHeaderDict:
data = tuple(t for t in in_data if t)
headers = HTTPHeaderDict()
if not data:
return headers
headers.extend(data)
return headers
def serialize(
self,
in_data: typing.Union[
Schema, Decimal, int, float, str, date, datetime, None, bool, list, tuple, dict, frozendict.frozendict]
) -> HTTPHeaderDict:
if self.schema:
cast_in_data = self.schema(in_data)
cast_in_data = self._json_encoder.default(cast_in_data)
\\"\\"\\"
simple -> header
headers: PoolManager needs a mapping, tuple is close
returns headers: dict
\\"\\"\\"
if self.style:
value = self._serialize_simple(cast_in_data, self.name, self.explode, False)
return self.__to_headers(((self.name, value),))
# self.content will be length one
for content_type, schema in self.content.items():
cast_in_data = schema(in_data)
cast_in_data = self._json_encoder.default(cast_in_data)
if self._content_type_is_json(content_type):
value = self._serialize_json(cast_in_data)
return self.__to_headers(((self.name, value),))
raise NotImplementedError('Serialization of {} has not yet been implemented'.format(content_type))
class Encoding:
def __init__(
self,
content_type: str,
headers: typing.Optional[typing.Dict[str, HeaderParameter]] = None,
style: typing.Optional[ParameterStyle] = None,
explode: bool = False,
allow_reserved: bool = False,
):
self.content_type = content_type
self.headers = headers
self.style = style
self.explode = explode
self.allow_reserved = allow_reserved
@dataclass
class MediaType:
\\"\\"\\"
Used to store request and response body schema information
encoding:
A map between a property name and its encoding information.
The key, being the property name, MUST exist in the schema as a property.
The encoding object SHALL only apply to requestBody objects when the media type is
multipart or application/x-www-form-urlencoded.
\\"\\"\\"
schema: typing.Optional[typing.Type[Schema]] = None
encoding: typing.Optional[typing.Dict[str, Encoding]] = None
@dataclass
class ApiResponse:
response: urllib3.HTTPResponse
body: typing.Union[Unset, Schema] = unset
headers: typing.Union[Unset, typing.Dict[str, Schema]] = unset
def __init__(
self,
response: urllib3.HTTPResponse,
body: typing.Union[Unset, Schema] = unset,
headers: typing.Union[Unset, typing.Dict[str, Schema]] = unset
):
\\"\\"\\"
pycharm needs this to prevent 'Unexpected argument' warnings
\\"\\"\\"
self.response = response
self.body = body
self.headers = headers
@dataclass
class ApiResponseWithoutDeserialization(ApiResponse):
response: urllib3.HTTPResponse
body: typing.Union[Unset, typing.Type[Schema]] = unset
headers: typing.Union[Unset, typing.List[HeaderParameter]] = unset
class OpenApiResponse(JSONDetector):
__filename_content_disposition_pattern = re.compile('filename=\\"(.+?)\\"')
def __init__(
self,
response_cls: typing.Type[ApiResponse] = ApiResponse,
content: typing.Optional[typing.Dict[str, MediaType]] = None,
headers: typing.Optional[typing.List[HeaderParameter]] = None,
):
self.headers = headers
if content is not None and len(content) == 0:
raise ValueError('Invalid value for content, the content dict must have >= 1 entry')
self.content = content
self.response_cls = response_cls
@staticmethod
def __deserialize_json(response: urllib3.HTTPResponse) -> typing.Any:
# python must be >= 3.9 so we can pass in bytes into json.loads
return json.loads(response.data)
@staticmethod
def __file_name_from_response_url(response_url: typing.Optional[str]) -> typing.Optional[str]:
if response_url is None:
return None
url_path = urlparse(response_url).path
if url_path:
path_basename = os.path.basename(url_path)
if path_basename:
_filename, ext = os.path.splitext(path_basename)
if ext:
return path_basename
return None
@classmethod
def __file_name_from_content_disposition(cls, content_disposition: typing.Optional[str]) -> typing.Optional[str]:
if content_disposition is None:
return None
match = cls.__filename_content_disposition_pattern.search(content_disposition)
if not match:
return None
return match.group(1)
def __deserialize_application_octet_stream(
self, response: urllib3.HTTPResponse
) -> typing.Union[bytes, io.BufferedReader]:
\\"\\"\\"
urllib3 use cases:
1. when preload_content=True (stream=False) then supports_chunked_reads is False and bytes are returned
2. when preload_content=False (stream=True) then supports_chunked_reads is True and
a file will be written and returned
\\"\\"\\"
if response.supports_chunked_reads():
file_name = (
self.__file_name_from_content_disposition(response.headers.get('content-disposition'))
or self.__file_name_from_response_url(response.geturl())
)
if file_name is None:
_fd, path = tempfile.mkstemp()
else:
path = os.path.join(tempfile.gettempdir(), file_name)
with open(path, 'wb') as new_file:
chunk_size = 1024
while True:
data = response.read(chunk_size)
if not data:
break
new_file.write(data)
# release_conn is needed for streaming connections only
response.release_conn()
new_file = open(path, 'rb')
return new_file
else:
return response.data
@staticmethod
def __deserialize_multipart_form_data(
response: urllib3.HTTPResponse
) -> typing.Dict[str, typing.Any]:
msg = email.message_from_bytes(response.data)
return {
part.get_param(\\"name\\", header=\\"Content-Disposition\\"): part.get_payload(
decode=True
).decode(part.get_content_charset())
if part.get_content_charset()
else part.get_payload()
for part in msg.get_payload()
}
def deserialize(self, response: urllib3.HTTPResponse, configuration: Configuration) -> ApiResponse:
content_type = response.getheader('content-type')
deserialized_body = unset
streamed = response.supports_chunked_reads()
deserialized_headers = unset
if self.headers is not None:
# TODO add header deserialization here
pass
if self.content is not None:
if content_type not in self.content:
raise ApiValueError(
f\\"Invalid content_type returned. Content_type='{content_type}' was returned \\"
f\\"when only {str(set(self.content))} are defined for status_code={str(response.status)}\\"
)
body_schema = self.content[content_type].schema
if body_schema is None:
# some specs do not define response content media type schemas
return self.response_cls(
response=response,
headers=deserialized_headers,
body=unset
)
if self._content_type_is_json(content_type):
body_data = self.__deserialize_json(response)
elif content_type == 'application/octet-stream':
body_data = self.__deserialize_application_octet_stream(response)
elif content_type.startswith('multipart/form-data'):
body_data = self.__deserialize_multipart_form_data(response)
content_type = 'multipart/form-data'
else:
raise NotImplementedError('Deserialization of {} has not yet been implemented'.format(content_type))
deserialized_body = body_schema.from_openapi_data_oapg(
body_data, _configuration=configuration)
elif streamed:
response.release_conn()
return self.response_cls(
response=response,
headers=deserialized_headers,
body=deserialized_body
)
class ApiClient:
\\"\\"\\"Generic API client for OpenAPI client library builds.
OpenAPI generic API client. This client handles the client-
server communication, and is invariant across implementations. Specifics of
the methods and models for each application are generated from the OpenAPI
templates.
NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
:param configuration: .Configuration object for this client
:param header_name: a header to pass when making calls to the API.
:param header_value: a header value to pass when making calls to
the API.
:param cookie: a cookie to include in the header when making calls
to the API
:param pool_threads: The number of threads to use for async requests
to the API. More threads means more concurrent API requests.
\\"\\"\\"
_pool = None
def __init__(
self,
configuration: typing.Optional[Configuration] = None,
header_name: typing.Optional[str] = None,
header_value: typing.Optional[str] = None,
cookie: typing.Optional[str] = None,
pool_threads: int = 1
):
if configuration is None:
configuration = Configuration()
self.configuration = configuration
self.pool_threads = pool_threads
self.rest_client = rest.RESTClientObject(configuration)
self.default_headers = HTTPHeaderDict()
if header_name is not None:
self.default_headers[header_name] = header_value
self.cookie = cookie
# Set default User-Agent.
self.user_agent = 'OpenAPI-Generator/1.0.0/python'
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def close(self):
if self._pool:
self._pool.close()
self._pool.join()
self._pool = None
if hasattr(atexit, 'unregister'):
atexit.unregister(self.close)
@property
def pool(self):
\\"\\"\\"Create thread pool on first request
avoids instantiating unused threadpool for blocking clients.
\\"\\"\\"
if self._pool is None:
atexit.register(self.close)
self._pool = ThreadPool(self.pool_threads)
return self._pool
@property
def user_agent(self):
\\"\\"\\"User agent for this API client\\"\\"\\"
return self.default_headers['User-Agent']
@user_agent.setter
def user_agent(self, value):
self.default_headers['User-Agent'] = value
def set_default_header(self, header_name, header_value):
self.default_headers[header_name] = header_value
def __call_api(
self,
resource_path: str,
method: str,
headers: typing.Optional[HTTPHeaderDict] = None,
body: typing.Optional[typing.Union[str, bytes]] = None,
fields: typing.Optional[typing.Tuple[typing.Tuple[str, str], ...]] = None,
auth_settings: typing.Optional[typing.List[str]] = None,
stream: bool = False,
timeout: typing.Optional[typing.Union[int, typing.Tuple]] = None,
host: typing.Optional[str] = None,
) -> urllib3.HTTPResponse:
# header parameters
used_headers = HTTPHeaderDict(self.default_headers)
if self.cookie:
used_headers['Cookie'] = self.cookie
# auth setting
self.update_params_for_auth(used_headers,
auth_settings, resource_path, method, body)
# must happen after cookie setting and auth setting in case user is overriding those
if headers:
used_headers.update(headers)
# request url
if host is None:
url = self.configuration.host + resource_path
else:
# use server/host defined in path or operation instead
url = host + resource_path
# perform request and return response
response = self.request(
method,
url,
headers=used_headers,
fields=fields,
body=body,
stream=stream,
timeout=timeout,
)
return response
def call_api(
self,
resource_path: str,
method: str,
headers: typing.Optional[HTTPHeaderDict] = None,
body: typing.Optional[typing.Union[str, bytes]] = None,
fields: typing.Optional[typing.Tuple[typing.Tuple[str, str], ...]] = None,
auth_settings: typing.Optional[typing.List[str]] = None,
async_req: typing.Optional[bool] = None,
stream: bool = False,
timeout: typing.Optional[typing.Union[int, typing.Tuple]] = None,
host: typing.Optional[str] = None,
) -> urllib3.HTTPResponse:
\\"\\"\\"Makes the HTTP request (synchronous) and returns deserialized data.
To make an async_req request, set the async_req parameter.
:param resource_path: Path to method endpoint.
:param method: Method to call.
:param headers: Header parameters to be
placed in the request header.
:param body: Request body.
:param fields: Request post form parameters,
for \`application/x-www-form-urlencoded\`, \`multipart/form-data\`.
:param auth_settings: Auth Settings names for the request.
:param async_req: execute request asynchronously
:type async_req: bool, optional TODO remove, unused
:param stream: if True, the urllib3.HTTPResponse object will
be returned without reading/decoding response
data. Also when True, if the openapi spec describes a file download,
the data will be written to a local filesystem file and the BinarySchema
instance will also inherit from FileSchema and FileIO
Default is False.
:type stream: bool, optional
:param timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:param host: api endpoint host
:return:
If async_req parameter is True,
the request will be called asynchronously.
The method will return the request thread.
If parameter async_req is False or missing,
then the method will return the response directly.
\\"\\"\\"
if not async_req:
return self.__call_api(
resource_path,
method,
headers,
body,
fields,
auth_settings,
stream,
timeout,
host,
)
return self.pool.apply_async(
self.__call_api,
(
resource_path,
method,
headers,
body,
fields,
auth_settings,
stream,
timeout,
host,
)
)
def request(
self,
method: str,
url: str,
headers: typing.Optional[HTTPHeaderDict] = None,
fields: typing.Optional[typing.Tuple[typing.Tuple[str, str], ...]] = None,
body: typing.Optional[typing.Union[str, bytes]] = None,
stream: bool = False,
timeout: typing.Optional[typing.Union[int, typing.Tuple]] = None,
) -> urllib3.HTTPResponse:
\\"\\"\\"Makes the HTTP request using RESTClient.\\"\\"\\"
if method == \\"GET\\":
return self.rest_client.GET(url,
stream=stream,
timeout=timeout,
headers=headers)
elif method == \\"HEAD\\":
return self.rest_client.HEAD(url,
stream=stream,
timeout=timeout,
headers=headers)
elif method == \\"OPTIONS\\":
return self.rest_client.OPTIONS(url,
headers=headers,
fields=fields,
stream=stream,
timeout=timeout,
body=body)
elif method == \\"POST\\":
return self.rest_client.POST(url,
headers=headers,
fields=fields,
stream=stream,
timeout=timeout,
body=body)
elif method == \\"PUT\\":
return self.rest_client.PUT(url,
headers=headers,
fields=fields,
stream=stream,
timeout=timeout,
body=body)
elif method == \\"PATCH\\":
return self.rest_client.PATCH(url,
headers=headers,
fields=fields,
stream=stream,
timeout=timeout,
body=body)
elif method == \\"DELETE\\":
return self.rest_client.DELETE(url,
headers=headers,
stream=stream,
timeout=timeout,
body=body)
else:
raise ApiValueError(
\\"http method must be \`GET\`, \`HEAD\`, \`OPTIONS\`,\\"
\\" \`POST\`, \`PATCH\`, \`PUT\` or \`DELETE\`.\\"
)
def update_params_for_auth(self, headers, auth_settings,
resource_path, method, body):
\\"\\"\\"Updates header and query params based on authentication setting.
:param headers: Header parameters dict to be updated.
:param auth_settings: Authentication setting identifiers list.
:param resource_path: A string representation of the HTTP request resource path.
:param method: A string representation of the HTTP request method.
:param body: An object representing the body of the HTTP request.
The object type is the return value of _encoder.default().
\\"\\"\\"
if not auth_settings:
return
for auth in auth_settings:
auth_setting = self.configuration.auth_settings().get(auth)
if not auth_setting:
continue
if auth_setting['in'] == 'cookie':
headers.add('Cookie', auth_setting['value'])
elif auth_setting['in'] == 'header':
if auth_setting['type'] != 'http-signature':
headers.add(auth_setting['key'], auth_setting['value'])
elif auth_setting['in'] == 'query':
\\"\\"\\" TODO implement auth in query
need to pass in prefix_separator_iterator
and need to output resource_path with query params added
\\"\\"\\"
raise ApiValueError(\\"Auth in query not yet implemented\\")
else:
raise ApiValueError(
'Authentication token must be in \`query\` or \`header\`'
)
class Api:
\\"\\"\\"NOTE: This class is auto generated by OpenAPI Generator
Ref: https://openapi-generator.tech
Do not edit the class manually.
\\"\\"\\"
def __init__(self, api_client: typing.Optional[ApiClient] = None):
if api_client is None:
api_client = ApiClient()
self.api_client = api_client
@staticmethod
def _verify_typed_dict_inputs_oapg(cls: typing.Type[typing_extensions.TypedDict], data: typing.Dict[str, typing.Any]):
\\"\\"\\"
Ensures that:
- required keys are present
- additional properties are not input
- values stored under required keys do not have the value unset
Note: detailed value checking is done in schema classes
\\"\\"\\"
missing_required_keys = []
required_keys_with_unset_values = []
for required_key in cls.__required_keys__:
if required_key not in data:
missing_required_keys.append(required_key)
continue
value = data[required_key]
if value is unset:
required_keys_with_unset_values.append(required_key)
if missing_required_keys:
raise ApiTypeError(
'{} missing {} required arguments: {}'.format(
cls.__name__, len(missing_required_keys), missing_required_keys
)
)
if required_keys_with_unset_values:
raise ApiValueError(
'{} contains invalid unset values for {} required keys: {}'.format(
cls.__name__, len(required_keys_with_unset_values), required_keys_with_unset_values
)
)
disallowed_additional_keys = []
for key in data:
if key in cls.__required_keys__ or key in cls.__optional_keys__:
continue
disallowed_additional_keys.append(key)
if disallowed_additional_keys:
raise ApiTypeError(
'{} got {} unexpected keyword arguments: {}'.format(
cls.__name__, len(disallowed_additional_keys), disallowed_additional_keys
)
)
def _get_host_oapg(
self,
operation_id: str,
servers: typing.Tuple[typing.Dict[str, str], ...] = tuple(),
host_index: typing.Optional[int] = None
) -> typing.Optional[str]:
configuration = self.api_client.configuration
try:
if host_index is None:
index = configuration.server_operation_index.get(
operation_id, configuration.server_index
)
else:
index = host_index
server_variables = configuration.server_operation_variables.get(
operation_id, configuration.server_variables
)
host = configuration.get_host_from_settings(
index, variables=server_variables, servers=servers
)
except IndexError:
if servers:
raise ApiValueError(
\\"Invalid host index. Must be 0 <= index < %s\\" %
len(servers)
)
host = None
return host
class SerializedRequestBody(typing_extensions.TypedDict, total=False):
body: typing.Union[str, bytes]
fields: typing.Tuple[typing.Union[RequestField, typing.Tuple[str, str]], ...]
class RequestBody(StyleFormSerializer, JSONDetector):
\\"\\"\\"
A request body parameter
content: content_type to MediaType Schema info
\\"\\"\\"
__json_encoder = JSONEncoder()
def __init__(
self,
content: typing.Dict[str, MediaType],
required: bool = False,
):
self.required = required
if len(content) == 0:
raise ValueError('Invalid value for content, the content dict must have >= 1 entry')
self.content = content
def __serialize_json(
self,
in_data: typing.Any
) -> typing.Dict[str, bytes]:
in_data = self.__json_encoder.default(in_data)
json_str = json.dumps(in_data, separators=(\\",\\", \\":\\"), ensure_ascii=False).encode(
\\"utf-8\\"
)
return dict(body=json_str)
@staticmethod
def __serialize_text_plain(in_data: typing.Any) -> typing.Dict[str, str]:
if isinstance(in_data, frozendict.frozendict):
raise ValueError('Unable to serialize type frozendict.frozendict to text/plain')
elif isinstance(in_data, tuple):
raise ValueError('Unable to serialize type tuple to text/plain')
elif isinstance(in_data, NoneClass):
raise ValueError('Unable to serialize type NoneClass to text/plain')
elif isinstance(in_data, BoolClass):
raise ValueError('Unable to serialize type BoolClass to text/plain')
return dict(body=str(in_data))
def __multipart_json_item(self, key: str, value: Schema) -> RequestField:
json_value = self.__json_encoder.default(value)
request_field = RequestField(name=key, data=json.dumps(json_value))
request_field.make_multipart(content_type='application/json')
return request_field
def __multipart_form_item(self, key: str, value: Schema) -> RequestField:
if isinstance(value, str):
request_field = RequestField(name=key, data=str(value))
request_field.make_multipart(content_type='text/plain')
elif isinstance(value, bytes):
request_field = RequestField(name=key, data=value)
request_field.make_multipart(content_type='application/octet-stream')
elif isinstance(value, FileIO):
# TODO use content.encoding to limit allowed content types if they are present
request_field = RequestField.from_tuples(key, (os.path.basename(value.name), value.read()))
value.close()
else:
request_field = self.__multipart_json_item(key=key, value=value)
return request_field
def __serialize_multipart_form_data(
self, in_data: Schema
) -> typing.Dict[str, typing.Tuple[RequestField, ...]]:
if not isinstance(in_data, frozendict.frozendict):
raise ValueError(f'Unable to serialize {in_data} to multipart/form-data because it is not a dict of data')
\\"\\"\\"
In a multipart/form-data request body, each schema property, or each element of a schema array property,
takes a section in the payload with an internal header as defined by RFC7578. The serialization strategy
for each property of a multipart/form-data request body can be specified in an associated Encoding Object.
When passing in multipart types, boundaries MAY be used to separate sections of the content being
transferred – thus, the following default Content-Types are defined for multipart:
If the (object) property is a primitive, or an array of primitive values, the default Content-Type is text/plain
If the property is complex, or an array of complex values, the default Content-Type is application/json
Question: how is the array of primitives encoded?
If the property is a type: string with a contentEncoding, the default Content-Type is application/octet-stream
\\"\\"\\"
fields = []
for key, value in in_data.items():
if isinstance(value, tuple):
if value:
# values use explode = True, so the code makes a RequestField for each item with name=key
for item in value:
request_field = self.__multipart_form_item(key=key, value=item)
fields.append(request_field)
else:
# send an empty array as json because exploding will not send it
request_field = self.__multipart_json_item(key=key, value=value)
fields.append(request_field)
else:
request_field = self.__multipart_form_item(key=key, value=value)
fields.append(request_field)
return dict(fields=tuple(fields))
def __serialize_application_octet_stream(self, in_data: BinarySchema) -> typing.Dict[str, bytes]:
if isinstance(in_data, bytes):
return dict(body=in_data)
# FileIO type
result = dict(body=in_data.read())
in_data.close()
return result
def __serialize_application_x_www_form_data(
self, in_data: typing.Any
) -> SerializedRequestBody:
\\"\\"\\"
POST submission of form data in body
\\"\\"\\"
if not isinstance(in_data, frozendict.frozendict):
raise ValueError(
f'Unable to serialize {in_data} to application/x-www-form-urlencoded because it is not a dict of data')
cast_in_data = self.__json_encoder.default(in_data)
value = self._serialize_form(cast_in_data, name='', explode=True, percent_encode=True)
return dict(body=value)
def serialize(
self, in_data: typing.Any, content_type: str
) -> SerializedRequestBody:
\\"\\"\\"
If a str is returned then the result will be assigned to data when making the request
If a tuple is returned then the result will be used as fields input in encode_multipart_formdata
The key of the returned dict is
- body for application/json
- encode_multipart and fields for multipart/form-data
\\"\\"\\"
media_type = self.content[content_type]
if isinstance(in_data, media_type.schema):
cast_in_data = in_data
elif isinstance(in_data, (dict, frozendict.frozendict)) and in_data:
cast_in_data = media_type.schema(**in_data)
else:
cast_in_data = media_type.schema(in_data)
# TODO check for and use encoding if it exists
# and content_type is multipart or application/x-www-form-urlencoded
if self._content_type_is_json(content_type):
return self.__serialize_json(cast_in_data)
elif content_type == 'text/plain':
return self.__serialize_text_plain(cast_in_data)
elif content_type == 'multipart/form-data':
return self.__serialize_multipart_form_data(cast_in_data)
elif content_type == 'application/x-www-form-urlencoded':
return self.__serialize_application_x_www_form_data(cast_in_data)
elif content_type == 'application/octet-stream':
return self.__serialize_application_octet_stream(cast_in_data)
raise NotImplementedError('Serialization has not yet been implemented for {}'.format(content_type))",
"generated/python/my_api_python/apis/__init__.py": "# do not import all endpoints into this module because that uses a lot of memory and stack frames
# if you need the ability to import all endpoints then import them from
# tags, paths, or path_to_api, or tag_to_api",
"generated/python/my_api_python/apis/path_to_api.py": "import typing_extensions
from my_api_python.paths import PathValues
from my_api_python.apis.paths.hello import Hello
PathToApi = typing_extensions.TypedDict(
'PathToApi',
{
PathValues.HELLO: Hello,
}
)
path_to_api = PathToApi(
{
PathValues.HELLO: Hello,
}
)
",
"generated/python/my_api_python/apis/paths/__init__.py": "# do not import all endpoints into this module because that uses a lot of memory and stack frames
# if you need the ability to import all endpoints from this module, import them with
# from my_api_python.apis.path_to_api import path_to_api
",
"generated/python/my_api_python/apis/paths/hello.py": "from my_api_python.paths.hello.get import ApiForget
class Hello(
ApiForget,
):
pass
",
"generated/python/my_api_python/apis/tag_to_api.py": "import typing_extensions
from my_api_python.apis.tags import TagValues
from my_api_python.apis.tags.default_api import DefaultApi
TagToApi = typing_extensions.TypedDict(
'TagToApi',
{
TagValues.DEFAULT: DefaultApi,
}
)
tag_to_api = TagToApi(
{
TagValues.DEFAULT: DefaultApi,
}
)
",
"generated/python/my_api_python/apis/tags/__init__.py": "# do not import all endpoints into this module because that uses a lot of memory and stack frames
# if you need the ability to import all endpoints from this module, import them with
# from my_api_python.apis.tag_to_api import tag_to_api
import enum
class TagValues(str, enum.Enum):
DEFAULT = \\"default\\"
",
"generated/python/my_api_python/apis/tags/default_api.py": "# coding: utf-8
\\"\\"\\"
Example API
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) # noqa: E501
The version of the OpenAPI document: 1.0.0
Generated by: https://openapi-generator.tech
\\"\\"\\"
from my_api_python.paths.hello.get import SayHello
class DefaultApi(
SayHello,
):
\\"\\"\\"NOTE: This class is auto generated by OpenAPI Generator
Ref: https://openapi-generator.tech
Do not edit the class manually.
\\"\\"\\"
pass
",
"generated/python/my_api_python/apis/tags/default_api_operation_config.py": "import urllib.parse
import json
from typing import Callable, Any, Dict, List, NamedTuple, TypeVar, Generic, Union, TypedDict, Protocol, Optional, Literal
from functools import wraps
from dataclasses import dataclass, fields
from my_api_python.model.api_error_response_content import ApiErrorResponseContent
from my_api_python.model.say_hello_response_content import SayHelloResponseContent
from my_api_python.schemas import (
date,
datetime,
file_type,
none_type,
)
from my_api_python.api_client import JSONEncoder
T = TypeVar('T')
# Generic type for object keyed by operation names
@dataclass
class OperationConfig(Generic[T]):
say_hello: T
...
# Look up path and http method for a given operation name
OperationLookup = {
\\"say_hello\\": {
\\"path\\": \\"/hello\\",
\\"method\\": \\"get\\",
},
}
class Operations:
@staticmethod
def all(value: T) -> OperationConfig[T]:
\\"\\"\\"
Returns an OperationConfig with the same value for every operation
\\"\\"\\"
return OperationConfig(**{ operation_id: value for operation_id, _ in OperationLookup.items() })
def uri_decode(value):
\\"\\"\\"
URI decode a value or list of values
\\"\\"\\"
if isinstance(value, list):
return [urllib.parse.unquote(v) for v in value]
return urllib.parse.unquote(value)
def decode_request_parameters(parameters):
\\"\\"\\"
URI decode api request parameters (path, query or multi-value query)
\\"\\"\\"
return { key: uri_decode(parameters[key]) if parameters[key] is not None else parameters[key] for key in parameters.keys() }
def parse_body(body, content_types, model):
\\"\\"\\"
Parse the body of an api request into the given model if present
\\"\\"\\"
if len([c for c in content_types if c != 'application/json']) == 0:
body = json.loads(body or '{}')
if model != Any:
body = model(**body)
return body
RequestParameters = TypeVar('RequestParameters')
RequestArrayParameters = TypeVar('RequestArrayParameters')
RequestBody = TypeVar('RequestBody')
ResponseBody = TypeVar('ResponseBody')
StatusCode = TypeVar('StatusCode')
@dataclass
class ApiRequest(Generic[RequestParameters, RequestArrayParameters, RequestBody]):
request_parameters: RequestParameters
request_array_parameters: RequestArrayParameters
body: RequestBody
event: Any
context: Any
interceptor_context: Dict[str, Any]
@dataclass
class ChainedApiRequest(ApiRequest[RequestParameters, RequestArrayParameters, RequestBody],
Generic[RequestParameters, RequestArrayParameters, RequestBody]):
chain: 'HandlerChain'
@dataclass
class ApiResponse(Generic[StatusCode, ResponseBody]):
status_code: StatusCode
headers: Dict[str, str]
body: ResponseBody
class HandlerChain(Generic[RequestParameters, RequestArrayParameters, RequestBody, StatusCode, ResponseBody]):
def next(self, request: ChainedApiRequest[RequestParameters, RequestArrayParameters, RequestBody]) -> ApiResponse[StatusCode, ResponseBody]:
raise Exception(\\"Not implemented!\\")
def _build_handler_chain(_interceptors, handler) -> HandlerChain:
if len(_interceptors) == 0:
class BaseHandlerChain(HandlerChain[RequestParameters, RequestArrayParameters, RequestBody, StatusCode, ResponseBody]):
def next(self, request: ApiRequest[RequestParameters, RequestArrayParameters, RequestBody]) -> ApiResponse[StatusCode, ResponseBody]:
return handler(request)
return BaseHandlerChain()
else:
interceptor = _interceptors[0]
class RemainingHandlerChain(HandlerChain[RequestParameters, RequestArrayParameters, RequestBody, StatusCode, ResponseBody]):
def next(self, request: ChainedApiRequest[RequestParameters, RequestArrayParameters, RequestBody]) -> ApiResponse[StatusCode, ResponseBody]:
return interceptor(ChainedApiRequest(
request_parameters = request.request_parameters,
request_array_parameters = request.request_array_parameters,
body = request.body,
event = request.event,
context = request.context,
interceptor_context = request.interceptor_context,
chain = _build_handler_chain(_interceptors[1:len(_interceptors)], handler),
))
return RemainingHandlerChain()
# Request parameters are single value query params or path params
class SayHelloRequestParameters(TypedDict):
name: str
...
# Request array parameters are multi-value query params
class SayHelloRequestArrayParameters(TypedDict):
...
# Request body type (default to Any when no body parameters exist, or leave unchanged as str if it's a primitive type)
SayHelloRequestBody = Any
SayHello200OperationResponse = ApiResponse[Literal[200], SayHelloResponseContent]
SayHello400OperationResponse = ApiResponse[Literal[400], ApiErrorResponseContent]
SayHelloOperationResponses = Union[SayHello200OperationResponse, SayHello400OperationResponse, ]
# Request type for say_hello
SayHelloRequest = ApiRequest[SayHelloRequestParameters, SayHelloRequestArrayParameters, SayHelloRequestBody]
SayHelloChainedRequest = ChainedApiRequest[SayHelloRequestParameters, SayHelloRequestArrayParameters, SayHelloRequestBody]
class SayHelloHandlerFunction(Protocol):
def __call__(self, input: SayHelloRequest, **kwargs) -> SayHelloOperationResponses:
...
SayHelloInterceptor = Callable[[SayHelloChainedRequest], SayHelloOperationResponses]
def say_hello_handler(_handler: SayHelloHandlerFunction = None, interceptors: List[SayHelloInterceptor] = []):
\\"\\"\\"
Decorator for an api handler for the say_hello operation, providing a typed interface for inputs and outputs
\\"\\"\\"
def _handler_wrapper(handler: SayHelloHandlerFunction):
@wraps(handler)
def wrapper(event, context, additional_interceptors = [], **kwargs):
request_parameters = decode_request_parameters({
**(event['pathParameters'] or {}),
**(event['queryStringParameters'] or {}),
})
request_array_parameters = decode_request_parameters({
**(event['multiValueQueryStringParameters'] or {}),
})
body = {}
interceptor_context = {}
chain = _build_handler_chain(additional_interceptors + interceptors, handler)
response = chain.next(ApiRequest(
request_parameters,
request_array_parameters,
body,
event,
context,
interceptor_context,
), **kwargs)
response_body = ''
if response.body is None:
pass
elif response.status_code == 200:
response_body = json.dumps(JSONEncoder().default(response.body))
elif response.status_code == 400:
response_body = json.dumps(JSONEncoder().default(response.body))
return {
'statusCode': response.status_code,
'headers': response.headers,
'body': response_body,
}
return wrapper
# Support use as a decorator with no arguments, or with interceptor arguments
if callable(_handler):
return _handler_wrapper(_handler)
elif _handler is None:
return _handler_wrapper
else:
raise Exception(\\"Positional arguments are not supported by say_hello_handler.\\")
Interceptor = Callable[[ChainedApiRequest[RequestParameters, RequestArrayParameters, RequestBody]], ApiResponse[StatusCode, ResponseBody]]
def concat_method_and_path(method: str, path: str):
return \\"{}||{}\\".format(method.lower(), path)
OperationIdByMethodAndPath = { concat_method_and_path(method_and_path[\\"method\\"], method_and_path[\\"path\\"]): operation for operation, method_and_path in OperationLookup.items() }
@dataclass
class HandlerRouterHandlers:
say_hello: Callable[[Dict, Any], Dict]
def handler_router(handlers: HandlerRouterHandlers, interceptors: List[Interceptor] = []):
\\"\\"\\"
Returns a lambda handler which can be used to route requests to the appropriate typed lambda handler function.
\\"\\"\\"
_handlers = { field.name: getattr(handlers, field.name) for field in fields(handlers) }
def handler_wrapper(event, context):
operation_id = OperationIdByMethodAndPath[concat_method_and_path(event['requestContext']['httpMethod'], event['requestContext']['resourcePath'])]
handler = _handlers[operation_id]
return handler(event, context, additional_interceptors=interceptors)
return handler_wrapper
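# Illustrative sketch (not part of the generated output): routing every
# operation through a single lambda entry point, given a handler decorated
# with say_hello_handler as above:
#   handler = handler_router(HandlerRouterHandlers(say_hello=say_hello))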
",
"generated/python/my_api_python/configuration.py": "# coding: utf-8
\\"\\"\\"
Example API
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) # noqa: E501
The version of the OpenAPI document: 1.0.0
Generated by: https://openapi-generator.tech
\\"\\"\\"
import copy
import logging
import multiprocessing
import sys
import urllib3
from http import client as http_client
from my_api_python.exceptions import ApiValueError
JSON_SCHEMA_VALIDATION_KEYWORDS = {
'multipleOf', 'maximum', 'exclusiveMaximum',
'minimum', 'exclusiveMinimum', 'maxLength',
'minLength', 'pattern', 'maxItems', 'minItems',
'uniqueItems', 'maxProperties', 'minProperties',
}
class Configuration(object):
\\"\\"\\"NOTE: This class is auto generated by OpenAPI Generator
Ref: https://openapi-generator.tech
Do not edit the class manually.
:param host: Base url
:param api_key: Dict to store API key(s).
Each entry in the dict specifies an API key.
The dict key is the name of the security scheme in the OAS specification.
The dict value is the API key secret.
:param api_key_prefix: Dict to store API prefix (e.g. Bearer)
The dict key is the name of the security scheme in the OAS specification.
The dict value is an API key prefix when generating the auth data.
:param username: Username for HTTP basic authentication
:param password: Password for HTTP basic authentication
:param discard_unknown_keys: Boolean value indicating whether to discard
unknown properties. A server may send a response that includes additional
properties that are not known by the client in the following scenarios:
1. The OpenAPI document is incomplete, i.e. it does not match the server
implementation.
2. The client was generated using an older version of the OpenAPI document
and the server has been upgraded since then.
If a schema in the OpenAPI document defines the additionalProperties attribute,
then all undeclared properties received by the server are injected into the
additional properties map. In that case, there are undeclared properties, and
nothing to discard.
:param disabled_client_side_validations (string): Comma-separated list of
JSON schema validation keywords to disable JSON schema structural validation
rules. The following keywords may be specified: multipleOf, maximum,
exclusiveMaximum, minimum, exclusiveMinimum, maxLength, minLength, pattern,
maxItems, minItems.
By default, the validation is performed for data generated locally by the client
and data received from the server, independent of any validation performed by
the server side. If the input data does not satisfy the JSON schema validation
rules specified in the OpenAPI document, an exception is raised.
If disabled_client_side_validations is set, structural validation is
disabled. This can be useful to troubleshoot data validation problems, such as
when the OpenAPI document validation rules do not match the actual API data
received by the server.
:param server_index: Index to servers configuration.
:param server_variables: Mapping with string values to replace variables in
templated server configuration. Enum validation is performed beforehand for
variables that define enum values.
:param server_operation_index: Mapping from operation ID to an index to server
configuration.
:param server_operation_variables: Mapping from operation ID to a mapping with
string values to replace variables in templated server configuration.
Enum validation is performed beforehand for variables that define enum values.
\\"\\"\\"
_default = None
def __init__(
self,
host=None,
discard_unknown_keys=False,
disabled_client_side_validations=\\"\\",
server_index=None,
server_variables=None,
server_operation_index=None,
server_operation_variables=None,
):
\\"\\"\\"Constructor
\\"\\"\\"
self._base_path = \\"http://localhost\\" if host is None else host
\\"\\"\\"Default Base url
\\"\\"\\"
self.server_index = 0 if server_index is None and host is None else server_index
self.server_operation_index = server_operation_index or {}
\\"\\"\\"Default server index
\\"\\"\\"
self.server_variables = server_variables or {}
self.server_operation_variables = server_operation_variables or {}
\\"\\"\\"Default server variables
\\"\\"\\"
self.temp_folder_path = None
\\"\\"\\"Temp file folder for downloading files
\\"\\"\\"
# Authentication Settings
self.disabled_client_side_validations = disabled_client_side_validations
self.logger = {}
\\"\\"\\"Logging Settings
\\"\\"\\"
self.logger[\\"package_logger\\"] = logging.getLogger(\\"my_api_python\\")
self.logger[\\"urllib3_logger\\"] = logging.getLogger(\\"urllib3\\")
self.logger_format = '%(asctime)s %(levelname)s %(message)s'
\\"\\"\\"Log format
\\"\\"\\"
self.logger_stream_handler = None
\\"\\"\\"Log stream handler
\\"\\"\\"
self.logger_file_handler = None
\\"\\"\\"Log file handler
\\"\\"\\"
self.logger_file = None
\\"\\"\\"Debug file location
\\"\\"\\"
self.debug = False
\\"\\"\\"Debug switch
\\"\\"\\"
self.verify_ssl = True
\\"\\"\\"SSL/TLS verification
Set this to false to skip verifying SSL certificate when calling API
from https server.
\\"\\"\\"
self.ssl_ca_cert = None
\\"\\"\\"Set this to customize the certificate file to verify the peer.
\\"\\"\\"
self.cert_file = None
\\"\\"\\"client certificate file
\\"\\"\\"
self.key_file = None
\\"\\"\\"client key file
\\"\\"\\"
self.assert_hostname = None
\\"\\"\\"Set this to True/False to enable/disable SSL hostname verification.
\\"\\"\\"
self.connection_pool_maxsize = multiprocessing.cpu_count() * 5
\\"\\"\\"urllib3 connection pool's maximum number of connections saved
per pool. urllib3 uses 1 connection as default value, but this is
not the best value when you are making a lot of possibly parallel
requests to the same host, which is often the case here.
cpu_count * 5 is used as default value to increase performance.
\\"\\"\\"
self.proxy = None
\\"\\"\\"Proxy URL
\\"\\"\\"
self.proxy_headers = None
\\"\\"\\"Proxy headers
\\"\\"\\"
self.safe_chars_for_path_param = ''
\\"\\"\\"Safe chars for path_param
\\"\\"\\"
self.retries = None
\\"\\"\\"Adding retries to override urllib3 default value 3
\\"\\"\\"
# Enable client side validation
self.client_side_validation = True
# Options to pass down to the underlying urllib3 socket
self.socket_options = None
def __deepcopy__(self, memo):
cls = self.__class__
result = cls.__new__(cls)
memo[id(self)] = result
for k, v in self.__dict__.items():
if k not in ('logger', 'logger_file_handler'):
setattr(result, k, copy.deepcopy(v, memo))
# shallow copy of loggers
result.logger = copy.copy(self.logger)
# use setters to configure loggers
result.logger_file = self.logger_file
result.debug = self.debug
return result
def __setattr__(self, name, value):
object.__setattr__(self, name, value)
if name == 'disabled_client_side_validations':
s = set(filter(None, value.split(',')))
for v in s:
if v not in JSON_SCHEMA_VALIDATION_KEYWORDS:
raise ApiValueError(
\\"Invalid keyword: '{0}''\\".format(v))
self._disabled_client_side_validations = s
@classmethod
def set_default(cls, default):
\\"\\"\\"Set default instance of configuration.
It stores default configuration, which can be
returned by get_default_copy method.
:param default: object of Configuration
\\"\\"\\"
cls._default = copy.deepcopy(default)
@classmethod
def get_default_copy(cls):
\\"\\"\\"Return new instance of configuration.
This method returns a newly created Configuration object (built with the
default constructor), or a copy of the default configuration previously
passed to the set_default method.
:return: The configuration object.
\\"\\"\\"
if cls._default is not None:
return copy.deepcopy(cls._default)
return Configuration()
@property
def logger_file(self):
\\"\\"\\"The logger file.
If the logger_file is None, then add stream handler and remove file
handler. Otherwise, add file handler and remove stream handler.
:param value: The logger_file path.
:type: str
\\"\\"\\"
return self.__logger_file
@logger_file.setter
def logger_file(self, value):
\\"\\"\\"The logger file.
If the logger_file is None, then add stream handler and remove file
handler. Otherwise, add file handler and remove stream handler.
:param value: The logger_file path.
:type: str
\\"\\"\\"
self.__logger_file = value
if self.__logger_file:
# If a logging file is set,
# then add file handler and remove stream handler.
self.logger_file_handler = logging.FileHandler(self.__logger_file)
self.logger_file_handler.setFormatter(self.logger_formatter)
for _, logger in self.logger.items():
logger.addHandler(self.logger_file_handler)
@property
def debug(self):
\\"\\"\\"Debug status
:param value: The debug status, True or False.
:type: bool
\\"\\"\\"
return self.__debug
@debug.setter
def debug(self, value):
\\"\\"\\"Debug status
:param value: The debug status, True or False.
:type: bool
\\"\\"\\"
self.__debug = value
if self.__debug:
# if debug status is True, turn on debug logging
for _, logger in self.logger.items():
logger.setLevel(logging.DEBUG)
# turn on http_client debug
http_client.HTTPConnection.debuglevel = 1
else:
# if debug status is False, turn off debug logging,
# setting log level to default \`logging.WARNING\`
for _, logger in self.logger.items():
logger.setLevel(logging.WARNING)
# turn off http_client debug
http_client.HTTPConnection.debuglevel = 0
@property
def logger_format(self):
\\"\\"\\"The logger format.
The logger_formatter will be updated when logger_format is set.
:param value: The format string.
:type: str
\\"\\"\\"
return self.__logger_format
@logger_format.setter
def logger_format(self, value):
\\"\\"\\"The logger format.
The logger_formatter will be updated when logger_format is set.
:param value: The format string.
:type: str
\\"\\"\\"
self.__logger_format = value
self.logger_formatter = logging.Formatter(self.__logger_format)
def get_api_key_with_prefix(self, identifier, alias=None):
\\"\\"\\"Gets API key (with prefix if set).
:param identifier: The identifier of apiKey.
:param alias: The alternative identifier of apiKey.
:return: The token for api key authentication.
\\"\\"\\"
if self.refresh_api_key_hook is not None:
self.refresh_api_key_hook(self)
key = self.api_key.get(identifier, self.api_key.get(alias) if alias is not None else None)
if key:
prefix = self.api_key_prefix.get(identifier)
if prefix:
return \\"%s %s\\" % (prefix, key)
else:
return key
def get_basic_auth_token(self):
\\"\\"\\"Gets HTTP basic authentication header (string).
:return: The token for basic HTTP authentication.
\\"\\"\\"
username = \\"\\"
if self.username is not None:
username = self.username
password = \\"\\"
if self.password is not None:
password = self.password
return urllib3.util.make_headers(
basic_auth=username + ':' + password
).get('authorization')
def auth_settings(self):
\\"\\"\\"Gets Auth Settings dict for api client.
:return: The Auth Settings information dict.
\\"\\"\\"
auth = {}
return auth
def to_debug_report(self):
\\"\\"\\"Gets the essential information for debugging.
:return: The report for debugging.
\\"\\"\\"
return \\"Python SDK Debug Report:\\\\n\\"\\\\
\\"OS: {env}\\\\n\\"\\\\
\\"Python Version: {pyversion}\\\\n\\"\\\\
\\"Version of the API: 1.0.0\\\\n\\"\\\\
\\"SDK Package Version: 1.0.0\\".\\\\
format(env=sys.platform, pyversion=sys.version)
def get_host_settings(self):
\\"\\"\\"Gets an array of host settings
:return: An array of host settings
\\"\\"\\"
return [
{
'url': \\"\\",
'description': \\"No description provided\\",
}
]
def get_host_from_settings(self, index, variables=None, servers=None):
\\"\\"\\"Gets host URL based on the index and variables
:param index: array index of the host settings
:param variables: hash of variable and the corresponding value
:param servers: an array of host settings or None
:return: URL based on host settings
\\"\\"\\"
if index is None:
return self._base_path
variables = {} if variables is None else variables
servers = self.get_host_settings() if servers is None else servers
try:
server = servers[index]
except IndexError:
raise ValueError(
\\"Invalid index {0} when selecting the host settings. \\"
\\"Must be less than {1}\\".format(index, len(servers)))
url = server['url']
# go through variables and replace placeholders
for variable_name, variable in server.get('variables', {}).items():
used_value = variables.get(
variable_name, variable['default_value'])
if 'enum_values' in variable \\\\
and used_value not in variable['enum_values']:
raise ValueError(
\\"The variable \`{0}\` in the host URL has invalid value \\"
\\"{1}. Must be {2}.\\".format(
variable_name, used_value,
variable['enum_values']))
url = url.replace(\\"{\\" + variable_name + \\"}\\", used_value)
return url
@property
def host(self):
\\"\\"\\"Return generated host.\\"\\"\\"
return self.get_host_from_settings(self.server_index, variables=self.server_variables)
@host.setter
def host(self, value):
\\"\\"\\"Fix base path.\\"\\"\\"
self._base_path = value
self.server_index = None
",
"generated/python/my_api_python/exceptions.py": "# coding: utf-8
\\"\\"\\"
Example API
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) # noqa: E501
The version of the OpenAPI document: 1.0.0
Generated by: https://openapi-generator.tech
\\"\\"\\"
import dataclasses
import typing
from urllib3._collections import HTTPHeaderDict
class OpenApiException(Exception):
\\"\\"\\"The base exception class for all OpenAPIExceptions\\"\\"\\"
class ApiTypeError(OpenApiException, TypeError):
def __init__(self, msg, path_to_item=None, valid_classes=None,
key_type=None):
\\"\\"\\" Raises an exception for TypeErrors
Args:
msg (str): the exception message
Keyword Args:
path_to_item (list): a list of keys and indices to get to the
current_item
None if unset
valid_classes (tuple): the primitive classes that current item
should be an instance of
None if unset
key_type (bool): False if our value is a value in a dict
True if it is a key in a dict
False if our item is an item in a list
None if unset
\\"\\"\\"
self.path_to_item = path_to_item
self.valid_classes = valid_classes
self.key_type = key_type
full_msg = msg
if path_to_item:
full_msg = \\"{0} at {1}\\".format(msg, render_path(path_to_item))
super(ApiTypeError, self).__init__(full_msg)
class ApiValueError(OpenApiException, ValueError):
def __init__(self, msg, path_to_item=None):
\\"\\"\\"
Args:
msg (str): the exception message
Keyword Args:
path_to_item (list) the path to the exception in the
received_data dict. None if unset
\\"\\"\\"
self.path_to_item = path_to_item
full_msg = msg
if path_to_item:
full_msg = \\"{0} at {1}\\".format(msg, render_path(path_to_item))
super(ApiValueError, self).__init__(full_msg)
class ApiAttributeError(OpenApiException, AttributeError):
def __init__(self, msg, path_to_item=None):
\\"\\"\\"
Raised when an attribute reference or assignment fails.
Args:
msg (str): the exception message
Keyword Args:
path_to_item (None/list) the path to the exception in the
received_data dict
\\"\\"\\"
self.path_to_item = path_to_item
full_msg = msg
if path_to_item:
full_msg = \\"{0} at {1}\\".format(msg, render_path(path_to_item))
super(ApiAttributeError, self).__init__(full_msg)
class ApiKeyError(OpenApiException, KeyError):
def __init__(self, msg, path_to_item=None):
\\"\\"\\"
Args:
msg (str): the exception message
Keyword Args:
path_to_item (None/list) the path to the exception in the
received_data dict
\\"\\"\\"
self.path_to_item = path_to_item
full_msg = msg
if path_to_item:
full_msg = \\"{0} at {1}\\".format(msg, render_path(path_to_item))
super(ApiKeyError, self).__init__(full_msg)
T = typing.TypeVar(\\"T\\")
@dataclasses.dataclass
class ApiException(OpenApiException, typing.Generic[T]):
status: int
reason: str
api_response: typing.Optional[T] = None
@property
def body(self) -> typing.Union[str, bytes, None]:
if not self.api_response:
return None
return self.api_response.response.data
@property
def headers(self) -> typing.Optional[HTTPHeaderDict]:
if not self.api_response:
return None
return self.api_response.response.getheaders()
def __str__(self):
\\"\\"\\"Custom error messages for exception\\"\\"\\"
error_message = \\"({0})\\\\n\\"\\\\
\\"Reason: {1}\\\\n\\".format(self.status, self.reason)
if self.headers:
error_message += \\"HTTP response headers: {0}\\\\n\\".format(
self.headers)
if self.body:
error_message += \\"HTTP response body: {0}\\\\n\\".format(self.body)
return error_message
def render_path(path_to_item):
\\"\\"\\"Returns a string representation of a path\\"\\"\\"
result = \\"\\"
for pth in path_to_item:
if isinstance(pth, int):
result += \\"[{0}]\\".format(pth)
else:
result += \\"['{0}']\\".format(pth)
return result
",
"generated/python/my_api_python/model/__init__.py": "# we can not import model classes here because that would create a circular
# reference which would not work in python2
# do not import all models into this module because that uses a lot of memory and stack frames
# if you need the ability to import all models from one package, import them with
# from my_api_python.models import ModelA, ModelB
",
"generated/python/my_api_python/model/api_error_response_content.py": "# coding: utf-8
\\"\\"\\"
Example API
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) # noqa: E501
The version of the OpenAPI document: 1.0.0
Generated by: https://openapi-generator.tech
\\"\\"\\"
from datetime import date, datetime # noqa: F401
import decimal # noqa: F401
import functools # noqa: F401
import io # noqa: F401
import re # noqa: F401
import typing # noqa: F401
import typing_extensions # noqa: F401
import uuid # noqa: F401
import frozendict # noqa: F401
from my_api_python import schemas # noqa: F401
class ApiErrorResponseContent(
schemas.DictSchema
):
\\"\\"\\"NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
\\"\\"\\"
class MetaOapg:
required = {
\\"errorMessage\\",
}
class properties:
errorMessage = schemas.StrSchema
__annotations__ = {
\\"errorMessage\\": errorMessage,
}
errorMessage: MetaOapg.properties.errorMessage
@typing.overload
def __getitem__(self, name: typing_extensions.Literal[\\"errorMessage\\"]) -> MetaOapg.properties.errorMessage: ...
@typing.overload
def __getitem__(self, name: str) -> schemas.UnsetAnyTypeSchema: ...
def __getitem__(self, name: typing.Union[typing_extensions.Literal[\\"errorMessage\\", ], str]):
# dict_instance[name] accessor
return super().__getitem__(name)
@typing.overload
def get_item_oapg(self, name: typing_extensions.Literal[\\"errorMessage\\"]) -> MetaOapg.properties.errorMessage: ...
@typing.overload
def get_item_oapg(self, name: str) -> typing.Union[schemas.UnsetAnyTypeSchema, schemas.Unset]: ...
def get_item_oapg(self, name: typing.Union[typing_extensions.Literal[\\"errorMessage\\", ], str]):
return super().get_item_oapg(name)
def __new__(
cls,
*_args: typing.Union[dict, frozendict.frozendict, ],
errorMessage: typing.Union[MetaOapg.properties.errorMessage, str, ],
_configuration: typing.Optional[schemas.Configuration] = None,
**kwargs: typing.Union[schemas.AnyTypeSchema, dict, frozendict.frozendict, str, date, datetime, uuid.UUID, int, float, decimal.Decimal, None, list, tuple, bytes],
) -> 'ApiErrorResponseContent':
return super().__new__(
cls,
*_args,
errorMessage=errorMessage,
_configuration=_configuration,
**kwargs,
)
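# illustrative usage sketch (comment only, not generated code):
# error = ApiErrorResponseContent(errorMessage='Name must not be empty')
# assert error['errorMessage'] == 'Name must not be empty'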
",
"generated/python/my_api_python/model/api_error_response_content.pyi": "# coding: utf-8
\\"\\"\\"
Example API
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) # noqa: E501
The version of the OpenAPI document: 1.0.0
Generated by: https://openapi-generator.tech
\\"\\"\\"
from datetime import date, datetime # noqa: F401
import decimal # noqa: F401
import functools # noqa: F401
import io # noqa: F401
import re # noqa: F401
import typing # noqa: F401
import typing_extensions # noqa: F401
import uuid # noqa: F401
import frozendict # noqa: F401
from my_api_python import schemas # noqa: F401
class ApiErrorResponseContent(
schemas.DictSchema
):
\\"\\"\\"NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
\\"\\"\\"
class MetaOapg:
required = {
\\"errorMessage\\",
}
class properties:
errorMessage = schemas.StrSchema
__annotations__ = {
\\"errorMessage\\": errorMessage,
}
errorMessage: MetaOapg.properties.errorMessage
@typing.overload
def __getitem__(self, name: typing_extensions.Literal[\\"errorMessage\\"]) -> MetaOapg.properties.errorMessage: ...
@typing.overload
def __getitem__(self, name: str) -> schemas.UnsetAnyTypeSchema: ...
def __getitem__(self, name: typing.Union[typing_extensions.Literal[\\"errorMessage\\", ], str]):
# dict_instance[name] accessor
return super().__getitem__(name)
@typing.overload
def get_item_oapg(self, name: typing_extensions.Literal[\\"errorMessage\\"]) -> MetaOapg.properties.errorMessage: ...
@typing.overload
def get_item_oapg(self, name: str) -> typing.Union[schemas.UnsetAnyTypeSchema, schemas.Unset]: ...
def get_item_oapg(self, name: typing.Union[typing_extensions.Literal[\\"errorMessage\\", ], str]):
return super().get_item_oapg(name)
def __new__(
cls,
*_args: typing.Union[dict, frozendict.frozendict, ],
errorMessage: typing.Union[MetaOapg.properties.errorMessage, str, ],
_configuration: typing.Optional[schemas.Configuration] = None,
**kwargs: typing.Union[schemas.AnyTypeSchema, dict, frozendict.frozendict, str, date, datetime, uuid.UUID, int, float, decimal.Decimal, None, list, tuple, bytes],
) -> 'ApiErrorResponseContent':
return super().__new__(
cls,
*_args,
errorMessage=errorMessage,
_configuration=_configuration,
**kwargs,
)
",
"generated/python/my_api_python/model/say_hello_response_content.py": "# coding: utf-8
\\"\\"\\"
Example API
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) # noqa: E501
The version of the OpenAPI document: 1.0.0
Generated by: https://openapi-generator.tech
\\"\\"\\"
from datetime import date, datetime # noqa: F401
import decimal # noqa: F401
import functools # noqa: F401
import io # noqa: F401
import re # noqa: F401
import typing # noqa: F401
import typing_extensions # noqa: F401
import uuid # noqa: F401
import frozendict # noqa: F401
from my_api_python import schemas # noqa: F401
class SayHelloResponseContent(
schemas.DictSchema
):
\\"\\"\\"NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
\\"\\"\\"
class MetaOapg:
required = {
\\"message\\",
}
class properties:
message = schemas.StrSchema
__annotations__ = {
\\"message\\": message,
}
message: MetaOapg.properties.message
@typing.overload
def __getitem__(self, name: typing_extensions.Literal[\\"message\\"]) -> MetaOapg.properties.message: ...
@typing.overload
def __getitem__(self, name: str) -> schemas.UnsetAnyTypeSchema: ...
def __getitem__(self, name: typing.Union[typing_extensions.Literal[\\"message\\", ], str]):
# dict_instance[name] accessor
return super().__getitem__(name)
@typing.overload
def get_item_oapg(self, name: typing_extensions.Literal[\\"message\\"]) -> MetaOapg.properties.message: ...
@typing.overload
def get_item_oapg(self, name: str) -> typing.Union[schemas.UnsetAnyTypeSchema, schemas.Unset]: ...
def get_item_oapg(self, name: typing.Union[typing_extensions.Literal[\\"message\\", ], str]):
return super().get_item_oapg(name)
def __new__(
cls,
*_args: typing.Union[dict, frozendict.frozendict, ],
message: typing.Union[MetaOapg.properties.message, str, ],
_configuration: typing.Optional[schemas.Configuration] = None,
**kwargs: typing.Union[schemas.AnyTypeSchema, dict, frozendict.frozendict, str, date, datetime, uuid.UUID, int, float, decimal.Decimal, None, list, tuple, bytes],
) -> 'SayHelloResponseContent':
return super().__new__(
cls,
*_args,
message=message,
_configuration=_configuration,
**kwargs,
)
",
"generated/python/my_api_python/model/say_hello_response_content.pyi": "# coding: utf-8
\\"\\"\\"
Example API
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) # noqa: E501
The version of the OpenAPI document: 1.0.0
Generated by: https://openapi-generator.tech
\\"\\"\\"
from datetime import date, datetime # noqa: F401
import decimal # noqa: F401
import functools # noqa: F401
import io # noqa: F401
import re # noqa: F401
import typing # noqa: F401
import typing_extensions # noqa: F401
import uuid # noqa: F401
import frozendict # noqa: F401
from my_api_python import schemas # noqa: F401
class SayHelloResponseContent(
schemas.DictSchema
):
\\"\\"\\"NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
\\"\\"\\"
class MetaOapg:
required = {
\\"message\\",
}
class properties:
message = schemas.StrSchema
__annotations__ = {
\\"message\\": message,
}
message: MetaOapg.properties.message
@typing.overload
def __getitem__(self, name: typing_extensions.Literal[\\"message\\"]) -> MetaOapg.properties.message: ...
@typing.overload
def __getitem__(self, name: str) -> schemas.UnsetAnyTypeSchema: ...
def __getitem__(self, name: typing.Union[typing_extensions.Literal[\\"message\\", ], str]):
# dict_instance[name] accessor
return super().__getitem__(name)
@typing.overload
def get_item_oapg(self, name: typing_extensions.Literal[\\"message\\"]) -> MetaOapg.properties.message: ...
@typing.overload
def get_item_oapg(self, name: str) -> typing.Union[schemas.UnsetAnyTypeSchema, schemas.Unset]: ...
def get_item_oapg(self, name: typing.Union[typing_extensions.Literal[\\"message\\", ], str]):
return super().get_item_oapg(name)
def __new__(
cls,
*_args: typing.Union[dict, frozendict.frozendict, ],
message: typing.Union[MetaOapg.properties.message, str, ],
_configuration: typing.Optional[schemas.Configuration] = None,
**kwargs: typing.Union[schemas.AnyTypeSchema, dict, frozendict.frozendict, str, date, datetime, uuid.UUID, int, float, decimal.Decimal, None, list, tuple, bytes],
) -> 'SayHelloResponseContent':
return super().__new__(
cls,
*_args,
message=message,
_configuration=_configuration,
**kwargs,
)
",
"generated/python/my_api_python/models/__init__.py": "# coding: utf-8
# flake8: noqa
# import all models into this package
# if you have many models here with many references from one model to another this may
# raise a RecursionError
# to avoid this, import only the models that you directly need like:
# from my_api_python.model.pet import Pet
# or import this package, but before doing it, use:
# import sys
# sys.setrecursionlimit(n)
from my_api_python.model.api_error_response_content import ApiErrorResponseContent
from my_api_python.model.say_hello_response_content import SayHelloResponseContent
",
"generated/python/my_api_python/paths/__init__.py": "# do not import all endpoints into this module because that uses a lot of memory and stack frames
# if you need the ability to import all endpoints from this module, import them with
# from my_api_python.apis.path_to_api import path_to_api
import enum
class PathValues(str, enum.Enum):
HELLO = \\"/hello\\"
",
"generated/python/my_api_python/paths/hello/__init__.py": "# do not import all endpoints into this module because that uses a lot of memory and stack frames
# if you need the ability to import all endpoints from this module, import them with
# from my_api_python.paths.hello import Api
from my_api_python.paths import PathValues
path = PathValues.HELLO",
"generated/python/my_api_python/paths/hello/get.py": "# coding: utf-8
\\"\\"\\"
Generated by: https://openapi-generator.tech
\\"\\"\\"
from dataclasses import dataclass
import typing_extensions
import urllib3
from urllib3._collections import HTTPHeaderDict
from my_api_python import api_client, exceptions
from datetime import date, datetime # noqa: F401
import decimal # noqa: F401
import functools # noqa: F401
import io # noqa: F401
import re # noqa: F401
import typing # noqa: F401
import typing_extensions # noqa: F401
import uuid # noqa: F401
import frozendict # noqa: F401
from my_api_python import schemas # noqa: F401
from my_api_python.model.say_hello_response_content import SayHelloResponseContent
from my_api_python.model.api_error_response_content import ApiErrorResponseContent
from . import path
# Query params
NameSchema = schemas.StrSchema
RequestRequiredQueryParams = typing_extensions.TypedDict(
'RequestRequiredQueryParams',
{
'name': typing.Union[NameSchema, str, ],
}
)
RequestOptionalQueryParams = typing_extensions.TypedDict(
'RequestOptionalQueryParams',
{
},
total=False
)
class RequestQueryParams(RequestRequiredQueryParams, RequestOptionalQueryParams):
pass
request_query_name = api_client.QueryParameter(
name=\\"name\\",
style=api_client.ParameterStyle.FORM,
schema=NameSchema,
required=True,
explode=True,
)
SchemaFor200ResponseBodyApplicationJson = SayHelloResponseContent
@dataclass
class ApiResponseFor200(api_client.ApiResponse):
response: urllib3.HTTPResponse
body: typing.Union[
SchemaFor200ResponseBodyApplicationJson,
]
headers: schemas.Unset = schemas.unset
_response_for_200 = api_client.OpenApiResponse(
response_cls=ApiResponseFor200,
content={
'application/json': api_client.MediaType(
schema=SchemaFor200ResponseBodyApplicationJson),
},
)
SchemaFor400ResponseBodyApplicationJson = ApiErrorResponseContent
@dataclass
class ApiResponseFor400(api_client.ApiResponse):
response: urllib3.HTTPResponse
body: typing.Union[
SchemaFor400ResponseBodyApplicationJson,
]
headers: schemas.Unset = schemas.unset
_response_for_400 = api_client.OpenApiResponse(
response_cls=ApiResponseFor400,
content={
'application/json': api_client.MediaType(
schema=SchemaFor400ResponseBodyApplicationJson),
},
)
_status_code_to_response = {
'200': _response_for_200,
'400': _response_for_400,
}
_all_accept_content_types = (
'application/json',
)
class BaseApi(api_client.Api):
@typing.overload
def _say_hello_oapg(
self,
query_params: RequestQueryParams = frozendict.frozendict(),
accept_content_types: typing.Tuple[str] = _all_accept_content_types,
stream: bool = False,
timeout: typing.Optional[typing.Union[int, typing.Tuple]] = None,
skip_deserialization: typing_extensions.Literal[False] = ...,
) -> typing.Union[
ApiResponseFor200,
]: ...
@typing.overload
def _say_hello_oapg(
self,
skip_deserialization: typing_extensions.Literal[True],
query_params: RequestQueryParams = frozendict.frozendict(),
accept_content_types: typing.Tuple[str] = _all_accept_content_types,
stream: bool = False,
timeout: typing.Optional[typing.Union[int, typing.Tuple]] = None,
) -> api_client.ApiResponseWithoutDeserialization: ...
@typing.overload
def _say_hello_oapg(
self,
query_params: RequestQueryParams = frozendict.frozendict(),
accept_content_types: typing.Tuple[str] = _all_accept_content_types,
stream: bool = False,
timeout: typing.Optional[typing.Union[int, typing.Tuple]] = None,
skip_deserialization: bool = ...,
) -> typing.Union[
ApiResponseFor200,
api_client.ApiResponseWithoutDeserialization,
]: ...
def _say_hello_oapg(
self,
query_params: RequestQueryParams = frozendict.frozendict(),
accept_content_types: typing.Tuple[str] = _all_accept_content_types,
stream: bool = False,
timeout: typing.Optional[typing.Union[int, typing.Tuple]] = None,
skip_deserialization: bool = False,
):
\\"\\"\\"
:param skip_deserialization: If true then api_response.response will be set but
api_response.body and api_response.headers will not be deserialized into schema
class instances
\\"\\"\\"
self._verify_typed_dict_inputs_oapg(RequestQueryParams, query_params)
used_path = path.value
prefix_separator_iterator = None
for parameter in (
request_query_name,
):
parameter_data = query_params.get(parameter.name, schemas.unset)
if parameter_data is schemas.unset:
continue
if prefix_separator_iterator is None:
prefix_separator_iterator = parameter.get_prefix_separator_iterator()
serialized_data = parameter.serialize(parameter_data, prefix_separator_iterator)
for serialized_value in serialized_data.values():
used_path += serialized_value
_headers = HTTPHeaderDict()
# TODO add cookie handling
if accept_content_types:
for accept_content_type in accept_content_types:
_headers.add('Accept', accept_content_type)
response = self.api_client.call_api(
resource_path=used_path,
method='get'.upper(),
headers=_headers,
stream=stream,
timeout=timeout,
)
if skip_deserialization:
api_response = api_client.ApiResponseWithoutDeserialization(response=response)
else:
response_for_status = _status_code_to_response.get(str(response.status))
if response_for_status:
api_response = response_for_status.deserialize(response, self.api_client.configuration)
else:
api_response = api_client.ApiResponseWithoutDeserialization(response=response)
if not 200 <= response.status <= 299:
raise exceptions.ApiException(
status=response.status,
reason=response.reason,
api_response=api_response
)
return api_response
class SayHello(BaseApi):
# this class is used by api classes that refer to endpoints with operationId fn names
@typing.overload
def say_hello(
self,
query_params: RequestQueryParams = frozendict.frozendict(),
accept_content_types: typing.Tuple[str] = _all_accept_content_types,
stream: bool = False,
timeout: typing.Optional[typing.Union[int, typing.Tuple]] = None,
skip_deserialization: typing_extensions.Literal[False] = ...,
) -> typing.Union[
ApiResponseFor200,
]: ...
@typing.overload
def say_hello(
self,
skip_deserialization: typing_extensions.Literal[True],
query_params: RequestQueryParams = frozendict.frozendict(),
accept_content_types: typing.Tuple[str] = _all_accept_content_types,
stream: bool = False,
timeout: typing.Optional[typing.Union[int, typing.Tuple]] = None,
) -> api_client.ApiResponseWithoutDeserialization: ...
@typing.overload
def say_hello(
self,
query_params: RequestQueryParams = frozendict.frozendict(),
accept_content_types: typing.Tuple[str] = _all_accept_content_types,
stream: bool = False,
timeout: typing.Optional[typing.Union[int, typing.Tuple]] = None,
skip_deserialization: bool = ...,
) -> typing.Union[
ApiResponseFor200,
api_client.ApiResponseWithoutDeserialization,
]: ...
def say_hello(
self,
query_params: RequestQueryParams = frozendict.frozendict(),
accept_content_types: typing.Tuple[str] = _all_accept_content_types,
stream: bool = False,
timeout: typing.Optional[typing.Union[int, typing.Tuple]] = None,
skip_deserialization: bool = False,
):
return self._say_hello_oapg(
query_params=query_params,
accept_content_types=accept_content_types,
stream=stream,
timeout=timeout,
skip_deserialization=skip_deserialization
)
class ApiForget(BaseApi):
# this class is used by api classes that refer to endpoints by path and http method names
@typing.overload
def get(
self,
query_params: RequestQueryParams = frozendict.frozendict(),
accept_content_types: typing.Tuple[str] = _all_accept_content_types,
stream: bool = False,
timeout: typing.Optional[typing.Union[int, typing.Tuple]] = None,
skip_deserialization: typing_extensions.Literal[False] = ...,
) -> typing.Union[
ApiResponseFor200,
]: ...
@typing.overload
def get(
self,
skip_deserialization: typing_extensions.Literal[True],
query_params: RequestQueryParams = frozendict.frozendict(),
accept_content_types: typing.Tuple[str] = _all_accept_content_types,
stream: bool = False,
timeout: typing.Optional[typing.Union[int, typing.Tuple]] = None,
) -> api_client.ApiResponseWithoutDeserialization: ...
@typing.overload
def get(
self,
query_params: RequestQueryParams = frozendict.frozendict(),
accept_content_types: typing.Tuple[str] = _all_accept_content_types,
stream: bool = False,
timeout: typing.Optional[typing.Union[int, typing.Tuple]] = None,
skip_deserialization: bool = ...,
) -> typing.Union[
ApiResponseFor200,
api_client.ApiResponseWithoutDeserialization,
]: ...
def get(
self,
query_params: RequestQueryParams = frozendict.frozendict(),
accept_content_types: typing.Tuple[str] = _all_accept_content_types,
stream: bool = False,
timeout: typing.Optional[typing.Union[int, typing.Tuple]] = None,
skip_deserialization: bool = False,
):
return self._say_hello_oapg(
query_params=query_params,
accept_content_types=accept_content_types,
stream=stream,
timeout=timeout,
skip_deserialization=skip_deserialization
)
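# illustrative usage sketch (comment only, not generated code; the ApiClient and Configuration
# wiring below is an assumption and may differ from the generated api_client/configuration modules):
# from my_api_python.api_client import ApiClient
# from my_api_python.configuration import Configuration
# api = SayHello(api_client=ApiClient(configuration=Configuration(host='https://example.com')))
# api_response = api.say_hello(query_params={'name': 'World'})
# print(api_response.body['message'])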
",
"generated/python/my_api_python/paths/hello/get.pyi": "# coding: utf-8
\\"\\"\\"
Generated by: https://openapi-generator.tech
\\"\\"\\"
from dataclasses import dataclass
import typing_extensions
import urllib3
from urllib3._collections import HTTPHeaderDict
from my_api_python import api_client, exceptions
from datetime import date, datetime # noqa: F401
import decimal # noqa: F401
import functools # noqa: F401
import io # noqa: F401
import re # noqa: F401
import typing # noqa: F401
import typing_extensions # noqa: F401
import uuid # noqa: F401
import frozendict # noqa: F401
from my_api_python import schemas # noqa: F401
from my_api_python.model.say_hello_response_content import SayHelloResponseContent
from my_api_python.model.api_error_response_content import ApiErrorResponseContent
# Query params
NameSchema = schemas.StrSchema
RequestRequiredQueryParams = typing_extensions.TypedDict(
'RequestRequiredQueryParams',
{
'name': typing.Union[NameSchema, str, ],
}
)
RequestOptionalQueryParams = typing_extensions.TypedDict(
'RequestOptionalQueryParams',
{
},
total=False
)
class RequestQueryParams(RequestRequiredQueryParams, RequestOptionalQueryParams):
pass
request_query_name = api_client.QueryParameter(
name=\\"name\\",
style=api_client.ParameterStyle.FORM,
schema=NameSchema,
required=True,
explode=True,
)
SchemaFor200ResponseBodyApplicationJson = SayHelloResponseContent
@dataclass
class ApiResponseFor200(api_client.ApiResponse):
response: urllib3.HTTPResponse
body: typing.Union[
SchemaFor200ResponseBodyApplicationJson,
]
headers: schemas.Unset = schemas.unset
_response_for_200 = api_client.OpenApiResponse(
response_cls=ApiResponseFor200,
content={
'application/json': api_client.MediaType(
schema=SchemaFor200ResponseBodyApplicationJson),
},
)
SchemaFor400ResponseBodyApplicationJson = ApiErrorResponseContent
@dataclass
class ApiResponseFor400(api_client.ApiResponse):
response: urllib3.HTTPResponse
body: typing.Union[
SchemaFor400ResponseBodyApplicationJson,
]
headers: schemas.Unset = schemas.unset
_response_for_400 = api_client.OpenApiResponse(
response_cls=ApiResponseFor400,
content={
'application/json': api_client.MediaType(
schema=SchemaFor400ResponseBodyApplicationJson),
},
)
_all_accept_content_types = (
'application/json',
)
class BaseApi(api_client.Api):
@typing.overload
def _say_hello_oapg(
self,
query_params: RequestQueryParams = frozendict.frozendict(),
accept_content_types: typing.Tuple[str] = _all_accept_content_types,
stream: bool = False,
timeout: typing.Optional[typing.Union[int, typing.Tuple]] = None,
skip_deserialization: typing_extensions.Literal[False] = ...,
) -> typing.Union[
ApiResponseFor200,
]: ...
@typing.overload
def _say_hello_oapg(
self,
skip_deserialization: typing_extensions.Literal[True],
query_params: RequestQueryParams = frozendict.frozendict(),
accept_content_types: typing.Tuple[str] = _all_accept_content_types,
stream: bool = False,
timeout: typing.Optional[typing.Union[int, typing.Tuple]] = None,
) -> api_client.ApiResponseWithoutDeserialization: ...
@typing.overload
def _say_hello_oapg(
self,
query_params: RequestQueryParams = frozendict.frozendict(),
accept_content_types: typing.Tuple[str] = _all_accept_content_types,
stream: bool = False,
timeout: typing.Optional[typing.Union[int, typing.Tuple]] = None,
skip_deserialization: bool = ...,
) -> typing.Union[
ApiResponseFor200,
api_client.ApiResponseWithoutDeserialization,
]: ...
def _say_hello_oapg(
self,
query_params: RequestQueryParams = frozendict.frozendict(),
accept_content_types: typing.Tuple[str] = _all_accept_content_types,
stream: bool = False,
timeout: typing.Optional[typing.Union[int, typing.Tuple]] = None,
skip_deserialization: bool = False,
):
\\"\\"\\"
:param skip_deserialization: If true then api_response.response will be set but
api_response.body and api_response.headers will not be deserialized into schema
class instances
\\"\\"\\"
self._verify_typed_dict_inputs_oapg(RequestQueryParams, query_params)
used_path = path.value
prefix_separator_iterator = None
for parameter in (
request_query_name,
):
parameter_data = query_params.get(parameter.name, schemas.unset)
if parameter_data is schemas.unset:
continue
if prefix_separator_iterator is None:
prefix_separator_iterator = parameter.get_prefix_separator_iterator()
serialized_data = parameter.serialize(parameter_data, prefix_separator_iterator)
for serialized_value in serialized_data.values():
used_path += serialized_value
_headers = HTTPHeaderDict()
# TODO add cookie handling
if accept_content_types:
for accept_content_type in accept_content_types:
_headers.add('Accept', accept_content_type)
response = self.api_client.call_api(
resource_path=used_path,
method='get'.upper(),
headers=_headers,
stream=stream,
timeout=timeout,
)
if skip_deserialization:
api_response = api_client.ApiResponseWithoutDeserialization(response=response)
else:
response_for_status = _status_code_to_response.get(str(response.status))
if response_for_status:
api_response = response_for_status.deserialize(response, self.api_client.configuration)
else:
api_response = api_client.ApiResponseWithoutDeserialization(response=response)
if not 200 <= response.status <= 299:
raise exceptions.ApiException(
status=response.status,
reason=response.reason,
api_response=api_response
)
return api_response
class SayHello(BaseApi):
# this class is used by api classes that refer to endpoints with operationId fn names
@typing.overload
def say_hello(
self,
query_params: RequestQueryParams = frozendict.frozendict(),
accept_content_types: typing.Tuple[str] = _all_accept_content_types,
stream: bool = False,
timeout: typing.Optional[typing.Union[int, typing.Tuple]] = None,
skip_deserialization: typing_extensions.Literal[False] = ...,
) -> typing.Union[
ApiResponseFor200,
]: ...
@typing.overload
def say_hello(
self,
skip_deserialization: typing_extensions.Literal[True],
query_params: RequestQueryParams = frozendict.frozendict(),
accept_content_types: typing.Tuple[str] = _all_accept_content_types,
stream: bool = False,
timeout: typing.Optional[typing.Union[int, typing.Tuple]] = None,
) -> api_client.ApiResponseWithoutDeserialization: ...
@typing.overload
def say_hello(
self,
query_params: RequestQueryParams = frozendict.frozendict(),
accept_content_types: typing.Tuple[str] = _all_accept_content_types,
stream: bool = False,
timeout: typing.Optional[typing.Union[int, typing.Tuple]] = None,
skip_deserialization: bool = ...,
) -> typing.Union[
ApiResponseFor200,
api_client.ApiResponseWithoutDeserialization,
]: ...
def say_hello(
self,
query_params: RequestQueryParams = frozendict.frozendict(),
accept_content_types: typing.Tuple[str] = _all_accept_content_types,
stream: bool = False,
timeout: typing.Optional[typing.Union[int, typing.Tuple]] = None,
skip_deserialization: bool = False,
):
return self._say_hello_oapg(
query_params=query_params,
accept_content_types=accept_content_types,
stream=stream,
timeout=timeout,
skip_deserialization=skip_deserialization
)
class ApiForget(BaseApi):
# this class is used by api classes that refer to endpoints by path and http method names
@typing.overload
def get(
self,
query_params: RequestQueryParams = frozendict.frozendict(),
accept_content_types: typing.Tuple[str] = _all_accept_content_types,
stream: bool = False,
timeout: typing.Optional[typing.Union[int, typing.Tuple]] = None,
skip_deserialization: typing_extensions.Literal[False] = ...,
) -> typing.Union[
ApiResponseFor200,
]: ...
@typing.overload
def get(
self,
skip_deserialization: typing_extensions.Literal[True],
query_params: RequestQueryParams = frozendict.frozendict(),
accept_content_types: typing.Tuple[str] = _all_accept_content_types,
stream: bool = False,
timeout: typing.Optional[typing.Union[int, typing.Tuple]] = None,
) -> api_client.ApiResponseWithoutDeserialization: ...
@typing.overload
def get(
self,
query_params: RequestQueryParams = frozendict.frozendict(),
accept_content_types: typing.Tuple[str] = _all_accept_content_types,
stream: bool = False,
timeout: typing.Optional[typing.Union[int, typing.Tuple]] = None,
skip_deserialization: bool = ...,
) -> typing.Union[
ApiResponseFor200,
api_client.ApiResponseWithoutDeserialization,
]: ...
def get(
self,
query_params: RequestQueryParams = frozendict.frozendict(),
accept_content_types: typing.Tuple[str] = _all_accept_content_types,
stream: bool = False,
timeout: typing.Optional[typing.Union[int, typing.Tuple]] = None,
skip_deserialization: bool = False,
):
return self._say_hello_oapg(
query_params=query_params,
accept_content_types=accept_content_types,
stream=stream,
timeout=timeout,
skip_deserialization=skip_deserialization
)
",
"generated/python/my_api_python/rest.py": "# coding: utf-8
\\"\\"\\"
Example API
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) # noqa: E501
The version of the OpenAPI document: 1.0.0
Generated by: https://openapi-generator.tech
\\"\\"\\"
import logging
import ssl
from urllib.parse import urlencode
import typing
import certifi
import urllib3
from urllib3._collections import HTTPHeaderDict
from my_api_python.exceptions import ApiException, ApiValueError
logger = logging.getLogger(__name__)
class RESTClientObject(object):
def __init__(self, configuration, pools_size=4, maxsize=None):
# urllib3.PoolManager will pass all kw parameters to connectionpool
# https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/poolmanager.py#L75 # noqa: E501
# https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/connectionpool.py#L680 # noqa: E501
# maxsize is the number of requests to host that are allowed in parallel # noqa: E501
# Custom SSL certificates and client certificates: http://urllib3.readthedocs.io/en/latest/advanced-usage.html # noqa: E501
# cert_reqs
if configuration.verify_ssl:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
# ca_certs
if configuration.ssl_ca_cert:
ca_certs = configuration.ssl_ca_cert
else:
# if no certificate file is set, use Mozilla's root certificates.
ca_certs = certifi.where()
addition_pool_args = {}
if configuration.assert_hostname is not None:
addition_pool_args['assert_hostname'] = configuration.assert_hostname # noqa: E501
if configuration.retries is not None:
addition_pool_args['retries'] = configuration.retries
if configuration.socket_options is not None:
addition_pool_args['socket_options'] = configuration.socket_options
if maxsize is None:
if configuration.connection_pool_maxsize is not None:
maxsize = configuration.connection_pool_maxsize
else:
maxsize = 4
# https pool manager
if configuration.proxy:
self.pool_manager = urllib3.ProxyManager(
num_pools=pools_size,
maxsize=maxsize,
cert_reqs=cert_reqs,
ca_certs=ca_certs,
cert_file=configuration.cert_file,
key_file=configuration.key_file,
proxy_url=configuration.proxy,
proxy_headers=configuration.proxy_headers,
**addition_pool_args
)
else:
self.pool_manager = urllib3.PoolManager(
num_pools=pools_size,
maxsize=maxsize,
cert_reqs=cert_reqs,
ca_certs=ca_certs,
cert_file=configuration.cert_file,
key_file=configuration.key_file,
**addition_pool_args
)
def request(
self,
method: str,
url: str,
headers: typing.Optional[HTTPHeaderDict] = None,
fields: typing.Optional[typing.Tuple[typing.Tuple[str, typing.Any], ...]] = None,
body: typing.Optional[typing.Union[str, bytes]] = None,
stream: bool = False,
timeout: typing.Optional[typing.Union[int, typing.Tuple]] = None,
) -> urllib3.HTTPResponse:
\\"\\"\\"Perform requests.
:param method: http request method
:param url: http request url
:param headers: http request headers
:param body: request body, for other types
:param fields: request parameters for
\`application/x-www-form-urlencoded\`
or \`multipart/form-data\`
:param stream: if True, the urllib3.HTTPResponse object will
be returned without reading/decoding response
data. Default is False.
:param timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
\\"\\"\\"
method = method.upper()
assert method in ['GET', 'HEAD', 'DELETE', 'POST', 'PUT',
'PATCH', 'OPTIONS']
if fields and body:
raise ApiValueError(
\\"body parameter cannot be used with fields parameter.\\"
)
fields = fields or {}
headers = headers or {}
if timeout:
if isinstance(timeout, (int, float)): # noqa: E501,F821
timeout = urllib3.Timeout(total=timeout)
elif (isinstance(timeout, tuple) and
len(timeout) == 2):
timeout = urllib3.Timeout(connect=timeout[0], read=timeout[1])
try:
# For \`POST\`, \`PUT\`, \`PATCH\`, \`OPTIONS\`, \`DELETE\`
if method in ['POST', 'PUT', 'PATCH', 'OPTIONS', 'DELETE']:
if 'Content-Type' not in headers and body is None:
r = self.pool_manager.request(
method,
url,
preload_content=not stream,
timeout=timeout,
headers=headers
)
elif headers['Content-Type'] == 'application/x-www-form-urlencoded': # noqa: E501
r = self.pool_manager.request(
method, url,
body=body,
fields=fields,
encode_multipart=False,
preload_content=not stream,
timeout=timeout,
headers=headers)
elif headers['Content-Type'] == 'multipart/form-data':
# must del headers['Content-Type'], or the correct
# Content-Type which is generated by urllib3 will be
# overwritten.
del headers['Content-Type']
r = self.pool_manager.request(
method, url,
fields=fields,
encode_multipart=True,
preload_content=not stream,
timeout=timeout,
headers=headers)
# Pass a \`string\` parameter directly in the body to support
# other content types than Json when \`body\` argument is
# provided in serialized form
elif isinstance(body, str) or isinstance(body, bytes):
request_body = body
r = self.pool_manager.request(
method, url,
body=request_body,
preload_content=not stream,
timeout=timeout,
headers=headers)
else:
# Cannot generate the request from given parameters
msg = \\"\\"\\"Cannot prepare a request message for provided
arguments. Please check that your arguments match
declared content type.\\"\\"\\"
raise ApiException(status=0, reason=msg)
# For \`GET\`, \`HEAD\`
else:
r = self.pool_manager.request(method, url,
preload_content=not stream,
timeout=timeout,
headers=headers)
except urllib3.exceptions.SSLError as e:
msg = \\"{0}\\\\n{1}\\".format(type(e).__name__, str(e))
raise ApiException(status=0, reason=msg)
if not stream:
# log response body
logger.debug(\\"response body: %s\\", r.data)
return r
def GET(self, url, headers=None, stream=False,
timeout=None, fields=None) -> urllib3.HTTPResponse:
return self.request(\\"GET\\", url,
headers=headers,
stream=stream,
timeout=timeout,
fields=fields)
def HEAD(self, url, headers=None, stream=False,
timeout=None, fields=None) -> urllib3.HTTPResponse:
return self.request(\\"HEAD\\", url,
headers=headers,
stream=stream,
timeout=timeout,
fields=fields)
def OPTIONS(self, url, headers=None,
body=None, stream=False, timeout=None, fields=None) -> urllib3.HTTPResponse:
return self.request(\\"OPTIONS\\", url,
headers=headers,
stream=stream,
timeout=timeout,
body=body, fields=fields)
def DELETE(self, url, headers=None, body=None,
stream=False, timeout=None, fields=None) -> urllib3.HTTPResponse:
return self.request(\\"DELETE\\", url,
headers=headers,
stream=stream,
timeout=timeout,
body=body, fields=fields)
def POST(self, url, headers=None,
body=None, stream=False, timeout=None, fields=None) -> urllib3.HTTPResponse:
return self.request(\\"POST\\", url,
headers=headers,
stream=stream,
timeout=timeout,
body=body, fields=fields)
def PUT(self, url, headers=None,
body=None, stream=False, timeout=None, fields=None) -> urllib3.HTTPResponse:
return self.request(\\"PUT\\", url,
headers=headers,
stream=stream,
timeout=timeout,
body=body, fields=fields)
def PATCH(self, url, headers=None,
body=None, stream=False, timeout=None, fields=None) -> urllib3.HTTPResponse:
return self.request(\\"PATCH\\", url,
headers=headers,
stream=stream,
timeout=timeout,
body=body, fields=fields)
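# illustrative usage sketch (comment only, not generated code; assumes the generated
# Configuration class with its default SSL/proxy settings):
# from my_api_python.configuration import Configuration
# rest_client = RESTClientObject(Configuration())
# resp = rest_client.GET('https://example.com/hello?name=World')
# print(resp.status, resp.data)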
",
"generated/python/my_api_python/schemas.py": "# coding: utf-8
\\"\\"\\"
Example API
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) # noqa: E501
The version of the OpenAPI document: 1.0.0
Generated by: https://openapi-generator.tech
\\"\\"\\"
from collections import defaultdict
from datetime import date, datetime, timedelta # noqa: F401
import functools
import decimal
import io
import re
import types
import typing
import uuid
from dateutil.parser.isoparser import isoparser, _takes_ascii
import frozendict
from my_api_python.exceptions import (
ApiTypeError,
ApiValueError,
)
from my_api_python.configuration import (
Configuration,
)
class Unset(object):
\\"\\"\\"
An instance of this class is set as the default value for object type(dict) properties that are optional
When a property has an unset value, that property will not be assigned in the dict
\\"\\"\\"
pass
unset = Unset()
none_type = type(None)
file_type = io.IOBase
class FileIO(io.FileIO):
\\"\\"\\"
A class for storing files
Note: this class is not immutable
\\"\\"\\"
def __new__(cls, _arg: typing.Union[io.FileIO, io.BufferedReader]):
if isinstance(_arg, (io.FileIO, io.BufferedReader)):
if _arg.closed:
raise ApiValueError('Invalid file state; file is closed and must be open')
_arg.close()
inst = super(FileIO, cls).__new__(cls, _arg.name)
super(FileIO, inst).__init__(_arg.name)
return inst
raise ApiValueError('FileIO must be passed _arg which contains the open file')
def __init__(self, _arg: typing.Union[io.FileIO, io.BufferedReader]):
pass
def update(d: dict, u: dict):
\\"\\"\\"
Adds u to d
Where each dict is defaultdict(set)
\\"\\"\\"
if not u:
return d
for k, v in u.items():
if k not in d:
d[k] = v
else:
d[k] = d[k] | v
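# illustrative example (comment only): given d = {'a': {1}} and u = {'a': {2}, 'b': {3}},
# update(d, u) mutates d in place to {'a': {1, 2}, 'b': {3}}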
class ValidationMetadata(frozendict.frozendict):
\\"\\"\\"
A class storing metadata that is needed to validate OpenApi Schema payloads
\\"\\"\\"
def __new__(
cls,
path_to_item: typing.Tuple[typing.Union[str, int], ...] = tuple(['args[0]']),
from_server: bool = False,
configuration: typing.Optional[Configuration] = None,
seen_classes: typing.FrozenSet[typing.Type] = frozenset(),
validated_path_to_schemas: typing.Dict[typing.Tuple[typing.Union[str, int], ...], typing.Set[typing.Type]] = frozendict.frozendict()
):
\\"\\"\\"
Args:
path_to_item: the path to the current data being instantiated.
For {'a': [1]}, if the code is handling 1, then the path is ('args[0]', 'a', 0)
This changes from location to location
from_server: whether or not this data came from the server
True when receiving server data
False when instantiating model with client side data not from the server
This does not change from location to location
configuration: the Configuration instance to use
This is needed because in Configuration:
- one can disable validation checking
This does not change from location to location
seen_classes: when deserializing data that matches multiple schemas, this is used to store
the schemas that have been traversed. This is used to stop processing when a cycle is seen.
This changes from location to location
validated_path_to_schemas: stores the already validated schema classes for a given path location
This does not change from location to location
\\"\\"\\"
return super().__new__(
cls,
path_to_item=path_to_item,
from_server=from_server,
configuration=configuration,
seen_classes=seen_classes,
validated_path_to_schemas=validated_path_to_schemas
)
def validation_ran_earlier(self, cls: type) -> bool:
validated_schemas = self.validated_path_to_schemas.get(self.path_to_item, set())
validation_ran_earlier = validated_schemas and cls in validated_schemas
if validation_ran_earlier:
return True
if cls in self.seen_classes:
return True
return False
@property
def path_to_item(self) -> typing.Tuple[typing.Union[str, int], ...]:
return self.get('path_to_item')
@property
def from_server(self) -> bool:
return self.get('from_server')
@property
def configuration(self) -> typing.Optional[Configuration]:
return self.get('configuration')
@property
def seen_classes(self) -> typing.FrozenSet[typing.Type]:
return self.get('seen_classes')
@property
def validated_path_to_schemas(self) -> typing.Dict[typing.Tuple[typing.Union[str, int], ...], typing.Set[typing.Type]]:
return self.get('validated_path_to_schemas')
def add_deeper_validated_schemas(validation_metadata: ValidationMetadata, path_to_schemas: dict):
# this is called if validation_ran_earlier and current and deeper locations need to be added
current_path_to_item = validation_metadata.path_to_item
other_path_to_schemas = {}
for path_to_item, schemas in validation_metadata.validated_path_to_schemas.items():
if len(path_to_item) < len(current_path_to_item):
continue
path_begins_with_current_path = path_to_item[:len(current_path_to_item)] == current_path_to_item
if path_begins_with_current_path:
other_path_to_schemas[path_to_item] = schemas
update(path_to_schemas, other_path_to_schemas)
class Singleton:
\\"\\"\\"
Enums and singletons are the same
The same instance is returned for a given key of (cls, _arg)
\\"\\"\\"
_instances = {}
def __new__(cls, _arg: typing.Any, **kwargs):
\\"\\"\\"
cls base classes: BoolClass, NoneClass, str, decimal.Decimal
The 3rd key is used in the tuple below for a corner case where an enum contains integer 1
However 1.0 can also be ingested into that enum schema because 1.0 == 1 and
Decimal('1.0') == Decimal('1')
But if we omitted the 3rd value in the key, then Decimal('1.0') would be stored as Decimal('1')
and json serializing that instance would be '1' rather than the expected '1.0'
Adding the 3rd value, the str of _arg ensures that 1.0 -> Decimal('1.0') which is serialized as 1.0
\\"\\"\\"
key = (cls, _arg, str(_arg))
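# illustrative consequence of the 3-part key above (comment only): decimal.Decimal('1') and
# decimal.Decimal('1.0') compare equal, but their str() differs, so each gets its own cached
# instance and serializes back as 1 and 1.0 respectively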
if key not in cls._instances:
if isinstance(_arg, (none_type, bool, BoolClass, NoneClass)):
inst = super().__new__(cls)
cls._instances[key] = inst
else:
cls._instances[key] = super().__new__(cls, _arg)
return cls._instances[key]
def __repr__(self):
if isinstance(self, NoneClass):
return f'<{self.__class__.__name__}: None>'
elif isinstance(self, BoolClass):
if bool(self):
return f'<{self.__class__.__name__}: True>'
return f'<{self.__class__.__name__}: False>'
return f'<{self.__class__.__name__}: {super().__repr__()}>'
class classproperty:
def __init__(self, fget):
self.fget = fget
def __get__(self, owner_self, owner_cls):
return self.fget(owner_cls)
class NoneClass(Singleton):
@classproperty
def NONE(cls):
return cls(None)
def __bool__(self) -> bool:
return False
class BoolClass(Singleton):
@classproperty
def TRUE(cls):
return cls(True)
@classproperty
def FALSE(cls):
return cls(False)
@functools.lru_cache()
def __bool__(self) -> bool:
for key, instance in self._instances.items():
if self is instance:
return bool(key[1])
raise ValueError('Unable to find the boolean value of this instance')
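# illustrative behaviour (comment only): bool(NoneClass.NONE) is False and bool(BoolClass.TRUE) is True,
# which lets generated schemas represent JSON null/true/false with hashable singleton instances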
class MetaOapgTyped:
exclusive_maximum: typing.Union[int, float]
inclusive_maximum: typing.Union[int, float]
exclusive_minimum: typing.Union[int, float]
inclusive_minimum: typing.Union[int, float]
max_items: int
min_items: int
discriminator: typing.Dict[str, typing.Dict[str, typing.Type['Schema']]]
class properties:
# to hold object properties
pass
additional_properties: typing.Optional[typing.Type['Schema']]
max_properties: int
min_properties: int
all_of: typing.List[typing.Type['Schema']]
one_of: typing.List[typing.Type['Schema']]
any_of: typing.List[typing.Type['Schema']]
not_schema: typing.Type['Schema']
max_length: int
min_length: int
items: typing.Type['Schema']
class Schema:
\\"\\"\\"
the base class of all swagger/openapi schemas/models
\\"\\"\\"
__inheritable_primitive_types_set = {decimal.Decimal, str, tuple, frozendict.frozendict, FileIO, bytes, BoolClass, NoneClass}
_types: typing.Set[typing.Type]
MetaOapg = MetaOapgTyped
@staticmethod
def __get_valid_classes_phrase(input_classes):
\\"\\"\\"Returns a string phrase describing what types are allowed\\"\\"\\"
all_classes = list(input_classes)
all_classes = sorted(all_classes, key=lambda cls: cls.__name__)
all_class_names = [cls.__name__ for cls in all_classes]
if len(all_class_names) == 1:
return \\"is {0}\\".format(all_class_names[0])
return \\"is one of [{0}]\\".format(\\", \\".join(all_class_names))
@staticmethod
def _get_class_oapg(item_cls: typing.Union[types.FunctionType, staticmethod, typing.Type['Schema']]) -> typing.Type['Schema']:
if isinstance(item_cls, types.FunctionType):
# referenced schema
return item_cls()
elif isinstance(item_cls, staticmethod):
# referenced schema
return item_cls.__func__()
return item_cls
@classmethod
def __type_error_message(
cls, var_value=None, var_name=None, valid_classes=None, key_type=None
):
\\"\\"\\"
Keyword Args:
var_value (any): the variable which has the type_error
var_name (str): the name of the variable which has the type error
valid_classes (tuple): the accepted classes for current_item's
value
key_type (bool): False if our value is a value in a dict
True if it is a key in a dict
False if our item is an item in a tuple
\\"\\"\\"
key_or_value = \\"value\\"
if key_type:
key_or_value = \\"key\\"
valid_classes_phrase = cls.__get_valid_classes_phrase(valid_classes)
msg = \\"Invalid type. Required {1} type {2} and \\" \\"passed type was {3}\\".format(
var_name,
key_or_value,
valid_classes_phrase,
type(var_value).__name__,
)
return msg
@classmethod
def __get_type_error(cls, var_value, path_to_item, valid_classes, key_type=False):
error_msg = cls.__type_error_message(
var_name=path_to_item[-1],
var_value=var_value,
valid_classes=valid_classes,
key_type=key_type,
)
return ApiTypeError(
error_msg,
path_to_item=path_to_item,
valid_classes=valid_classes,
key_type=key_type,
)
@classmethod
def _validate_oapg(
cls,
arg,
validation_metadata: ValidationMetadata,
) -> typing.Dict[typing.Tuple[typing.Union[str, int], ...], typing.Set[typing.Union['Schema', str, decimal.Decimal, BoolClass, NoneClass, frozendict.frozendict, tuple]]]:
\\"\\"\\"
Schema _validate_oapg
All keyword validation except for type checking was done in calling stack frames
If those validations passed, the validated classes are collected in path_to_schemas
Returns:
path_to_schemas: a map of path to schemas
Raises:
ApiValueError: when a string can't be converted into a date or datetime and it must be one of those classes
ApiTypeError: when the input type is not in the list of allowed spec types
\\"\\"\\"
base_class = type(arg)
if base_class not in cls._types:
raise cls.__get_type_error(
arg,
validation_metadata.path_to_item,
cls._types,
key_type=False,
)
path_to_schemas = {validation_metadata.path_to_item: set()}
path_to_schemas[validation_metadata.path_to_item].add(cls)
path_to_schemas[validation_metadata.path_to_item].add(base_class)
return path_to_schemas
@staticmethod
def _process_schema_classes_oapg(
schema_classes: typing.Set[typing.Union['Schema', str, decimal.Decimal, BoolClass, NoneClass, frozendict.frozendict, tuple]]
):
\\"\\"\\"
Processes and mutates schema_classes
If a SomeSchema is a subclass of DictSchema then remove DictSchema because it is already included
\\"\\"\\"
if len(schema_classes) < 2:
return
if len(schema_classes) > 2 and UnsetAnyTypeSchema in schema_classes:
schema_classes.remove(UnsetAnyTypeSchema)
x_schema = schema_type_classes & schema_classes
if not x_schema:
return
x_schema = x_schema.pop()
if any(c is not x_schema and issubclass(c, x_schema) for c in schema_classes):
# needed to not have a mro error in get_new_class
schema_classes.remove(x_schema)
@classmethod
def __get_new_cls(
cls,
arg,
validation_metadata: ValidationMetadata
) -> typing.Dict[typing.Tuple[typing.Union[str, int], ...], typing.Type['Schema']]:
\\"\\"\\"
Make a new dynamic class and return an instance of that class
We are making an instance of cls, but instead of making cls
make a new class, new_cls
which includes dynamic bases including cls
return an instance of that new class
Dict property + List Item Assignment Use cases:
1. value is NOT an instance of the required schema class
the value is validated by _validate_oapg
_validate_oapg returns a key value pair
where the key is the path to the item, and the value will be the required manufactured class
made out of the matching schemas
2. value is an instance of the correct schema type
the value is NOT validated by _validate_oapg, _validate_oapg only checks that the instance is of the correct schema type
for this value, _validate_oapg does NOT return an entry for it in _path_to_schemas
and in list/dict _get_items_oapg,_get_properties_oapg the value will be directly assigned
because value is of the correct type, and validation was run earlier when the instance was created
\\"\\"\\"
_path_to_schemas = {}
if validation_metadata.validation_ran_earlier(cls):
add_deeper_validated_schemas(validation_metadata, _path_to_schemas)
else:
other_path_to_schemas = cls._validate_oapg(arg, validation_metadata=validation_metadata)
update(_path_to_schemas, other_path_to_schemas)
# loop through it and make a new class for each entry
# do not modify the returned result because it is cached and we would be modifying the cached value
path_to_schemas = {}
for path, schema_classes in _path_to_schemas.items():
\\"\\"\\"
Use cases
1. N number of schema classes + enum + type != bool/None, classes in path_to_schemas: tuple/frozendict.frozendict/str/Decimal/bytes/FileIo
needs Singleton added
2. N number of schema classes + enum + type == bool/None, classes in path_to_schemas: BoolClass/NoneClass
Singleton already added
3. N number of schema classes, classes in path_to_schemas: BoolClass/NoneClass/tuple/frozendict.frozendict/str/Decimal/bytes/FileIo
\\"\\"\\"
cls._process_schema_classes_oapg(schema_classes)
enum_schema = any(
issubclass(this_cls, EnumBase) for this_cls in schema_classes)
inheritable_primitive_type = schema_classes.intersection(cls.__inheritable_primitive_types_set)
chosen_schema_classes = schema_classes - inheritable_primitive_type
suffix = tuple(inheritable_primitive_type)
if enum_schema and suffix[0] not in {NoneClass, BoolClass}:
suffix = (Singleton,) + suffix
used_classes = tuple(sorted(chosen_schema_classes, key=lambda a_cls: a_cls.__name__)) + suffix
mfg_cls = get_new_class(class_name='DynamicSchema', bases=used_classes)
path_to_schemas[path] = mfg_cls
return path_to_schemas
@classmethod
def _get_new_instance_without_conversion_oapg(
cls,
arg: typing.Any,
path_to_item: typing.Tuple[typing.Union[str, int], ...],
path_to_schemas: typing.Dict[typing.Tuple[typing.Union[str, int], ...], typing.Type['Schema']]
):
# We have a Dynamic class and we are making an instance of it
if issubclass(cls, frozendict.frozendict) and issubclass(cls, DictBase):
properties = cls._get_properties_oapg(arg, path_to_item, path_to_schemas)
return super(Schema, cls).__new__(cls, properties)
elif issubclass(cls, tuple) and issubclass(cls, ListBase):
items = cls._get_items_oapg(arg, path_to_item, path_to_schemas)
return super(Schema, cls).__new__(cls, items)
\\"\\"\\"
str = openapi str, date, and datetime
decimal.Decimal = openapi int and float
FileIO = openapi binary type and the user inputs a file
bytes = openapi binary type and the user inputs bytes
\\"\\"\\"
return super(Schema, cls).__new__(cls, arg)
@classmethod
def from_openapi_data_oapg(
cls,
arg: typing.Union[
str,
date,
datetime,
int,
float,
decimal.Decimal,
bool,
None,
'Schema',
dict,
frozendict.frozendict,
tuple,
list,
io.FileIO,
io.BufferedReader,
bytes
],
_configuration: typing.Optional[Configuration]
):
\\"\\"\\"
Schema from_openapi_data_oapg
\\"\\"\\"
from_server = True
validated_path_to_schemas = {}
arg = cast_to_allowed_types(arg, from_server, validated_path_to_schemas)
validation_metadata = ValidationMetadata(
from_server=from_server, configuration=_configuration, validated_path_to_schemas=validated_path_to_schemas)
path_to_schemas = cls.__get_new_cls(arg, validation_metadata)
new_cls = path_to_schemas[validation_metadata.path_to_item]
new_inst = new_cls._get_new_instance_without_conversion_oapg(
arg,
validation_metadata.path_to_item,
path_to_schemas
)
return new_inst
@staticmethod
def __get_input_dict(*args, **kwargs) -> frozendict.frozendict:
input_dict = {}
if args and isinstance(args[0], (dict, frozendict.frozendict)):
input_dict.update(args[0])
if kwargs:
input_dict.update(kwargs)
return frozendict.frozendict(input_dict)
@staticmethod
def __remove_unsets(kwargs):
return {key: val for key, val in kwargs.items() if val is not unset}
def __new__(cls, *_args: typing.Union[dict, frozendict.frozendict, list, tuple, decimal.Decimal, float, int, str, date, datetime, bool, None, 'Schema'], _configuration: typing.Optional[Configuration] = None, **kwargs: typing.Union[dict, frozendict.frozendict, list, tuple, decimal.Decimal, float, int, str, date, datetime, bool, None, 'Schema', Unset]):
\\"\\"\\"
Schema __new__
Args:
_args (int/float/decimal.Decimal/str/list/tuple/dict/frozendict.frozendict/bool/None): the value
kwargs (str, int/float/decimal.Decimal/str/list/tuple/dict/frozendict.frozendict/bool/None): dict values
_configuration: contains the Configuration that enables json schema validation keywords
like minItems, minLength etc
Note: double underscores are used here because pycharm thinks that these variables
are instance properties if they are named normally :(
\\"\\"\\"
__kwargs = cls.__remove_unsets(kwargs)
if not _args and not __kwargs:
raise TypeError(
'No input given. args or kwargs must be given.'
)
if not __kwargs and _args and not isinstance(_args[0], dict):
__arg = _args[0]
else:
__arg = cls.__get_input_dict(*_args, **__kwargs)
__from_server = False
__validated_path_to_schemas = {}
__arg = cast_to_allowed_types(
__arg, __from_server, __validated_path_to_schemas)
__validation_metadata = ValidationMetadata(
configuration=_configuration, from_server=__from_server, validated_path_to_schemas=__validated_path_to_schemas)
__path_to_schemas = cls.__get_new_cls(__arg, __validation_metadata)
__new_cls = __path_to_schemas[__validation_metadata.path_to_item]
return __new_cls._get_new_instance_without_conversion_oapg(
__arg,
__validation_metadata.path_to_item,
__path_to_schemas
)
def __init__(
self,
*_args: typing.Union[
dict, frozendict.frozendict, list, tuple, decimal.Decimal, float, int, str, date, datetime, bool, None, 'Schema'],
_configuration: typing.Optional[Configuration] = None,
**kwargs: typing.Union[
dict, frozendict.frozendict, list, tuple, decimal.Decimal, float, int, str, date, datetime, bool, None, 'Schema', Unset
]
):
\\"\\"\\"
this is needed to fix 'Unexpected argument' warning in pycharm
this code does nothing because all Schema instances are immutable
this means that all input data is passed into and used in new, and after the new instance is made
no new attributes are assigned and init is not used
\\"\\"\\"
pass
\\"\\"\\"
import itertools
data_types = ('None', 'FrozenDict', 'Tuple', 'Str', 'Decimal', 'Bool')
type_to_cls = {
'None': 'NoneClass',
'FrozenDict': 'frozendict.frozendict',
'Tuple': 'tuple',
'Str': 'str',
'Decimal': 'decimal.Decimal',
'Bool': 'BoolClass'
}
cls_tuples = [v for v in itertools.combinations(data_types, 5)]
typed_classes = [f\\"class {''.join(cls_tuple)}Mixin({', '.join(type_to_cls[typ] for typ in cls_tuple)}):\\\\n pass\\" for cls_tuple in cls_tuples]
for cls in typed_classes:
print(cls)
object_classes = [f\\"{''.join(cls_tuple)}Mixin = object\\" for cls_tuple in cls_tuples]
for cls in object_classes:
print(cls)
\\"\\"\\"
if typing.TYPE_CHECKING:
# qty 1
NoneMixin = NoneClass
FrozenDictMixin = frozendict.frozendict
TupleMixin = tuple
StrMixin = str
DecimalMixin = decimal.Decimal
BoolMixin = BoolClass
BytesMixin = bytes
FileMixin = FileIO
# qty 2
class BinaryMixin(bytes, FileIO):
pass
class NoneFrozenDictMixin(NoneClass, frozendict.frozendict):
pass
class NoneTupleMixin(NoneClass, tuple):
pass
class NoneStrMixin(NoneClass, str):
pass
class NoneDecimalMixin(NoneClass, decimal.Decimal):
pass
class NoneBoolMixin(NoneClass, BoolClass):
pass
class FrozenDictTupleMixin(frozendict.frozendict, tuple):
pass
class FrozenDictStrMixin(frozendict.frozendict, str):
pass
class FrozenDictDecimalMixin(frozendict.frozendict, decimal.Decimal):
pass
class FrozenDictBoolMixin(frozendict.frozendict, BoolClass):
pass
class TupleStrMixin(tuple, str):
pass
class TupleDecimalMixin(tuple, decimal.Decimal):
pass
class TupleBoolMixin(tuple, BoolClass):
pass
class StrDecimalMixin(str, decimal.Decimal):
pass
class StrBoolMixin(str, BoolClass):
pass
class DecimalBoolMixin(decimal.Decimal, BoolClass):
pass
# qty 3
class NoneFrozenDictTupleMixin(NoneClass, frozendict.frozendict, tuple):
pass
class NoneFrozenDictStrMixin(NoneClass, frozendict.frozendict, str):
pass
class NoneFrozenDictDecimalMixin(NoneClass, frozendict.frozendict, decimal.Decimal):
pass
class NoneFrozenDictBoolMixin(NoneClass, frozendict.frozendict, BoolClass):
pass
class NoneTupleStrMixin(NoneClass, tuple, str):
pass
class NoneTupleDecimalMixin(NoneClass, tuple, decimal.Decimal):
pass
class NoneTupleBoolMixin(NoneClass, tuple, BoolClass):
pass
class NoneStrDecimalMixin(NoneClass, str, decimal.Decimal):
pass
class NoneStrBoolMixin(NoneClass, str, BoolClass):
pass
class NoneDecimalBoolMixin(NoneClass, decimal.Decimal, BoolClass):
pass
class FrozenDictTupleStrMixin(frozendict.frozendict, tuple, str):
pass
class FrozenDictTupleDecimalMixin(frozendict.frozendict, tuple, decimal.Decimal):
pass
class FrozenDictTupleBoolMixin(frozendict.frozendict, tuple, BoolClass):
pass
class FrozenDictStrDecimalMixin(frozendict.frozendict, str, decimal.Decimal):
pass
class FrozenDictStrBoolMixin(frozendict.frozendict, str, BoolClass):
pass
class FrozenDictDecimalBoolMixin(frozendict.frozendict, decimal.Decimal, BoolClass):
pass
class TupleStrDecimalMixin(tuple, str, decimal.Decimal):
pass
class TupleStrBoolMixin(tuple, str, BoolClass):
pass
class TupleDecimalBoolMixin(tuple, decimal.Decimal, BoolClass):
pass
class StrDecimalBoolMixin(str, decimal.Decimal, BoolClass):
pass
# qty 4
class NoneFrozenDictTupleStrMixin(NoneClass, frozendict.frozendict, tuple, str):
pass
class NoneFrozenDictTupleDecimalMixin(NoneClass, frozendict.frozendict, tuple, decimal.Decimal):
pass
class NoneFrozenDictTupleBoolMixin(NoneClass, frozendict.frozendict, tuple, BoolClass):
pass
class NoneFrozenDictStrDecimalMixin(NoneClass, frozendict.frozendict, str, decimal.Decimal):
pass
class NoneFrozenDictStrBoolMixin(NoneClass, frozendict.frozendict, str, BoolClass):
pass
class NoneFrozenDictDecimalBoolMixin(NoneClass, frozendict.frozendict, decimal.Decimal, BoolClass):
pass
class NoneTupleStrDecimalMixin(NoneClass, tuple, str, decimal.Decimal):
pass
class NoneTupleStrBoolMixin(NoneClass, tuple, str, BoolClass):
pass
class NoneTupleDecimalBoolMixin(NoneClass, tuple, decimal.Decimal, BoolClass):
pass
class NoneStrDecimalBoolMixin(NoneClass, str, decimal.Decimal, BoolClass):
pass
class FrozenDictTupleStrDecimalMixin(frozendict.frozendict, tuple, str, decimal.Decimal):
pass
class FrozenDictTupleStrBoolMixin(frozendict.frozendict, tuple, str, BoolClass):
pass
class FrozenDictTupleDecimalBoolMixin(frozendict.frozendict, tuple, decimal.Decimal, BoolClass):
pass
class FrozenDictStrDecimalBoolMixin(frozendict.frozendict, str, decimal.Decimal, BoolClass):
pass
class TupleStrDecimalBoolMixin(tuple, str, decimal.Decimal, BoolClass):
pass
# qty 5
class NoneFrozenDictTupleStrDecimalMixin(NoneClass, frozendict.frozendict, tuple, str, decimal.Decimal):
pass
class NoneFrozenDictTupleStrBoolMixin(NoneClass, frozendict.frozendict, tuple, str, BoolClass):
pass
class NoneFrozenDictTupleDecimalBoolMixin(NoneClass, frozendict.frozendict, tuple, decimal.Decimal, BoolClass):
pass
class NoneFrozenDictStrDecimalBoolMixin(NoneClass, frozendict.frozendict, str, decimal.Decimal, BoolClass):
pass
class NoneTupleStrDecimalBoolMixin(NoneClass, tuple, str, decimal.Decimal, BoolClass):
pass
class FrozenDictTupleStrDecimalBoolMixin(frozendict.frozendict, tuple, str, decimal.Decimal, BoolClass):
pass
# qty 6
class NoneFrozenDictTupleStrDecimalBoolMixin(NoneClass, frozendict.frozendict, tuple, str, decimal.Decimal, BoolClass):
pass
# qty 8
class NoneFrozenDictTupleStrDecimalBoolFileBytesMixin(NoneClass, frozendict.frozendict, tuple, str, decimal.Decimal, BoolClass, FileIO, bytes):
pass
else:
# qty 1
class NoneMixin:
_types = {NoneClass}
class FrozenDictMixin:
_types = {frozendict.frozendict}
class TupleMixin:
_types = {tuple}
class StrMixin:
_types = {str}
class DecimalMixin:
_types = {decimal.Decimal}
class BoolMixin:
_types = {BoolClass}
class BytesMixin:
_types = {bytes}
class FileMixin:
_types = {FileIO}
# qty 2
class BinaryMixin:
_types = {bytes, FileIO}
class NoneFrozenDictMixin:
_types = {NoneClass, frozendict.frozendict}
class NoneTupleMixin:
_types = {NoneClass, tuple}
class NoneStrMixin:
_types = {NoneClass, str}
class NoneDecimalMixin:
_types = {NoneClass, decimal.Decimal}
class NoneBoolMixin:
_types = {NoneClass, BoolClass}
class FrozenDictTupleMixin:
_types = {frozendict.frozendict, tuple}
class FrozenDictStrMixin:
_types = {frozendict.frozendict, str}
class FrozenDictDecimalMixin:
_types = {frozendict.frozendict, decimal.Decimal}
class FrozenDictBoolMixin:
_types = {frozendict.frozendict, BoolClass}
class TupleStrMixin:
_types = {tuple, str}
class TupleDecimalMixin:
_types = {tuple, decimal.Decimal}
class TupleBoolMixin:
_types = {tuple, BoolClass}
class StrDecimalMixin:
_types = {str, decimal.Decimal}
class StrBoolMixin:
_types = {str, BoolClass}
class DecimalBoolMixin:
_types = {decimal.Decimal, BoolClass}
# qty 3
class NoneFrozenDictTupleMixin:
_types = {NoneClass, frozendict.frozendict, tuple}
class NoneFrozenDictStrMixin:
_types = {NoneClass, frozendict.frozendict, str}
class NoneFrozenDictDecimalMixin:
_types = {NoneClass, frozendict.frozendict, decimal.Decimal}
class NoneFrozenDictBoolMixin:
_types = {NoneClass, frozendict.frozendict, BoolClass}
class NoneTupleStrMixin:
_types = {NoneClass, tuple, str}
class NoneTupleDecimalMixin:
_types = {NoneClass, tuple, decimal.Decimal}
class NoneTupleBoolMixin:
_types = {NoneClass, tuple, BoolClass}
class NoneStrDecimalMixin:
_types = {NoneClass, str, decimal.Decimal}
class NoneStrBoolMixin:
_types = {NoneClass, str, BoolClass}
class NoneDecimalBoolMixin:
_types = {NoneClass, decimal.Decimal, BoolClass}
class FrozenDictTupleStrMixin:
_types = {frozendict.frozendict, tuple, str}
class FrozenDictTupleDecimalMixin:
_types = {frozendict.frozendict, tuple, decimal.Decimal}
class FrozenDictTupleBoolMixin:
_types = {frozendict.frozendict, tuple, BoolClass}
class FrozenDictStrDecimalMixin:
_types = {frozendict.frozendict, str, decimal.Decimal}
class FrozenDictStrBoolMixin:
_types = {frozendict.frozendict, str, BoolClass}
class FrozenDictDecimalBoolMixin:
_types = {frozendict.frozendict, decimal.Decimal, BoolClass}
class TupleStrDecimalMixin:
_types = {tuple, str, decimal.Decimal}
class TupleStrBoolMixin:
_types = {tuple, str, BoolClass}
class TupleDecimalBoolMixin:
_types = {tuple, decimal.Decimal, BoolClass}
class StrDecimalBoolMixin:
_types = {str, decimal.Decimal, BoolClass}
# qty 4
class NoneFrozenDictTupleStrMixin:
_types = {NoneClass, frozendict.frozendict, tuple, str}
class NoneFrozenDictTupleDecimalMixin:
_types = {NoneClass, frozendict.frozendict, tuple, decimal.Decimal}
class NoneFrozenDictTupleBoolMixin:
_types = {NoneClass, frozendict.frozendict, tuple, BoolClass}
class NoneFrozenDictStrDecimalMixin:
_types = {NoneClass, frozendict.frozendict, str, decimal.Decimal}
class NoneFrozenDictStrBoolMixin:
_types = {NoneClass, frozendict.frozendict, str, BoolClass}
class NoneFrozenDictDecimalBoolMixin:
_types = {NoneClass, frozendict.frozendict, decimal.Decimal, BoolClass}
class NoneTupleStrDecimalMixin:
_types = {NoneClass, tuple, str, decimal.Decimal}
class NoneTupleStrBoolMixin:
_types = {NoneClass, tuple, str, BoolClass}
class NoneTupleDecimalBoolMixin:
_types = {NoneClass, tuple, decimal.Decimal, BoolClass}
class NoneStrDecimalBoolMixin:
_types = {NoneClass, str, decimal.Decimal, BoolClass}
class FrozenDictTupleStrDecimalMixin:
_types = {frozendict.frozendict, tuple, str, decimal.Decimal}
class FrozenDictTupleStrBoolMixin:
_types = {frozendict.frozendict, tuple, str, BoolClass}
class FrozenDictTupleDecimalBoolMixin:
_types = {frozendict.frozendict, tuple, decimal.Decimal, BoolClass}
class FrozenDictStrDecimalBoolMixin:
_types = {frozendict.frozendict, str, decimal.Decimal, BoolClass}
class TupleStrDecimalBoolMixin:
_types = {tuple, str, decimal.Decimal, BoolClass}
# qty 5
class NoneFrozenDictTupleStrDecimalMixin:
_types = {NoneClass, frozendict.frozendict, tuple, str, decimal.Decimal}
class NoneFrozenDictTupleStrBoolMixin:
_types = {NoneClass, frozendict.frozendict, tuple, str, BoolClass}
class NoneFrozenDictTupleDecimalBoolMixin:
_types = {NoneClass, frozendict.frozendict, tuple, decimal.Decimal, BoolClass}
class NoneFrozenDictStrDecimalBoolMixin:
_types = {NoneClass, frozendict.frozendict, str, decimal.Decimal, BoolClass}
class NoneTupleStrDecimalBoolMixin:
_types = {NoneClass, tuple, str, decimal.Decimal, BoolClass}
class FrozenDictTupleStrDecimalBoolMixin:
_types = {frozendict.frozendict, tuple, str, decimal.Decimal, BoolClass}
# qty 6
class NoneFrozenDictTupleStrDecimalBoolMixin:
_types = {NoneClass, frozendict.frozendict, tuple, str, decimal.Decimal, BoolClass}
# qty 8
class NoneFrozenDictTupleStrDecimalBoolFileBytesMixin:
_types = {NoneClass, frozendict.frozendict, tuple, str, decimal.Decimal, BoolClass, FileIO, bytes}
class ValidatorBase:
@staticmethod
def _is_json_validation_enabled_oapg(schema_keyword, configuration=None):
\\"\\"\\"Returns true if JSON schema validation is enabled for the specified
validation keyword. This can be used to skip JSON schema structural validation
as requested in the configuration.
Note: the suffix _oapg stands for openapi python (experimental) generator and
it has been added to prevent collisions with other methods and properties
Args:
schema_keyword (string): the name of a JSON schema validation keyword.
configuration (Configuration): the configuration class.
\\"\\"\\"
return (configuration is None or
not hasattr(configuration, '_disabled_client_side_validations') or
schema_keyword not in configuration._disabled_client_side_validations)
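# Illustrative sketch (assumption: a Configuration object exposing the
# _disabled_client_side_validations list checked above). With
#     configuration._disabled_client_side_validations = ['maxLength']
# _is_json_validation_enabled_oapg('maxLength', configuration) returns False,
# so the maxLength structural check is skipped while other keywords still validate.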
@staticmethod
def _raise_validation_error_message_oapg(value, constraint_msg, constraint_value, path_to_item, additional_txt=\\"\\"):
raise ApiValueError(
\\"Invalid value \`{value}\`, {constraint_msg} \`{constraint_value}\`{additional_txt} at {path_to_item}\\".format(
value=value,
constraint_msg=constraint_msg,
constraint_value=constraint_value,
additional_txt=additional_txt,
path_to_item=path_to_item,
)
)
class EnumBase:
@classmethod
def _validate_oapg(
cls,
arg,
validation_metadata: ValidationMetadata,
) -> typing.Dict[typing.Tuple[typing.Union[str, int], ...], typing.Set[typing.Union['Schema', str, decimal.Decimal, BoolClass, NoneClass, frozendict.frozendict, tuple]]]:
\\"\\"\\"
EnumBase _validate_oapg
Validates that arg is in the enum's allowed values
\\"\\"\\"
try:
cls.MetaOapg.enum_value_to_name[arg]
except KeyError:
raise ApiValueError(\\"Invalid value {} passed in to {}, allowed_values={}\\".format(arg, cls, cls.MetaOapg.enum_value_to_name.keys()))
return super()._validate_oapg(arg, validation_metadata=validation_metadata)
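# Illustrative sketch (hypothetical enum schema): if a generated schema defines
#     MetaOapg.enum_value_to_name = {'asc': 'ASC', 'desc': 'DESC'}
# then _validate_oapg accepts 'asc' or 'desc' and raises ApiValueError for any other value.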
class BoolBase:
def is_true_oapg(self) -> bool:
\\"\\"\\"
A replacement for x is True
True if the instance is a BoolClass True Singleton
\\"\\"\\"
if not issubclass(self.__class__, BoolClass):
return False
return bool(self)
def is_false_oapg(self) -> bool:
\\"\\"\\"
A replacement for x is False
True if the instance is a BoolClass False Singleton
\\"\\"\\"
if not issubclass(self.__class__, BoolClass):
return False
return bool(self) is False
class NoneBase:
def is_none_oapg(self) -> bool:
\\"\\"\\"
A replacement for x is None
True if the instance is a NoneClass None Singleton
\\"\\"\\"
if issubclass(self.__class__, NoneClass):
return True
return False
class StrBase(ValidatorBase):
MetaOapg: MetaOapgTyped
@property
def as_str_oapg(self) -> str:
return self
@property
def as_date_oapg(self) -> date:
raise Exception('not implemented')
@property
def as_datetime_oapg(self) -> datetime:
raise Exception('not implemented')
@property
def as_decimal_oapg(self) -> decimal.Decimal:
raise Exception('not implemented')
@property
def as_uuid_oapg(self) -> uuid.UUID:
raise Exception('not implemented')
@classmethod
def __check_str_validations(
cls,
arg: str,
validation_metadata: ValidationMetadata
):
if not hasattr(cls, 'MetaOapg'):
return
if (cls._is_json_validation_enabled_oapg('maxLength', validation_metadata.configuration) and
hasattr(cls.MetaOapg, 'max_length') and
len(arg) > cls.MetaOapg.max_length):
cls._raise_validation_error_message_oapg(
value=arg,
constraint_msg=\\"length must be less than or equal to\\",
constraint_value=cls.MetaOapg.max_length,
path_to_item=validation_metadata.path_to_item
)
if (cls._is_json_validation_enabled_oapg('minLength', validation_metadata.configuration) and
hasattr(cls.MetaOapg, 'min_length') and
len(arg) < cls.MetaOapg.min_length):
cls._raise_validation_error_message_oapg(
value=arg,
constraint_msg=\\"length must be greater than or equal to\\",
constraint_value=cls.MetaOapg.min_length,
path_to_item=validation_metadata.path_to_item
)
if (cls._is_json_validation_enabled_oapg('pattern', validation_metadata.configuration) and
hasattr(cls.MetaOapg, 'regex')):
for regex_dict in cls.MetaOapg.regex:
flags = regex_dict.get('flags', 0)
if not re.search(regex_dict['pattern'], arg, flags=flags):
if flags != 0:
# Include the regex flags in the error message only when flags
# were specified in the OAS document.
cls._raise_validation_error_message_oapg(
value=arg,
constraint_msg=\\"must match regular expression\\",
constraint_value=regex_dict['pattern'],
path_to_item=validation_metadata.path_to_item,
additional_txt=\\" with flags=\`{}\`\\".format(flags)
)
cls._raise_validation_error_message_oapg(
value=arg,
constraint_msg=\\"must match regular expression\\",
constraint_value=regex_dict['pattern'],
path_to_item=validation_metadata.path_to_item
)
@classmethod
def _validate_oapg(
cls,
arg,
validation_metadata: ValidationMetadata,
) -> typing.Dict[typing.Tuple[typing.Union[str, int], ...], typing.Set[typing.Union['Schema', str, decimal.Decimal, BoolClass, NoneClass, frozendict.frozendict, tuple]]]:
\\"\\"\\"
StrBase _validate_oapg
Validates that validations pass
\\"\\"\\"
if isinstance(arg, str):
cls.__check_str_validations(arg, validation_metadata)
return super()._validate_oapg(arg, validation_metadata=validation_metadata)
class UUIDBase:
@property
@functools.lru_cache()
def as_uuid_oapg(self) -> uuid.UUID:
return uuid.UUID(self)
@classmethod
def __validate_format(cls, arg: typing.Optional[str], validation_metadata: ValidationMetadata):
if isinstance(arg, str):
try:
uuid.UUID(arg)
return True
except ValueError:
raise ApiValueError(
\\"Invalid value '{}' for type UUID at {}\\".format(arg, validation_metadata.path_to_item)
)
@classmethod
def _validate_oapg(
cls,
arg,
validation_metadata: typing.Optional[ValidationMetadata] = None,
):
\\"\\"\\"
UUIDBase _validate_oapg
\\"\\"\\"
cls.__validate_format(arg, validation_metadata=validation_metadata)
return super()._validate_oapg(arg, validation_metadata=validation_metadata)
class CustomIsoparser(isoparser):
@_takes_ascii
def parse_isodatetime(self, dt_str):
components, pos = self._parse_isodate(dt_str)
if len(dt_str) > pos:
if self._sep is None or dt_str[pos:pos + 1] == self._sep:
components += self._parse_isotime(dt_str[pos + 1:])
else:
raise ValueError('String contains unknown ISO components')
if len(components) > 3 and components[3] == 24:
components[3] = 0
return datetime(*components) + timedelta(days=1)
if len(components) <= 3:
raise ValueError('Value is not a datetime')
return datetime(*components)
@_takes_ascii
def parse_isodate(self, datestr):
components, pos = self._parse_isodate(datestr)
if len(datestr) > pos:
raise ValueError('String contains invalid time components')
if len(components) > 3:
raise ValueError('String contains invalid time components')
return date(*components)
DEFAULT_ISOPARSER = CustomIsoparser()
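# Illustrative sketch of the parser defined above (doctest-style, values assumed):
#     >>> DEFAULT_ISOPARSER.parse_isodate('2017-01-01')
#     datetime.date(2017, 1, 1)
#     >>> DEFAULT_ISOPARSER.parse_isodatetime('2017-01-01T24:00:00')
#     datetime.datetime(2017, 1, 2, 0, 0)
# hour 24 is normalized to 00:00 on the following day, per the branch in parse_isodatetime.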
class DateBase:
@property
@functools.lru_cache()
def as_date_oapg(self) -> date:
return DEFAULT_ISOPARSER.parse_isodate(self)
@classmethod
def __validate_format(cls, arg: typing.Optional[str], validation_metadata: ValidationMetadata):
if isinstance(arg, str):
try:
DEFAULT_ISOPARSER.parse_isodate(arg)
return True
except ValueError:
raise ApiValueError(
\\"Value does not conform to the required ISO-8601 date format. \\"
\\"Invalid value '{}' for type date at {}\\".format(arg, validation_metadata.path_to_item)
)
@classmethod
def _validate_oapg(
cls,
arg,
validation_metadata: typing.Optional[ValidationMetadata] = None,
):
\\"\\"\\"
DateBase _validate_oapg
\\"\\"\\"
cls.__validate_format(arg, validation_metadata=validation_metadata)
return super()._validate_oapg(arg, validation_metadata=validation_metadata)
class DateTimeBase:
@property
@functools.lru_cache()
def as_datetime_oapg(self) -> datetime:
return DEFAULT_ISOPARSER.parse_isodatetime(self)
@classmethod
def __validate_format(cls, arg: typing.Optional[str], validation_metadata: ValidationMetadata):
if isinstance(arg, str):
try:
DEFAULT_ISOPARSER.parse_isodatetime(arg)
return True
except ValueError:
raise ApiValueError(
\\"Value does not conform to the required ISO-8601 datetime format. \\"
\\"Invalid value '{}' for type datetime at {}\\".format(arg, validation_metadata.path_to_item)
)
@classmethod
def _validate_oapg(
cls,
arg,
validation_metadata: ValidationMetadata,
):
\\"\\"\\"
DateTimeBase _validate_oapg
\\"\\"\\"
cls.__validate_format(arg, validation_metadata=validation_metadata)
return super()._validate_oapg(arg, validation_metadata=validation_metadata)
class DecimalBase:
\\"\\"\\"
A class for storing decimals that are sent over the wire as strings
These schemas must remain based on StrBase rather than NumberBase
because picking base classes must be deterministic
\\"\\"\\"
@property
@functools.lru_cache()
def as_decimal_oapg(self) -> decimal.Decimal:
return decimal.Decimal(self)
@classmethod
def __validate_format(cls, arg: typing.Optional[str], validation_metadata: ValidationMetadata):
if isinstance(arg, str):
try:
decimal.Decimal(arg)
return True
except decimal.InvalidOperation:
raise ApiValueError(
\\"Value cannot be converted to a decimal. \\"
\\"Invalid value '{}' for type decimal at {}\\".format(arg, validation_metadata.path_to_item)
)
@classmethod
def _validate_oapg(
cls,
arg,
validation_metadata: ValidationMetadata,
):
\\"\\"\\"
DecimalBase _validate_oapg
\\"\\"\\"
cls.__validate_format(arg, validation_metadata=validation_metadata)
return super()._validate_oapg(arg, validation_metadata=validation_metadata)
class NumberBase(ValidatorBase):
MetaOapg: MetaOapgTyped
@property
def as_int_oapg(self) -> int:
try:
return self._as_int
except AttributeError:
\\"\\"\\"
Note: a number like 9.0 could be represented as an
integer, but our code chooses to store it as
>>> Decimal('9.0').as_tuple()
DecimalTuple(sign=0, digits=(9, 0), exponent=-1)
so we can tell that the value came from a float and convert it back to a float
during later serialization
\\"\\"\\"
if self.as_tuple().exponent < 0:
# this could be represented as an integer but should be represented as a float
# because that's what it was serialized from
raise ApiValueError(f'{self} is not an integer')
self._as_int = int(self)
return self._as_int
@property
def as_float_oapg(self) -> float:
try:
return self._as_float
except AttributeError:
if self.as_tuple().exponent >= 0:
raise ApiValueError(f'{self} is not a float')
self._as_float = float(self)
return self._as_float
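# Illustrative sketch: instances hold Decimal values (see cast_to_allowed_types below), so
#     a value of Decimal('9')   -> as_int_oapg == 9,      as_float_oapg raises ApiValueError
#     a value of Decimal('9.0') -> as_float_oapg == 9.0,  as_int_oapg raises ApiValueError
# because a negative exponent marks the value as having been serialized from a float.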
@classmethod
def __check_numeric_validations(
cls,
arg,
validation_metadata: ValidationMetadata
):
if not hasattr(cls, 'MetaOapg'):
return
if cls._is_json_validation_enabled_oapg('multipleOf',
validation_metadata.configuration) and hasattr(cls.MetaOapg, 'multiple_of'):
multiple_of_value = cls.MetaOapg.multiple_of
if (not (float(arg) / multiple_of_value).is_integer()):
# Note 'multipleOf' will be as good as the floating point arithmetic.
cls._raise_validation_error_message_oapg(
value=arg,
constraint_msg=\\"value must be a multiple of\\",
constraint_value=multiple_of_value,
path_to_item=validation_metadata.path_to_item
)
checking_max_or_min_values = any(
hasattr(cls.MetaOapg, validation_key) for validation_key in {
'exclusive_maximum',
'inclusive_maximum',
'exclusive_minimum',
'inclusive_minimum',
}
)
if not checking_max_or_min_values:
return
if (cls._is_json_validation_enabled_oapg('exclusiveMaximum', validation_metadata.configuration) and
hasattr(cls.MetaOapg, 'exclusive_maximum') and
arg >= cls.MetaOapg.exclusive_maximum):
cls._raise_validation_error_message_oapg(
value=arg,
constraint_msg=\\"must be a value less than\\",
constraint_value=cls.MetaOapg.exclusive_maximum,
path_to_item=validation_metadata.path_to_item
)
if (cls._is_json_validation_enabled_oapg('maximum', validation_metadata.configuration) and
hasattr(cls.MetaOapg, 'inclusive_maximum') and
arg > cls.MetaOapg.inclusive_maximum):
cls._raise_validation_error_message_oapg(
value=arg,
constraint_msg=\\"must be a value less than or equal to\\",
constraint_value=cls.MetaOapg.inclusive_maximum,
path_to_item=validation_metadata.path_to_item
)
if (cls._is_json_validation_enabled_oapg('exclusiveMinimum', validation_metadata.configuration) and
hasattr(cls.MetaOapg, 'exclusive_minimum') and
arg <= cls.MetaOapg.exclusive_minimum):
cls._raise_validation_error_message_oapg(
value=arg,
constraint_msg=\\"must be a value greater than\\",
constraint_value=cls.MetaOapg.exclusive_minimum,
path_to_item=validation_metadata.path_to_item
)
if (cls._is_json_validation_enabled_oapg('minimum', validation_metadata.configuration) and
hasattr(cls.MetaOapg, 'inclusive_minimum') and
arg < cls.MetaOapg.inclusive_minimum):
cls._raise_validation_error_message_oapg(
value=arg,
constraint_msg=\\"must be a value greater than or equal to\\",
constraint_value=cls.MetaOapg.inclusive_minimum,
path_to_item=validation_metadata.path_to_item
)
@classmethod
def _validate_oapg(
cls,
arg,
validation_metadata: ValidationMetadata,
) -> typing.Dict[typing.Tuple[typing.Union[str, int], ...], typing.Set[typing.Union['Schema', str, decimal.Decimal, BoolClass, NoneClass, frozendict.frozendict, tuple]]]:
\\"\\"\\"
NumberBase _validate_oapg
Validates that validations pass
\\"\\"\\"
if isinstance(arg, decimal.Decimal):
cls.__check_numeric_validations(arg, validation_metadata)
return super()._validate_oapg(arg, validation_metadata=validation_metadata)
class ListBase(ValidatorBase):
MetaOapg: MetaOapgTyped
@classmethod
def __validate_items(cls, list_items, validation_metadata: ValidationMetadata):
\\"\\"\\"
Ensures that:
- values passed in for items are valid
Exceptions will be raised if:
- invalid arguments were passed in
Args:
list_items: the input list of items
Raises:
ApiTypeError - for missing required arguments, or for invalid properties
\\"\\"\\"
# if we have definitions for an items schema, use it
# otherwise accept anything
item_cls = getattr(cls.MetaOapg, 'items', UnsetAnyTypeSchema)
item_cls = cls._get_class_oapg(item_cls)
path_to_schemas = {}
for i, value in enumerate(list_items):
item_validation_metadata = ValidationMetadata(
from_server=validation_metadata.from_server,
configuration=validation_metadata.configuration,
path_to_item=validation_metadata.path_to_item+(i,),
validated_path_to_schemas=validation_metadata.validated_path_to_schemas
)
if item_validation_metadata.validation_ran_earlier(item_cls):
add_deeper_validated_schemas(item_validation_metadata, path_to_schemas)
continue
other_path_to_schemas = item_cls._validate_oapg(
value, validation_metadata=item_validation_metadata)
update(path_to_schemas, other_path_to_schemas)
return path_to_schemas
@classmethod
def __check_tuple_validations(
cls, arg,
validation_metadata: ValidationMetadata):
if not hasattr(cls, 'MetaOapg'):
return
if (cls._is_json_validation_enabled_oapg('maxItems', validation_metadata.configuration) and
hasattr(cls.MetaOapg, 'max_items') and
len(arg) > cls.MetaOapg.max_items):
cls._raise_validation_error_message_oapg(
value=arg,
constraint_msg=\\"number of items must be less than or equal to\\",
constraint_value=cls.MetaOapg.max_items,
path_to_item=validation_metadata.path_to_item
)
if (cls._is_json_validation_enabled_oapg('minItems', validation_metadata.configuration) and
hasattr(cls.MetaOapg, 'min_items') and
len(arg) < cls.MetaOapg.min_items):
cls._raise_validation_error_message_oapg(
value=arg,
constraint_msg=\\"number of items must be greater than or equal to\\",
constraint_value=cls.MetaOapg.min_items,
path_to_item=validation_metadata.path_to_item
)
if (cls._is_json_validation_enabled_oapg('uniqueItems', validation_metadata.configuration) and
hasattr(cls.MetaOapg, 'unique_items') and cls.MetaOapg.unique_items and arg):
unique_items = set(arg)
if len(arg) > len(unique_items):
cls._raise_validation_error_message_oapg(
value=arg,
constraint_msg=\\"duplicate items were found, and the tuple must not contain duplicates because\\",
constraint_value='unique_items==True',
path_to_item=validation_metadata.path_to_item
)
@classmethod
def _validate_oapg(
cls,
arg,
validation_metadata: ValidationMetadata,
):
\\"\\"\\"
ListBase _validate_oapg
We return dynamic classes of different bases depending upon the inputs
This makes it so:
- the returned instance is always a subclass of our defining schema
- this allows us to check type based on whether an instance is a subclass of a schema
- the returned instance is a serializable type (except for None, True, and False, which are enum singletons)
Returns:
new_cls (type): the new class
Raises:
ApiValueError: when a string can't be converted into a date or datetime and it must be one of those classes
ApiTypeError: when the input type is not in the list of allowed spec types
\\"\\"\\"
if isinstance(arg, tuple):
cls.__check_tuple_validations(arg, validation_metadata)
_path_to_schemas = super()._validate_oapg(arg, validation_metadata=validation_metadata)
if not isinstance(arg, tuple):
return _path_to_schemas
updated_vm = ValidationMetadata(
configuration=validation_metadata.configuration,
from_server=validation_metadata.from_server,
path_to_item=validation_metadata.path_to_item,
seen_classes=validation_metadata.seen_classes | frozenset({cls}),
validated_path_to_schemas=validation_metadata.validated_path_to_schemas
)
other_path_to_schemas = cls.__validate_items(arg, validation_metadata=updated_vm)
update(_path_to_schemas, other_path_to_schemas)
return _path_to_schemas
@classmethod
def _get_items_oapg(
cls: 'Schema',
arg: typing.List[typing.Any],
path_to_item: typing.Tuple[typing.Union[str, int], ...],
path_to_schemas: typing.Dict[typing.Tuple[typing.Union[str, int], ...], typing.Type['Schema']]
):
'''
ListBase _get_items_oapg
'''
cast_items = []
for i, value in enumerate(arg):
item_path_to_item = path_to_item + (i,)
item_cls = path_to_schemas[item_path_to_item]
new_value = item_cls._get_new_instance_without_conversion_oapg(
value,
item_path_to_item,
path_to_schemas
)
cast_items.append(new_value)
return cast_items
class Discriminable:
MetaOapg: MetaOapgTyped
@classmethod
def _ensure_discriminator_value_present_oapg(cls, disc_property_name: str, validation_metadata: ValidationMetadata, *args):
if not args or args and disc_property_name not in args[0]:
# The input data does not contain the discriminator property
raise ApiValueError(
\\"Cannot deserialize input data due to missing discriminator. \\"
\\"The discriminator property '{}' is missing at path: {}\\".format(disc_property_name, validation_metadata.path_to_item)
)
@classmethod
def get_discriminated_class_oapg(cls, disc_property_name: str, disc_payload_value: str):
\\"\\"\\"
Used in schemas with discriminators
\\"\\"\\"
if not hasattr(cls.MetaOapg, 'discriminator'):
return None
disc = cls.MetaOapg.discriminator()
if disc_property_name not in disc:
return None
discriminated_cls = disc[disc_property_name].get(disc_payload_value)
if discriminated_cls is not None:
return discriminated_cls
if not hasattr(cls, 'MetaOapg'):
return None
elif not (
hasattr(cls.MetaOapg, 'all_of') or
hasattr(cls.MetaOapg, 'one_of') or
hasattr(cls.MetaOapg, 'any_of')
):
return None
# TODO stop traversing if a cycle is hit
if hasattr(cls.MetaOapg, 'all_of'):
for allof_cls in cls.MetaOapg.all_of():
discriminated_cls = allof_cls.get_discriminated_class_oapg(
disc_property_name=disc_property_name, disc_payload_value=disc_payload_value)
if discriminated_cls is not None:
return discriminated_cls
if hasattr(cls.MetaOapg, 'one_of'):
for oneof_cls in cls.MetaOapg.one_of():
discriminated_cls = oneof_cls.get_discriminated_class_oapg(
disc_property_name=disc_property_name, disc_payload_value=disc_payload_value)
if discriminated_cls is not None:
return discriminated_cls
if hasattr(cls.MetaOapg, 'any_of'):
for anyof_cls in cls.MetaOapg.any_of():
discriminated_cls = anyof_cls.get_discriminated_class_oapg(
disc_property_name=disc_property_name, disc_payload_value=disc_payload_value)
if discriminated_cls is not None:
return discriminated_cls
return None
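# Illustrative sketch (hypothetical schema names): for a schema whose
#     MetaOapg.discriminator() == {'petType': {'dog': DogSchema, 'cat': CatSchema}}
# get_discriminated_class_oapg(disc_property_name='petType', disc_payload_value='dog')
# returns DogSchema; unmapped values fall through to any allOf/oneOf/anyOf members
# before returning None.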
class DictBase(Discriminable, ValidatorBase):
@classmethod
def __validate_arg_presence(cls, arg):
\\"\\"\\"
Ensures that:
- all required arguments are passed in
- the input variable names are valid
- present in properties or
- accepted because additionalProperties exists
Exceptions will be raised if:
- invalid arguments were passed in
- a var_name is invalid if additional_properties == NotAnyTypeSchema
and var_name not in properties.__annotations__
- required properties were not passed in
Args:
arg: the input dict
Raises:
ApiTypeError - for missing required arguments, or for invalid properties
\\"\\"\\"
seen_required_properties = set()
invalid_arguments = []
required_property_names = getattr(cls.MetaOapg, 'required', set())
additional_properties = getattr(cls.MetaOapg, 'additional_properties', UnsetAnyTypeSchema)
properties = getattr(cls.MetaOapg, 'properties', {})
property_annotations = getattr(properties, '__annotations__', {})
for property_name in arg:
if property_name in required_property_names:
seen_required_properties.add(property_name)
elif property_name in property_annotations:
continue
elif additional_properties is not NotAnyTypeSchema:
continue
else:
invalid_arguments.append(property_name)
missing_required_arguments = list(required_property_names - seen_required_properties)
if missing_required_arguments:
missing_required_arguments.sort()
raise ApiTypeError(
\\"{} is missing {} required argument{}: {}\\".format(
cls.__name__,
len(missing_required_arguments),
\\"s\\" if len(missing_required_arguments) > 1 else \\"\\",
missing_required_arguments
)
)
if invalid_arguments:
invalid_arguments.sort()
raise ApiTypeError(
\\"{} was passed {} invalid argument{}: {}\\".format(
cls.__name__,
len(invalid_arguments),
\\"s\\" if len(invalid_arguments) > 1 else \\"\\",
invalid_arguments
)
)
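# Illustrative sketch (hypothetical schema): with MetaOapg.required == {'id'},
# properties annotated as {'id': ..., 'name': ...} and additional_properties == NotAnyTypeSchema,
# passing {'name': 'x'} raises ApiTypeError (missing required 'id') and
# passing {'id': 1, 'extra': 2} raises ApiTypeError ('extra' is an invalid argument).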
@classmethod
def __validate_args(cls, arg, validation_metadata: ValidationMetadata):
\\"\\"\\"
Ensures that:
- values passed in for properties are valid
Exceptions will be raised if:
- invalid arguments were passed in
Args:
arg: the input dict
Raises:
ApiTypeError - for missing required arguments, or for invalid properties
\\"\\"\\"
path_to_schemas = {}
additional_properties = getattr(cls.MetaOapg, 'additional_properties', UnsetAnyTypeSchema)
properties = getattr(cls.MetaOapg, 'properties', {})
property_annotations = getattr(properties, '__annotations__', {})
for property_name, value in arg.items():
path_to_item = validation_metadata.path_to_item+(property_name,)
if property_name in property_annotations:
schema = property_annotations[property_name]
elif additional_properties is not NotAnyTypeSchema:
if additional_properties is UnsetAnyTypeSchema:
\\"\\"\\"
If additionalProperties is unset and this path_to_item does not yet have
any validations on it, validate it.
If it already has validations on it, skip this validation.
\\"\\"\\"
if path_to_item in path_to_schemas:
continue
schema = additional_properties
else:
raise ApiTypeError('Unable to find schema for value={} in class={} at path_to_item={}'.format(
value, cls, validation_metadata.path_to_item+(property_name,)
))
schema = cls._get_class_oapg(schema)
arg_validation_metadata = ValidationMetadata(
from_server=validation_metadata.from_server,
configuration=validation_metadata.configuration,
path_to_item=path_to_item,
validated_path_to_schemas=validation_metadata.validated_path_to_schemas
)
if arg_validation_metadata.validation_ran_earlier(schema):
add_deeper_validated_schemas(arg_validation_metadata, path_to_schemas)
continue
other_path_to_schemas = schema._validate_oapg(value, validation_metadata=arg_validation_metadata)
update(path_to_schemas, other_path_to_schemas)
return path_to_schemas
@classmethod
def __check_dict_validations(
cls,
arg,
validation_metadata: ValidationMetadata
):
if not hasattr(cls, 'MetaOapg'):
return
if (cls._is_json_validation_enabled_oapg('maxProperties', validation_metadata.configuration) and
hasattr(cls.MetaOapg, 'max_properties') and
len(arg) > cls.MetaOapg.max_properties):
cls._raise_validation_error_message_oapg(
value=arg,
constraint_msg=\\"number of properties must be less than or equal to\\",
constraint_value=cls.MetaOapg.max_properties,
path_to_item=validation_metadata.path_to_item
)
if (cls._is_json_validation_enabled_oapg('minProperties', validation_metadata.configuration) and
hasattr(cls.MetaOapg, 'min_properties') and
len(arg) < cls.MetaOapg.min_properties):
cls._raise_validation_error_message_oapg(
value=arg,
constraint_msg=\\"number of properties must be greater than or equal to\\",
constraint_value=cls.MetaOapg.min_properties,
path_to_item=validation_metadata.path_to_item
)
@classmethod
def _validate_oapg(
cls,
arg,
validation_metadata: ValidationMetadata,
):
\\"\\"\\"
DictBase _validate_oapg
We return dynamic classes of different bases depending upon the inputs
This makes it so:
- the returned instance is always a subclass of our defining schema
- this allows us to check type based on whether an instance is a subclass of a schema
- the returned instance is a serializable type (except for None, True, and False, which are enum singletons)
Returns:
new_cls (type): the new class
Raises:
ApiValueError: when a string can't be converted into a date or datetime and it must be one of those classes
ApiTypeError: when the input type is not in the list of allowed spec types
\\"\\"\\"
if isinstance(arg, frozendict.frozendict):
cls.__check_dict_validations(arg, validation_metadata)
_path_to_schemas = super()._validate_oapg(arg, validation_metadata=validation_metadata)
if not isinstance(arg, frozendict.frozendict):
return _path_to_schemas
cls.__validate_arg_presence(arg)
other_path_to_schemas = cls.__validate_args(arg, validation_metadata=validation_metadata)
update(_path_to_schemas, other_path_to_schemas)
try:
discriminator = cls.MetaOapg.discriminator()
except AttributeError:
return _path_to_schemas
# discriminator exists
disc_prop_name = list(discriminator.keys())[0]
cls._ensure_discriminator_value_present_oapg(disc_prop_name, validation_metadata, arg)
discriminated_cls = cls.get_discriminated_class_oapg(
disc_property_name=disc_prop_name, disc_payload_value=arg[disc_prop_name])
if discriminated_cls is None:
raise ApiValueError(
\\"Invalid discriminator value was passed in to {}.{} Only the values {} are allowed at {}\\".format(
cls.__name__,
disc_prop_name,
list(discriminator[disc_prop_name].keys()),
validation_metadata.path_to_item + (disc_prop_name,)
)
)
updated_vm = ValidationMetadata(
configuration=validation_metadata.configuration,
from_server=validation_metadata.from_server,
path_to_item=validation_metadata.path_to_item,
seen_classes=validation_metadata.seen_classes | frozenset({cls}),
validated_path_to_schemas=validation_metadata.validated_path_to_schemas
)
if updated_vm.validation_ran_earlier(discriminated_cls):
add_deeper_validated_schemas(updated_vm, _path_to_schemas)
return _path_to_schemas
other_path_to_schemas = discriminated_cls._validate_oapg(arg, validation_metadata=updated_vm)
update(_path_to_schemas, other_path_to_schemas)
return _path_to_schemas
@classmethod
def _get_properties_oapg(
cls,
arg: typing.Dict[str, typing.Any],
path_to_item: typing.Tuple[typing.Union[str, int], ...],
path_to_schemas: typing.Dict[typing.Tuple[typing.Union[str, int], ...], typing.Type['Schema']]
):
\\"\\"\\"
DictBase _get_properties_oapg, this is how properties are set
These values already passed validation
\\"\\"\\"
dict_items = {}
for property_name_js, value in arg.items():
property_path_to_item = path_to_item + (property_name_js,)
property_cls = path_to_schemas[property_path_to_item]
new_value = property_cls._get_new_instance_without_conversion_oapg(
value,
property_path_to_item,
path_to_schemas
)
dict_items[property_name_js] = new_value
return dict_items
def __setattr__(self, name: str, value: typing.Any):
if not isinstance(self, FileIO):
raise AttributeError('property setting not supported on immutable instances')
def __getattr__(self, name: str):
\\"\\"\\"
for instance.name access
Properties are only type hinted for required properties
so that hasattr(instance, 'optionalProp') is False when that key is not present
\\"\\"\\"
if not isinstance(self, frozendict.frozendict):
return super().__getattr__(name)
if name not in self.__class__.__annotations__:
raise AttributeError(f\\"{self} has no attribute '{name}'\\")
try:
value = self[name]
return value
except KeyError as ex:
raise AttributeError(str(ex))
def __getitem__(self, name: str):
\\"\\"\\"
dict_instance[name] accessor
a KeyError is raised when the key is missing
\\"\\"\\"
if not isinstance(self, frozendict.frozendict):
return super().__getattr__(name)
return super().__getitem__(name)
def get_item_oapg(self, name: str) -> typing.Union['AnyTypeSchema', Unset]:
# dict_instance[name] accessor
if not isinstance(self, frozendict.frozendict):
raise NotImplementedError()
try:
return super().__getitem__(name)
except KeyError:
return unset
def cast_to_allowed_types(
arg: typing.Union[str, date, datetime, uuid.UUID, decimal.Decimal, int, float, None, dict, frozendict.frozendict, list, tuple, bytes, Schema, io.FileIO, io.BufferedReader],
from_server: bool,
validated_path_to_schemas: typing.Dict[typing.Tuple[typing.Union[str, int], ...], typing.Set[typing.Union['Schema', str, decimal.Decimal, BoolClass, NoneClass, frozendict.frozendict, tuple]]],
path_to_item: typing.Tuple[typing.Union[str, int], ...] = tuple(['args[0]']),
) -> typing.Union[frozendict.frozendict, tuple, decimal.Decimal, str, bytes, BoolClass, NoneClass, FileIO]:
\\"\\"\\"
Casts the input payload arg into the allowed types
The input validated_path_to_schemas is mutated by running this function
When from_server is False then
- date/datetime is cast to str
- int/float is cast to Decimal
If a Schema instance is passed in, it is converted back to a primitive instance because
one may need to validate that data against the original Schema class AND additional, different classes;
those additional classes will need to be added to the new manufactured class for that payload.
If the code didn't do this and kept the payload as a Schema instance, it would fail to validate against other
Schema classes and the code wouldn't be able to manufacture a new class that includes all valid schemas
TODO: store the validated schema classes in validation_metadata
Args:
arg: the payload
from_server: whether this payload came from the server or not
validated_path_to_schemas: a dict that stores the validated classes at any path location in the payload
\\"\\"\\"
if isinstance(arg, Schema):
# store the already run validations
schema_classes = set()
for cls in arg.__class__.__bases__:
if cls is Singleton:
# Skip Singleton
continue
schema_classes.add(cls)
validated_path_to_schemas[path_to_item] = schema_classes
type_error = ApiTypeError(f\\"Invalid type. Required value type is str and passed type was {type(arg)} at {path_to_item}\\")
if isinstance(arg, str):
return str(arg)
elif isinstance(arg, (dict, frozendict.frozendict)):
return frozendict.frozendict({key: cast_to_allowed_types(val, from_server, validated_path_to_schemas, path_to_item + (key,)) for key, val in arg.items()})
elif isinstance(arg, (bool, BoolClass)):
\\"\\"\\"
this check must come before isinstance(arg, (int, float))
because isinstance(True, int) is True
\\"\\"\\"
if arg:
return BoolClass.TRUE
return BoolClass.FALSE
elif isinstance(arg, int):
return decimal.Decimal(arg)
elif isinstance(arg, float):
decimal_from_float = decimal.Decimal(arg)
if decimal_from_float.as_integer_ratio()[1] == 1:
# 9.0 -> Decimal('9.0')
# 3.4028234663852886e+38 -> Decimal('340282346638528859811704183484516925440.0')
return decimal.Decimal(str(decimal_from_float)+'.0')
return decimal_from_float
elif isinstance(arg, (tuple, list)):
return tuple([cast_to_allowed_types(item, from_server, validated_path_to_schemas, path_to_item + (i,)) for i, item in enumerate(arg)])
elif isinstance(arg, (none_type, NoneClass)):
return NoneClass.NONE
elif isinstance(arg, (date, datetime)):
if not from_server:
return arg.isoformat()
raise type_error
elif isinstance(arg, uuid.UUID):
if not from_server:
return str(arg)
raise type_error
elif isinstance(arg, decimal.Decimal):
return decimal.Decimal(arg)
elif isinstance(arg, bytes):
return bytes(arg)
elif isinstance(arg, (io.FileIO, io.BufferedReader)):
return FileIO(arg)
raise ValueError('Invalid type passed in got input={} type={}'.format(arg, type(arg)))
class ComposedBase(Discriminable):
@classmethod
def __get_allof_classes(cls, arg, validation_metadata: ValidationMetadata):
path_to_schemas = defaultdict(set)
for allof_cls in cls.MetaOapg.all_of():
if validation_metadata.validation_ran_earlier(allof_cls):
add_deeper_validated_schemas(validation_metadata, path_to_schemas)
continue
other_path_to_schemas = allof_cls._validate_oapg(arg, validation_metadata=validation_metadata)
update(path_to_schemas, other_path_to_schemas)
return path_to_schemas
@classmethod
def __get_oneof_class(
cls,
arg,
discriminated_cls,
validation_metadata: ValidationMetadata,
):
oneof_classes = []
path_to_schemas = defaultdict(set)
for oneof_cls in cls.MetaOapg.one_of():
if oneof_cls in path_to_schemas[validation_metadata.path_to_item]:
oneof_classes.append(oneof_cls)
continue
if validation_metadata.validation_ran_earlier(oneof_cls):
oneof_classes.append(oneof_cls)
add_deeper_validated_schemas(validation_metadata, path_to_schemas)
continue
try:
path_to_schemas = oneof_cls._validate_oapg(arg, validation_metadata=validation_metadata)
except (ApiValueError, ApiTypeError) as ex:
if discriminated_cls is not None and oneof_cls is discriminated_cls:
raise ex
continue
oneof_classes.append(oneof_cls)
if not oneof_classes:
raise ApiValueError(
\\"Invalid inputs given to generate an instance of {}. None \\"
\\"of the oneOf schemas matched the input data.\\".format(cls)
)
elif len(oneof_classes) > 1:
raise ApiValueError(
\\"Invalid inputs given to generate an instance of {}. Multiple \\"
\\"oneOf schemas {} matched the inputs, but a max of one is allowed.\\".format(cls, oneof_classes)
)
# exactly one class matches
return path_to_schemas
@classmethod
def __get_anyof_classes(
cls,
arg,
discriminated_cls,
validation_metadata: ValidationMetadata
):
anyof_classes = []
path_to_schemas = defaultdict(set)
for anyof_cls in cls.MetaOapg.any_of():
if validation_metadata.validation_ran_earlier(anyof_cls):
anyof_classes.append(anyof_cls)
add_deeper_validated_schemas(validation_metadata, path_to_schemas)
continue
try:
other_path_to_schemas = anyof_cls._validate_oapg(arg, validation_metadata=validation_metadata)
except (ApiValueError, ApiTypeError) as ex:
if discriminated_cls is not None and anyof_cls is discriminated_cls:
raise ex
continue
anyof_classes.append(anyof_cls)
update(path_to_schemas, other_path_to_schemas)
if not anyof_classes:
raise ApiValueError(
\\"Invalid inputs given to generate an instance of {}. None \\"
\\"of the anyOf schemas matched the input data.\\".format(cls)
)
return path_to_schemas
@classmethod
def _validate_oapg(
cls,
arg,
validation_metadata: ValidationMetadata,
) -> typing.Dict[typing.Tuple[typing.Union[str, int], ...], typing.Set[typing.Union['Schema', str, decimal.Decimal, BoolClass, NoneClass, frozendict.frozendict, tuple]]]:
\\"\\"\\"
ComposedBase _validate_oapg
We return dynamic classes of different bases depending upon the inputs
This makes it so:
- the returned instance is always a subclass of our defining schema
- this allows us to check type based on whether an instance is a subclass of a schema
- the returned instance is a serializable type (except for None, True, and False, which are enum singletons)
Returns:
new_cls (type): the new class
Raises:
ApiValueError: when a string can't be converted into a date or datetime and it must be one of those classes
ApiTypeError: when the input type is not in the list of allowed spec types
\\"\\"\\"
# validation checking on types, validations, and enums
path_to_schemas = super()._validate_oapg(arg, validation_metadata=validation_metadata)
updated_vm = ValidationMetadata(
configuration=validation_metadata.configuration,
from_server=validation_metadata.from_server,
path_to_item=validation_metadata.path_to_item,
seen_classes=validation_metadata.seen_classes | frozenset({cls}),
validated_path_to_schemas=validation_metadata.validated_path_to_schemas
)
# process composed schema
discriminator = None
if hasattr(cls, 'MetaOapg') and hasattr(cls.MetaOapg, 'discriminator'):
discriminator = cls.MetaOapg.discriminator()
discriminated_cls = None
if discriminator and arg and isinstance(arg, frozendict.frozendict):
disc_property_name = list(discriminator.keys())[0]
cls._ensure_discriminator_value_present_oapg(disc_property_name, updated_vm, arg)
# get discriminated_cls by looking at the dict in the current class
discriminated_cls = cls.get_discriminated_class_oapg(
disc_property_name=disc_property_name, disc_payload_value=arg[disc_property_name])
if discriminated_cls is None:
raise ApiValueError(
\\"Invalid discriminator value '{}' was passed in to {}.{} Only the values {} are allowed at {}\\".format(
arg[disc_property_name],
cls.__name__,
disc_property_name,
list(discriminator[disc_property_name].keys()),
updated_vm.path_to_item + (disc_property_name,)
)
)
if hasattr(cls, 'MetaOapg') and hasattr(cls.MetaOapg, 'all_of'):
other_path_to_schemas = cls.__get_allof_classes(arg, validation_metadata=updated_vm)
update(path_to_schemas, other_path_to_schemas)
if hasattr(cls, 'MetaOapg') and hasattr(cls.MetaOapg, 'one_of'):
other_path_to_schemas = cls.__get_oneof_class(
arg,
discriminated_cls=discriminated_cls,
validation_metadata=updated_vm
)
update(path_to_schemas, other_path_to_schemas)
if hasattr(cls, 'MetaOapg') and hasattr(cls.MetaOapg, 'any_of'):
other_path_to_schemas = cls.__get_anyof_classes(
arg,
discriminated_cls=discriminated_cls,
validation_metadata=updated_vm
)
update(path_to_schemas, other_path_to_schemas)
not_cls = None
if hasattr(cls, 'MetaOapg') and hasattr(cls.MetaOapg, 'not_schema'):
not_cls = cls.MetaOapg.not_schema
not_cls = cls._get_class_oapg(not_cls)
if not_cls:
other_path_to_schemas = None
not_exception = ApiValueError(
\\"Invalid value '{}' was passed in to {}. Value is invalid because it is disallowed by {}\\".format(
arg,
cls.__name__,
not_cls.__name__,
)
)
if updated_vm.validation_ran_earlier(not_cls):
raise not_exception
try:
other_path_to_schemas = not_cls._validate_oapg(arg, validation_metadata=updated_vm)
except (ApiValueError, ApiTypeError):
pass
if other_path_to_schemas:
raise not_exception
if discriminated_cls is not None and not updated_vm.validation_ran_earlier(discriminated_cls):
# TODO use an exception from this package here
add_deeper_validated_schemas(updated_vm, path_to_schemas)
assert discriminated_cls in path_to_schemas[updated_vm.path_to_item]
return path_to_schemas
# DictBase, ListBase, NumberBase, StrBase, BoolBase, NoneBase
class ComposedSchema(
ComposedBase,
DictBase,
ListBase,
NumberBase,
StrBase,
BoolBase,
NoneBase,
Schema,
NoneFrozenDictTupleStrDecimalBoolMixin
):
@classmethod
def from_openapi_data_oapg(cls, *args: typing.Any, _configuration: typing.Optional[Configuration] = None, **kwargs):
if not args:
if not kwargs:
raise ApiTypeError('{} is missing required input data in args or kwargs'.format(cls.__name__))
args = (kwargs, )
return super().from_openapi_data_oapg(args[0], _configuration=_configuration)
class ListSchema(
ListBase,
Schema,
TupleMixin
):
@classmethod
def from_openapi_data_oapg(cls, arg: typing.List[typing.Any], _configuration: typing.Optional[Configuration] = None):
return super().from_openapi_data_oapg(arg, _configuration=_configuration)
def __new__(cls, _arg: typing.Union[typing.List[typing.Any], typing.Tuple[typing.Any]], **kwargs: Configuration):
return super().__new__(cls, _arg, **kwargs)
class NoneSchema(
NoneBase,
Schema,
NoneMixin
):
@classmethod
def from_openapi_data_oapg(cls, arg: None, _configuration: typing.Optional[Configuration] = None):
return super().from_openapi_data_oapg(arg, _configuration=_configuration)
def __new__(cls, _arg: None, **kwargs: Configuration):
return super().__new__(cls, _arg, **kwargs)
class NumberSchema(
NumberBase,
Schema,
DecimalMixin
):
\\"\\"\\"
This is used for type: number with no format
Both integers AND floats are accepted
\\"\\"\\"
@classmethod
def from_openapi_data_oapg(cls, arg: typing.Union[int, float], _configuration: typing.Optional[Configuration] = None):
return super().from_openapi_data_oapg(arg, _configuration=_configuration)
def __new__(cls, _arg: typing.Union[decimal.Decimal, int, float], **kwargs: Configuration):
return super().__new__(cls, _arg, **kwargs)
class IntBase:
@property
def as_int_oapg(self) -> int:
try:
return self._as_int
except AttributeError:
self._as_int = int(self)
return self._as_int
@classmethod
def __validate_format(cls, arg: typing.Optional[decimal.Decimal], validation_metadata: ValidationMetadata):
if isinstance(arg, decimal.Decimal):
denominator = arg.as_integer_ratio()[-1]
if denominator != 1:
raise ApiValueError(
\\"Invalid value '{}' for type integer at {}\\".format(arg, validation_metadata.path_to_item)
)
@classmethod
def _validate_oapg(
cls,
arg,
validation_metadata: ValidationMetadata,
):
\\"\\"\\"
IntBase _validate_oapg
TODO what about types = (int, number) -> IntBase, NumberBase? We could drop int and keep number only
\\"\\"\\"
cls.__validate_format(arg, validation_metadata=validation_metadata)
return super()._validate_oapg(arg, validation_metadata=validation_metadata)
class IntSchema(IntBase, NumberSchema):
@classmethod
def from_openapi_data_oapg(cls, arg: int, _configuration: typing.Optional[Configuration] = None):
return super().from_openapi_data_oapg(arg, _configuration=_configuration)
def __new__(cls, _arg: typing.Union[decimal.Decimal, int], **kwargs: Configuration):
return super().__new__(cls, _arg, **kwargs)
class Int32Base:
__inclusive_minimum = decimal.Decimal(-2147483648)
__inclusive_maximum = decimal.Decimal(2147483647)
@classmethod
def __validate_format(cls, arg: typing.Optional[decimal.Decimal], validation_metadata: ValidationMetadata):
if isinstance(arg, decimal.Decimal) and arg.as_tuple().exponent == 0:
if not cls.__inclusive_minimum <= arg <= cls.__inclusive_maximum:
raise ApiValueError(
\\"Invalid value '{}' for type int32 at {}\\".format(arg, validation_metadata.path_to_item)
)
@classmethod
def _validate_oapg(
cls,
arg,
validation_metadata: ValidationMetadata,
):
\\"\\"\\"
Int32Base _validate_oapg
\\"\\"\\"
cls.__validate_format(arg, validation_metadata=validation_metadata)
return super()._validate_oapg(arg, validation_metadata=validation_metadata)
class Int32Schema(
Int32Base,
IntSchema
):
pass
class Int64Base:
__inclusive_minimum = decimal.Decimal(-9223372036854775808)
__inclusive_maximum = decimal.Decimal(9223372036854775807)
@classmethod
def __validate_format(cls, arg: typing.Optional[decimal.Decimal], validation_metadata: ValidationMetadata):
if isinstance(arg, decimal.Decimal) and arg.as_tuple().exponent == 0:
if not cls.__inclusive_minimum <= arg <= cls.__inclusive_maximum:
raise ApiValueError(
\\"Invalid value '{}' for type int64 at {}\\".format(arg, validation_metadata.path_to_item)
)
@classmethod
def _validate_oapg(
cls,
arg,
validation_metadata: ValidationMetadata,
):
\\"\\"\\"
Int64Base _validate_oapg
\\"\\"\\"
cls.__validate_format(arg, validation_metadata=validation_metadata)
return super()._validate_oapg(arg, validation_metadata=validation_metadata)
class Int64Schema(
Int64Base,
IntSchema
):
pass
class Float32Base:
__inclusive_minimum = decimal.Decimal(-3.4028234663852886e+38)
__inclusive_maximum = decimal.Decimal(3.4028234663852886e+38)
@classmethod
def __validate_format(cls, arg: typing.Optional[decimal.Decimal], validation_metadata: ValidationMetadata):
if isinstance(arg, decimal.Decimal):
if not cls.__inclusive_minimum <= arg <= cls.__inclusive_maximum:
raise ApiValueError(
\\"Invalid value '{}' for type float at {}\\".format(arg, validation_metadata.path_to_item)
)
@classmethod
def _validate_oapg(
cls,
arg,
validation_metadata: ValidationMetadata,
):
\\"\\"\\"
Float32Base _validate_oapg
\\"\\"\\"
cls.__validate_format(arg, validation_metadata=validation_metadata)
return super()._validate_oapg(arg, validation_metadata=validation_metadata)
class Float32Schema(
Float32Base,
NumberSchema
):
@classmethod
def from_openapi_data_oapg(cls, arg: float, _configuration: typing.Optional[Configuration] = None):
return super().from_openapi_data_oapg(arg, _configuration=_configuration)
class Float64Base:
__inclusive_minimum = decimal.Decimal(-1.7976931348623157E+308)
__inclusive_maximum = decimal.Decimal(1.7976931348623157E+308)
@classmethod
def __validate_format(cls, arg: typing.Optional[decimal.Decimal], validation_metadata: ValidationMetadata):
if isinstance(arg, decimal.Decimal):
if not cls.__inclusive_minimum <= arg <= cls.__inclusive_maximum:
raise ApiValueError(
\\"Invalid value '{}' for type double at {}\\".format(arg, validation_metadata.path_to_item)
)
@classmethod
def _validate_oapg(
cls,
arg,
validation_metadata: ValidationMetadata,
):
\\"\\"\\"
Float64Base _validate_oapg
\\"\\"\\"
cls.__validate_format(arg, validation_metadata=validation_metadata)
return super()._validate_oapg(arg, validation_metadata=validation_metadata)
class Float64Schema(
Float64Base,
NumberSchema
):
@classmethod
def from_openapi_data_oapg(cls, arg: float, _configuration: typing.Optional[Configuration] = None):
# todo check format
return super().from_openapi_data_oapg(arg, _configuration=_configuration)
class StrSchema(
StrBase,
Schema,
StrMixin
):
\\"\\"\\"
date + datetime string types must inherit from this class
That is because one can validate a str payload as both:
- type: string (format unset)
- type: string, format: date
\\"\\"\\"
@classmethod
def from_openapi_data_oapg(cls, arg: str, _configuration: typing.Optional[Configuration] = None) -> 'StrSchema':
return super().from_openapi_data_oapg(arg, _configuration=_configuration)
def __new__(cls, _arg: typing.Union[str, date, datetime, uuid.UUID], **kwargs: Configuration):
return super().__new__(cls, _arg, **kwargs)
class UUIDSchema(UUIDBase, StrSchema):
def __new__(cls, _arg: typing.Union[str, uuid.UUID], **kwargs: Configuration):
return super().__new__(cls, _arg, **kwargs)
class DateSchema(DateBase, StrSchema):
def __new__(cls, _arg: typing.Union[str, date], **kwargs: Configuration):
return super().__new__(cls, _arg, **kwargs)
class DateTimeSchema(DateTimeBase, StrSchema):
def __new__(cls, _arg: typing.Union[str, datetime], **kwargs: Configuration):
return super().__new__(cls, _arg, **kwargs)
class DecimalSchema(DecimalBase, StrSchema):
def __new__(cls, _arg: str, **kwargs: Configuration):
\\"\\"\\"
Note: Decimals may not be passed in because cast_to_allowed_types is only invoked once for payloads,
which can be simple (str) or complex (dicts or lists with nested values).
Because casting is only done once and recursively casts all values prior to validation, if Decimal
were accepted as an input to DecimalSchema then, for a potential client-side Decimal input, one would
not know whether it was being used for a StrSchema (where it should be cast to str) or for a
NumberSchema (where it should stay as Decimal).
\\"\\"\\"
return super().__new__(cls, _arg, **kwargs)
class BytesSchema(
Schema,
BytesMixin
):
\\"\\"\\"
this class will subclass bytes and is immutable
\\"\\"\\"
def __new__(cls, _arg: bytes, **kwargs: Configuration):
return super(Schema, cls).__new__(cls, _arg)
class FileSchema(
Schema,
FileMixin
):
\\"\\"\\"
This class is NOT immutable
Dynamic classes are built using it, for example when AnyType allows binary data in
All other schema classes ARE immutable
If one wanted to make this immutable one could make this a DictSchema with required properties:
- data = BytesSchema (which would be an immutable bytes based schema)
- file_name = StrSchema
and cast_to_allowed_types would convert bytes and file instances into dicts containing data + file_name
The downside would be that data would be stored in memory which one may not want to do for very large files
The developer is responsible for closing this file and deleting it
This class was kept as mutable:
- to allow file reading and writing to disk
- to be able to preserve file name info
\\"\\"\\"
def __new__(cls, _arg: typing.Union[io.FileIO, io.BufferedReader], **kwargs: Configuration):
return super(Schema, cls).__new__(cls, _arg)
class BinaryBase:
pass
class BinarySchema(
ComposedBase,
BinaryBase,
Schema,
BinaryMixin
):
class MetaOapg:
@staticmethod
def one_of():
return [
BytesSchema,
FileSchema,
]
def __new__(cls, _arg: typing.Union[io.FileIO, io.BufferedReader, bytes], **kwargs: Configuration):
return super().__new__(cls, _arg)
class BoolSchema(
BoolBase,
Schema,
BoolMixin
):
@classmethod
def from_openapi_data_oapg(cls, arg: bool, _configuration: typing.Optional[Configuration] = None):
return super().from_openapi_data_oapg(arg, _configuration=_configuration)
def __new__(cls, _arg: bool, **kwargs: ValidationMetadata):
return super().__new__(cls, _arg, **kwargs)
class AnyTypeSchema(
DictBase,
ListBase,
NumberBase,
StrBase,
BoolBase,
NoneBase,
Schema,
NoneFrozenDictTupleStrDecimalBoolFileBytesMixin
):
# Python representation of a schema defined as true or {}
pass
class UnsetAnyTypeSchema(AnyTypeSchema):
# Used when additionalProperties/items was not explicitly defined and a defining schema is needed
pass
class NotAnyTypeSchema(
ComposedSchema,
):
\\"\\"\\"
Python representation of a schema defined as false or {'not': {}}
Does not allow inputs of AnyType
Note: validations on this class are never run because the code knows that no inputs will ever validate
\\"\\"\\"
class MetaOapg:
not_schema = AnyTypeSchema
def __new__(
cls,
*_args,
_configuration: typing.Optional[Configuration] = None,
) -> 'NotAnyTypeSchema':
return super().__new__(
cls,
*_args,
_configuration=_configuration,
)
class DictSchema(
DictBase,
Schema,
FrozenDictMixin
):
@classmethod
def from_openapi_data_oapg(cls, arg: typing.Dict[str, typing.Any], _configuration: typing.Optional[Configuration] = None):
return super().from_openapi_data_oapg(arg, _configuration=_configuration)
def __new__(cls, *_args: typing.Union[dict, frozendict.frozendict], **kwargs: typing.Union[dict, frozendict.frozendict, list, tuple, decimal.Decimal, float, int, str, date, datetime, bool, None, bytes, Schema, Unset, ValidationMetadata]):
return super().__new__(cls, *_args, **kwargs)
schema_type_classes = {NoneSchema, DictSchema, ListSchema, NumberSchema, StrSchema, BoolSchema, AnyTypeSchema}
@functools.lru_cache()
def get_new_class(
class_name: str,
bases: typing.Tuple[typing.Type[typing.Union[Schema, typing.Any]], ...]
) -> typing.Type[Schema]:
\\"\\"\\"
Returns a new class that is made with the subclass bases
\\"\\"\\"
new_cls: typing.Type[Schema] = type(class_name, bases, {})
return new_cls
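# Illustrative sketch (hypothetical name): get_new_class('DynamicStrSchema', (StrSchema,))
# manufactures and memoizes (via lru_cache) a new subclass of StrSchema; calling it again
# with the same arguments returns the identical class object.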
LOG_CACHE_USAGE = False
def log_cache_usage(cache_fn):
if LOG_CACHE_USAGE:
print(cache_fn.__name__, cache_fn.cache_info())",
"generated/python/package.json": Object {
"//": "~~ Generated by projen. To modify, edit .projenrc.js and run \\"npx projen\\".",
"__pdk__": true,
"name": "my-api-python",
"scripts": Object {
"build": "npx projen build",
"compile": "npx projen compile",
"default": "npx projen default",
"install": "npx projen install",
"package": "npx projen package",
"post-compile": "npx projen post-compile",
"pre-compile": "npx projen pre-compile",
"test": "npx projen test",
},
"version": "0.0.0",
},
"generated/python/requirements-dev.txt": "# ~~ Generated by projen. To modify, edit .projenrc.js and run \\"npx projen\\".
",
"generated/python/requirements.txt": "certifi >= 14.5.14
frozendict ~= 2.3.4
python-dateutil ~= 2.7.0
setuptools >= 21.0.0
typing_extensions ~= 4.3.0
urllib3 ~= 1.26.7
",
"generated/python/setup.cfg": "[flake8]
max-line-length=99
",
"generated/python/setup.py": "# coding: utf-8
\\"\\"\\"
Example API
No description provided (generated by OpenAPI Generator https://github.com/openapitools/openapi-generator) # noqa: E501
The version of the OpenAPI document: 1.0.0
Generated by: https://openapi-generator.tech
\\"\\"\\"
from setuptools import setup, find_packages # noqa: H301
NAME = \\"my-api-python\\"
VERSION = \\"1.0.0\\"
# To install the library, run the following
#
# python setup.py install
#
# prerequisite: setuptools
# http://pypi.python.org/pypi/setuptools
REQUIRES = [
\\"certifi >= 14.5.14\\",
\\"frozendict ~= 2.3.4\\",
\\"python-dateutil ~= 2.7.0\\",
\\"setuptools >= 21.0.0\\",
\\"typing_extensions ~= 4.3.0\\",
\\"urllib3 ~= 1.26.7\\",
]
setup(
name=NAME,
version=VERSION,
description=\\"Example API\\",
author=\\"OpenAPI Generator community\\",
author_email=\\"team@openapitools.org\\",
url=\\"\\",
keywords=[\\"OpenAPI\\", \\"OpenAPI-Generator\\", \\"Example API\\"],
python_requires=\\">=3.7\\",
install_requires=REQUIRES,
packages=find_packages(exclude=[\\"test\\", \\"tests\\"]),
include_package_data=True,
long_description=\\"\\"\\"\\\\
No description provided (generated by OpenAPI Generator https://github.com/openapitools/openapi-generator) # noqa: E501
\\"\\"\\"
)
",
"generated/python/test-requirements.txt": "pytest~=4.6.7 # needed for python 3.4
pytest-cov>=2.8.1
pytest-randomly==1.2.3 # needed for python 3.4
",
"generated/python/test/__init__.py": "",
"generated/python/test/test_models/__init__.py": "",
"generated/python/test/test_models/test_api_error_response_content.py": "# coding: utf-8
\\"\\"\\"
Example API
No description provided (generated by OpenAPI Generator https://github.com/openapitools/openapi-generator) # noqa: E501
The version of the OpenAPI document: 1.0.0
Generated by: https://openapi-generator.tech
\\"\\"\\"
import unittest
import my_api_python
from my_api_python.model.api_error_response_content import ApiErrorResponseContent
from my_api_python import configuration
class TestApiErrorResponseContent(unittest.TestCase):
\\"\\"\\"ApiErrorResponseContent unit test stubs\\"\\"\\"
_configuration = configuration.Configuration()
if __name__ == '__main__':
unittest.main()
",
"generated/python/test/test_models/test_say_hello_response_content.py": "# coding: utf-8
\\"\\"\\"
Example API
No description provided (generated by OpenAPI Generator https://github.com/openapitools/openapi-generator) # noqa: E501
The version of the OpenAPI document: 1.0.0
Generated by: https://openapi-generator.tech
\\"\\"\\"
import unittest
import my_api_python
from my_api_python.model.say_hello_response_content import SayHelloResponseContent
from my_api_python import configuration
class TestSayHelloResponseContent(unittest.TestCase):
\\"\\"\\"SayHelloResponseContent unit test stubs\\"\\"\\"
_configuration = configuration.Configuration()
if __name__ == '__main__':
unittest.main()
",
"generated/python/test/test_paths/__init__.py": "import json
import typing
import urllib3
from urllib3._collections import HTTPHeaderDict
class ApiTestMixin:
json_content_type = 'application/json'
user_agent = 'OpenAPI-Generator/1.0.0/python'
@classmethod
def assert_pool_manager_request_called_with(
cls,
mock_request,
url: str,
method: str = 'POST',
body: typing.Optional[bytes] = None,
content_type: typing.Optional[str] = None,
accept_content_type: typing.Optional[str] = None,
stream: bool = False,
):
headers = {
'User-Agent': cls.user_agent
}
if accept_content_type:
headers['Accept'] = accept_content_type
if content_type:
headers['Content-Type'] = content_type
kwargs = dict(
headers=HTTPHeaderDict(headers),
preload_content=not stream,
timeout=None,
)
if content_type and method != 'GET':
kwargs['body'] = body
mock_request.assert_called_with(
method,
url,
**kwargs
)
@staticmethod
def headers_for_content_type(content_type: str) -> typing.Dict[str, str]:
return {'content-type': content_type}
@classmethod
def response(
cls,
body: typing.Union[str, bytes],
status: int = 200,
content_type: str = json_content_type,
headers: typing.Optional[typing.Dict[str, str]] = None,
preload_content: bool = True
) -> urllib3.HTTPResponse:
if headers is None:
headers = {}
headers.update(cls.headers_for_content_type(content_type))
return urllib3.HTTPResponse(
body,
headers=headers,
status=status,
preload_content=preload_content
)
@staticmethod
def json_bytes(in_data: typing.Any) -> bytes:
return json.dumps(in_data, separators=(\\",\\", \\":\\"), ensure_ascii=False).encode('utf-8')
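# Sketch of how a test case such as TestHello might use this mixin (an assumption; the generated
# stubs above and below do not exercise it): patch urllib3.PoolManager.request and assert on the
# outgoing call, e.g.
#   with patch.object(urllib3.PoolManager, 'request') as mock_request:
#       mock_request.return_value = self.response(self.json_bytes({'message': 'Hello, World!'}))
#       self.api.get(query_params={'name': 'World'})
#       self.assert_pool_manager_request_called_with(
#           mock_request,
#           self._configuration.host + '/hello',
#           method='GET',
#           accept_content_type=self.json_content_type,
#       )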
",
"generated/python/test/test_paths/test_hello/__init__.py": "",
"generated/python/test/test_paths/test_hello/test_get.py": "# coding: utf-8
\\"\\"\\"
Generated by: https://openapi-generator.tech
\\"\\"\\"
import unittest
from unittest.mock import patch
import urllib3
import my_api_python
from my_api_python.paths.hello import get # noqa: E501
from my_api_python import configuration, schemas, api_client
from .. import ApiTestMixin
class TestHello(ApiTestMixin, unittest.TestCase):
\\"\\"\\"
Hello unit test stubs
\\"\\"\\"
_configuration = configuration.Configuration()
def setUp(self):
used_api_client = api_client.ApiClient(configuration=self._configuration)
self.api = get.ApiForget(api_client=used_api_client) # noqa: E501
def tearDown(self):
pass
response_status = 200
if __name__ == '__main__':
unittest.main()
",
"generated/python/tox.ini": "[tox]
envlist = py37
[testenv]
passenv = PYTHON_VERSION
deps=-r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
commands=
pytest --cov=my_api_python
",
"my_api/__init__.py": "#",
"my_api/api/__init__.py": "#",
"my_api/api/api.py": "from dataclasses import fields
from aws_prototyping_sdk.open_api_gateway import OpenApiGatewayRestApi, OpenApiIntegration
from my_api_python.apis.tags.default_api_operation_config import OperationLookup, OperationConfig
from my_api.spec_utils import SPEC, SPEC_PATH
class Api(OpenApiGatewayRestApi):
\\"\\"\\"
Type-safe construct for the API Gateway resources defined by the spec.
You will likely not need to modify this file, and can instead extend it and define your integrations.
\\"\\"\\"
def __init__(self, scope, id, integrations: OperationConfig[OpenApiIntegration], **kwargs):
super().__init__(scope, id,
**kwargs,
integrations={ field.name: getattr(integrations, field.name) for field in fields(integrations) },
spec=SPEC,
spec_path=SPEC_PATH,
operation_lookup=OperationLookup,
)
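# Integrations are supplied per-operation via OperationConfig; see sample_api.py in this package
# for a concrete example.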
",
"my_api/api/handlers/say_hello_handler_sample.py": "from my_api_python.apis.tags.default_api_operation_config import say_hello_handler, SayHelloRequest, ApiResponse, SayHelloOperationResponses
from my_api_python.model.say_hello_response_content import SayHelloResponseContent
@say_hello_handler
def handler(input: SayHelloRequest, **kwargs) -> SayHelloOperationResponses:
\\"\\"\\"
An example lambda handler which uses the generated handler wrapper to manage marshalling inputs/outputs
\\"\\"\\"
return ApiResponse(
status_code=200,
body=SayHelloResponseContent(message=\\"Hello {}!\\".format(input.request_parameters[\\"name\\"])),
headers={}
)
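# The decorated function above is the Lambda entrypoint wired up by sample_api.py via
# handler='say_hello_handler_sample.handler'.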
",
"my_api/api/sample_api.py": "from aws_prototyping_sdk.open_api_gateway import Authorizers, Integrations, OpenApiIntegration
from my_api_python.apis.tags.default_api_operation_config import OperationConfig
from aws_cdk.aws_lambda import LayerVersion, Code, Function, Runtime
from .api import Api
from constructs import Construct
from my_api.spec_utils import get_generated_client_layer_directory
from pathlib import Path
from os import path
class SampleApi(Construct):
\\"\\"\\"
An example of how to add integrations to your API
\\"\\"\\"
def __init__(self, scope, id):
super().__init__(scope, id)
# Layer which contains the generated client.
self.generated_client_layer = LayerVersion(self, 'GeneratedClientLayer',
code=Code.from_asset(get_generated_client_layer_directory())
)
self.api = Api(self, 'Api',
default_authorizer=Authorizers.iam(),
integrations=OperationConfig(
say_hello=OpenApiIntegration(
integration=Integrations.lambda_(Function(self, 'SayHello',
runtime=Runtime.PYTHON_3_9,
code=Code.from_asset(path.join(str(Path(__file__).parent.absolute()), 'handlers')),
handler=\\"say_hello_handler_sample.handler\\",
layers=[self.generated_client_layer],
)),
),
),
)
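# Usage sketch (an assumption, not part of the generated sample): instantiate this construct from
# a CDK stack, e.g.
#   class ApplicationStack(Stack):
#       def __init__(self, scope, id):
#           super().__init__(scope, id)
#           SampleApi(self, 'SampleApi')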
",
"my_api/spec/.parsed-spec.json": Object {
"components": Object {
"schemas": Object {
"ApiErrorResponseContent": Object {
"properties": Object {
"errorMessage": Object {
"type": "string",
},
},
"required": Array [
"errorMessage",
],
"type": "object",
},
"SayHelloResponseContent": Object {
"properties": Object {
"message": Object {
"type": "string",
},
},
"required": Array [
"message",
],
"type": "object",
},
},
},
"info": Object {
"title": "Example API",
"version": "1.0.0",
},
"openapi": "3.0.3",
"paths": Object {
"/hello": Object {
"get": Object {
"operationId": "sayHello",
"parameters": Array [
Object {
"in": "query",
"name": "name",
"required": true,
"schema": Object {
"type": "string",
},
},
],
"responses": Object {
"200": Object {
"content": Object {
"application/json": Object {
"schema": Object {
"$ref": "#/components/schemas/SayHelloResponseContent",
},
},
},
"description": "Successful response",
},
"400": Object {
"content": Object {
"application/json": Object {
"schema": Object {
"$ref": "#/components/schemas/ApiErrorResponseContent",
},
},
},
"description": "Error response",
},
},
},
},
},
},
"my_api/spec/spec.yaml": "openapi: 3.0.3
info:
version: 1.0.0
title: Example API
paths:
/hello:
get:
operationId: sayHello
parameters:
- in: query
name: name
schema:
type: string
required: true
responses:
'200':
description: Successful response
content:
'application/json':
schema:
$ref: '#/components/schemas/SayHelloResponseContent'
'400':
description: Error response
content:
'application/json':
schema:
$ref: '#/components/schemas/ApiErrorResponseContent'
components:
schemas:
ApiErrorResponseContent:
type: object
properties:
errorMessage:
type: string
required:
- errorMessage
SayHelloResponseContent:
type: object
properties:
message:
type: string
required:
- message
",
"my_api/spec_utils.py": "import pkgutil, json
from os import path
from pathlib import Path
SPEC_PATH = path.join(str(Path(__file__).absolute().parent), \\"spec/.parsed-spec.json\\")
SPEC = json.loads(pkgutil.get_data(__name__, \\"spec/.parsed-spec.json\\"))
def get_project_root():
return Path(__file__).absolute().parent.parent
def get_generated_client_layer_directory():
return path.join(str(get_project_root()), \\"generated/python/dist/layer\\")
",
"requirements-dev.txt": "# ~~ Generated by projen. To modify, edit .projenrc.py and run \\"npx projen\\".
projen==99.99.99
twine==3.3.0
wheel==0.36.2
",
"requirements.txt": "# ~~ Generated by projen. To modify, edit .projenrc.py and run \\"npx projen\\".
aws_prototyping_sdk.open_api_gateway
aws-cdk-lib
cdk-nag
constructs
my_api_python
",
"setup.py": "# ~~ Generated by projen. To modify, edit .projenrc.py and run \\"npx projen\\".
import json
from setuptools import setup
kwargs = json.loads(
\\"\\"\\"
{
\\"name\\": \\"my_api\\",
\\"python_requires\\": \\">=3.7\\",
\\"author\\": \\"test\\",
\\"author_email\\": \\"test@example.com\\",
\\"version\\": \\"1.0.0\\"
}
\\"\\"\\"
)
setup(**kwargs)
",
}
`;