# Copyright 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"). You # may not use this file except in compliance with the License. A copy of # the License is located at # # http://aws.amazon.com/apache2.0/ # # or in the "license" file accompanying this file. This file is # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. """ This customization adds the following scalar parameters to the authorize operations: * --protocol: tcp | udp | icmp or any protocol number * --port: A single integer or a range (min-max). You can specify ``all`` to mean all ports (for example, port range 0-65535) * --source-group: Either the source security group ID or name. * --cidr - The IPv4 address range, in CIDR format. Cannot be used when specifying a source or destination security group. """ from awscli.arguments import CustomArgument def _add_params(argument_table, **kwargs): arg = ProtocolArgument('protocol', help_text=PROTOCOL_DOCS) argument_table['protocol'] = arg argument_table['ip-protocol']._UNDOCUMENTED = True arg = PortArgument('port', help_text=PORT_DOCS) argument_table['port'] = arg # Port handles both the from-port and to-port, # we need to not document both args. argument_table['from-port']._UNDOCUMENTED = True argument_table['to-port']._UNDOCUMENTED = True arg = CidrArgument('cidr', help_text=CIDR_DOCS) argument_table['cidr'] = arg argument_table['cidr-ip']._UNDOCUMENTED = True arg = SourceGroupArgument('source-group', help_text=SOURCEGROUP_DOCS) argument_table['source-group'] = arg argument_table['source-security-group-name']._UNDOCUMENTED = True arg = GroupOwnerArgument('group-owner', help_text=GROUPOWNER_DOCS) argument_table['group-owner'] = arg argument_table['source-security-group-owner-id']._UNDOCUMENTED = True def _check_args(parsed_args, **kwargs): # This function checks the parsed args. If the user specified # the --ip-permissions option with any of the scalar options we # raise an error. arg_dict = vars(parsed_args) if arg_dict['ip_permissions']: for key in ('protocol', 'port', 'cidr', 'source_group', 'group_owner'): if arg_dict[key]: msg = ('The --%s option is not compatible ' 'with the --ip-permissions option ') % key raise ValueError(msg) def _add_docs(help_command, **kwargs): doc = help_command.doc doc.style.new_paragraph() doc.style.start_note() msg = ('To specify multiple rules in a single command ' 'use the --ip-permissions option') doc.include_doc_string(msg) doc.style.end_note() EVENTS = [ ('building-argument-table.ec2.authorize-security-group-ingress', _add_params), ('building-argument-table.ec2.authorize-security-group-egress', _add_params), ('building-argument-table.ec2.revoke-security-group-ingress', _add_params), ('building-argument-table.ec2.revoke-security-group-egress', _add_params), ('operation-args-parsed.ec2.authorize-security-group-ingress', _check_args), ('operation-args-parsed.ec2.authorize-security-group-egress', _check_args), ('operation-args-parsed.ec2.revoke-security-group-ingress', _check_args), ('operation-args-parsed.ec2.revoke-security-group-egress', _check_args), ('doc-description.ec2.authorize-security-group-ingress', _add_docs), ('doc-description.ec2.authorize-security-group-egress', _add_docs), ('doc-description.ec2.revoke-security-group-ingress', _add_docs), ('doc-description.ec2.revoke-security-groupdoc-ingress', _add_docs), ] PROTOCOL_DOCS = ('

The IP protocol: tcp | ' 'udp | icmp

' '

(VPC only) Use all to specify all protocols.

' '

If this argument is provided without also providing the ' 'port argument, then it will be applied to all ' 'ports for the specified protocol.

') PORT_DOCS = ('

For TCP or UDP: The range of ports to allow.' ' A single integer or a range (min-max).

' '

For ICMP: A single integer or a range (type-code)' ' representing the ICMP type' ' number and the ICMP code number respectively.' ' A value of -1 indicates all ICMP codes for' ' all ICMP types. A value of -1 just for type' ' indicates all ICMP codes for the specified ICMP type.

') CIDR_DOCS = '

The IPv4 address range, in CIDR format.

' SOURCEGROUP_DOCS = ('

The name or ID of the source security group.

') GROUPOWNER_DOCS = ('

The AWS account ID that owns the source security ' 'group. Cannot be used when specifying a CIDR IP ' 'address.

') def register_secgroup(event_handler): for event, handler in EVENTS: event_handler.register(event, handler) def _build_ip_permissions(params, key, value): if 'IpPermissions' not in params: params['IpPermissions'] = [{}] if key == 'CidrIp': if 'IpRanges' not in params['ip_permissions'][0]: params['IpPermissions'][0]['IpRanges'] = [] params['IpPermissions'][0]['IpRanges'].append(value) elif key in ('GroupId', 'GroupName', 'UserId'): if 'UserIdGroupPairs' not in params['IpPermissions'][0]: params['IpPermissions'][0]['UserIdGroupPairs'] = [{}] params['IpPermissions'][0]['UserIdGroupPairs'][0][key] = value else: params['IpPermissions'][0][key] = value class ProtocolArgument(CustomArgument): def add_to_params(self, parameters, value): if value: try: int_value = int(value) if (int_value < 0 or int_value > 255) and int_value != -1: msg = ('protocol numbers must be in the range 0-255 ' 'or -1 to specify all protocols') raise ValueError(msg) except ValueError: if value not in ('tcp', 'udp', 'icmp', 'all'): msg = ('protocol parameter should be one of: ' 'tcp|udp|icmp|all or any valid protocol number.') raise ValueError(msg) if value == 'all': value = '-1' _build_ip_permissions(parameters, 'IpProtocol', value) class PortArgument(CustomArgument): def add_to_params(self, parameters, value): if value: try: if value == '-1' or value == 'all': fromstr = '-1' tostr = '-1' elif '-' in value: # We can get away with simple logic here because # argparse will not allow values such as # "-1-8", and these aren't actually valid # values any from from/to ports. fromstr, tostr = value.split('-', 1) else: fromstr, tostr = (value, value) _build_ip_permissions(parameters, 'FromPort', int(fromstr)) _build_ip_permissions(parameters, 'ToPort', int(tostr)) except ValueError: msg = ('port parameter should be of the ' 'form (e.g. 22 or 22-25)') raise ValueError(msg) class CidrArgument(CustomArgument): def add_to_params(self, parameters, value): if value: value = [{'CidrIp': value}] _build_ip_permissions(parameters, 'IpRanges', value) class SourceGroupArgument(CustomArgument): def add_to_params(self, parameters, value): if value: if value.startswith('sg-'): _build_ip_permissions(parameters, 'GroupId', value) else: _build_ip_permissions(parameters, 'GroupName', value) class GroupOwnerArgument(CustomArgument): def add_to_params(self, parameters, value): if value: _build_ip_permissions(parameters, 'UserId', value)