"""Utilities for implementing `nbio_interface.AbstractIOServices` for pika connection adapters. """ import collections import errno import functools import logging import numbers import os import socket import ssl import sys import traceback from pika.adapters.utils.nbio_interface import (AbstractIOReference, AbstractStreamTransport) import pika.compat import pika.diagnostic_utils # "Try again" error codes for non-blocking socket I/O - send()/recv(). # NOTE: POSIX.1 allows either error to be returned for this case and doesn't require # them to have the same value. _TRY_IO_AGAIN_SOCK_ERROR_CODES = ( errno.EAGAIN, errno.EWOULDBLOCK, ) # "Connection establishment pending" error codes for non-blocking socket # connect() call. # NOTE: EINPROGRESS for Posix and EWOULDBLOCK for Windows _CONNECTION_IN_PROGRESS_SOCK_ERROR_CODES = ( errno.EINPROGRESS, errno.EWOULDBLOCK, ) _LOGGER = logging.getLogger(__name__) # Decorator that logs exceptions escaping from the decorated function _log_exceptions = pika.diagnostic_utils.create_log_exception_decorator(_LOGGER) # pylint: disable=C0103 def check_callback_arg(callback, name): """Raise TypeError if callback is not callable :param callback: callback to check :param name: Name to include in exception text :raises TypeError: """ if not callable(callback): raise TypeError('{} must be callable, but got {!r}'.format( name, callback)) def check_fd_arg(fd): """Raise TypeError if file descriptor is not an integer :param fd: file descriptor :raises TypeError: """ if not isinstance(fd, numbers.Integral): raise TypeError( 'Paramter must be a file descriptor, but got {!r}'.format(fd)) def _retry_on_sigint(func): """Function decorator for retrying on SIGINT. """ @functools.wraps(func) def retry_sigint_wrap(*args, **kwargs): """Wrapper for decorated function""" while True: try: return func(*args, **kwargs) except pika.compat.SOCKET_ERROR as error: if error.errno == errno.EINTR: continue else: raise return retry_sigint_wrap class SocketConnectionMixin(object): """Implements `pika.adapters.utils.nbio_interface.AbstractIOServices.connect_socket()` on top of `pika.adapters.utils.nbio_interface.AbstractFileDescriptorServices` and basic `pika.adapters.utils.nbio_interface.AbstractIOServices`. """ def connect_socket(self, sock, resolved_addr, on_done): """Implement :py:meth:`.nbio_interface.AbstractIOServices.connect_socket()`. """ return _AsyncSocketConnector( nbio=self, sock=sock, resolved_addr=resolved_addr, on_done=on_done).start() class StreamingConnectionMixin(object): """Implements `.nbio_interface.AbstractIOServices.create_streaming_connection()` on top of `.nbio_interface.AbstractFileDescriptorServices` and basic `nbio_interface.AbstractIOServices` services. """ def create_streaming_connection(self, protocol_factory, sock, on_done, ssl_context=None, server_hostname=None): """Implement :py:meth:`.nbio_interface.AbstractIOServices.create_streaming_connection()`. """ try: return _AsyncStreamConnector( nbio=self, protocol_factory=protocol_factory, sock=sock, ssl_context=ssl_context, server_hostname=server_hostname, on_done=on_done).start() except Exception as error: _LOGGER.error('create_streaming_connection(%s) failed: %r', sock, error) # Close the socket since this function takes ownership try: sock.close() except Exception as error: # pylint: disable=W0703 # We log and suppress the exception from sock.close() so that # the original error from _AsyncStreamConnector constructor will # percolate _LOGGER.error('%s.close() failed: %r', sock, error) raise class _AsyncServiceAsyncHandle(AbstractIOReference): """This module's adaptation of `.nbio_interface.AbstractIOReference` """ def __init__(self, subject): """ :param subject: subject of the reference containing a `cancel()` method """ self._cancel = subject.cancel def cancel(self): """Cancel pending operation :returns: False if was already done or cancelled; True otherwise :rtype: bool """ return self._cancel() class _AsyncSocketConnector(object): """Connects the given non-blocking socket asynchronously using `.nbio_interface.AbstractFileDescriptorServices` and basic `.nbio_interface.AbstractIOServices`. Used for implementing `.nbio_interface.AbstractIOServices.connect_socket()`. """ _STATE_NOT_STARTED = 0 # start() not called yet _STATE_ACTIVE = 1 # workflow started _STATE_CANCELED = 2 # workflow aborted by user's cancel() call _STATE_COMPLETED = 3 # workflow completed: succeeded or failed def __init__(self, nbio, sock, resolved_addr, on_done): """ :param AbstractIOServices | AbstractFileDescriptorServices nbio: :param socket.socket sock: non-blocking socket that needs to be connected via `socket.socket.connect()` :param tuple resolved_addr: resolved destination address/port two-tuple which is compatible with the given's socket's address family :param callable on_done: user callback that takes None upon successful completion or exception upon error (check for `BaseException`) as its only arg. It will not be called if the operation was cancelled. :raises ValueError: if host portion of `resolved_addr` is not an IP address or is inconsistent with the socket's address family as validated via `socket.inet_pton()` """ check_callback_arg(on_done, 'on_done') try: socket.inet_pton(sock.family, resolved_addr[0]) except Exception as error: # pylint: disable=W0703 if not hasattr(socket, 'inet_pton'): _LOGGER.debug( 'Unable to check resolved address: no socket.inet_pton().') else: msg = ('Invalid or unresolved IP address ' '{!r} for socket {}: {!r}').format( resolved_addr, sock, error) _LOGGER.error(msg) raise ValueError(msg) self._nbio = nbio self._sock = sock self._addr = resolved_addr self._on_done = on_done self._state = self._STATE_NOT_STARTED self._watching_socket_events = False @_log_exceptions def _cleanup(self): """Remove socket watcher, if any """ if self._watching_socket_events: self._watching_socket_events = False self._nbio.remove_writer(self._sock.fileno()) def start(self): """Start asynchronous connection establishment. :rtype: AbstractIOReference """ assert self._state == self._STATE_NOT_STARTED, ( '_AsyncSocketConnector.start(): expected _STATE_NOT_STARTED', self._state) self._state = self._STATE_ACTIVE # Continue the rest of the operation on the I/O loop to avoid calling # user's completion callback from the scope of user's call self._nbio.add_callback_threadsafe(self._start_async) return _AsyncServiceAsyncHandle(self) def cancel(self): """Cancel pending connection request without calling user's completion callback. :returns: False if was already done or cancelled; True otherwise :rtype: bool """ if self._state == self._STATE_ACTIVE: self._state = self._STATE_CANCELED _LOGGER.debug('User canceled connection request for %s to %s', self._sock, self._addr) self._cleanup() return True _LOGGER.debug( '_AsyncSocketConnector cancel requested when not ACTIVE: ' 'state=%s; %s', self._state, self._sock) return False @_log_exceptions def _report_completion(self, result): """Advance to COMPLETED state, remove socket watcher, and invoke user's completion callback. :param BaseException | None result: value to pass in user's callback """ _LOGGER.debug('_AsyncSocketConnector._report_completion(%r); %s', result, self._sock) assert isinstance(result, (BaseException, type(None))), ( '_AsyncSocketConnector._report_completion() expected exception or ' 'None as result.', result) assert self._state == self._STATE_ACTIVE, ( '_AsyncSocketConnector._report_completion() expected ' '_STATE_NOT_STARTED', self._state) self._state = self._STATE_COMPLETED self._cleanup() self._on_done(result) @_log_exceptions def _start_async(self): """Called as callback from I/O loop to kick-start the workflow, so it's safe to call user's completion callback from here, if needed """ if self._state != self._STATE_ACTIVE: # Must have been canceled by user before we were called _LOGGER.debug( 'Abandoning sock=%s connection establishment to %s ' 'due to inactive state=%s', self._sock, self._addr, self._state) return try: self._sock.connect(self._addr) except (Exception, pika.compat.SOCKET_ERROR) as error: # pylint: disable=W0703 if (isinstance(error, pika.compat.SOCKET_ERROR) and error.errno in _CONNECTION_IN_PROGRESS_SOCK_ERROR_CODES): # Connection establishment is pending pass else: _LOGGER.error('%s.connect(%s) failed: %r', self._sock, self._addr, error) self._report_completion(error) return # Get notified when the socket becomes writable try: self._nbio.set_writer(self._sock.fileno(), self._on_writable) except Exception as error: # pylint: disable=W0703 _LOGGER.exception('async.set_writer(%s) failed: %r', self._sock, error) self._report_completion(error) return else: self._watching_socket_events = True _LOGGER.debug('Connection-establishment is in progress for %s.', self._sock) @_log_exceptions def _on_writable(self): """Called when socket connects or fails to. Check for predicament and invoke user's completion callback. """ if self._state != self._STATE_ACTIVE: # This should never happen since we remove the watcher upon # `cancel()` _LOGGER.error( 'Socket connection-establishment event watcher ' 'called in inactive state (ignoring): %s; state=%s', self._sock, self._state) return # The moment of truth... error_code = self._sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) if not error_code: _LOGGER.info('Socket connected: %s', self._sock) result = None else: error_msg = os.strerror(error_code) _LOGGER.error('Socket failed to connect: %s; error=%s (%s)', self._sock, error_code, error_msg) result = pika.compat.SOCKET_ERROR(error_code, error_msg) self._report_completion(result) class _AsyncStreamConnector(object): """Performs asynchronous SSL session establishment, if requested, on the already-connected socket and links the streaming transport to protocol. Used for implementing `.nbio_interface.AbstractIOServices.create_streaming_connection()`. """ _STATE_NOT_STARTED = 0 # start() not called yet _STATE_ACTIVE = 1 # start() called and kicked off the workflow _STATE_CANCELED = 2 # workflow terminated by cancel() request _STATE_COMPLETED = 3 # workflow terminated by success or failure def __init__(self, nbio, protocol_factory, sock, ssl_context, server_hostname, on_done): """ NOTE: We take ownership of the given socket upon successful completion of the constructor. See `AbstractIOServices.create_streaming_connection()` for detailed documentation of the corresponding args. :param AbstractIOServices | AbstractFileDescriptorServices nbio: :param callable protocol_factory: :param socket.socket sock: :param ssl.SSLContext | None ssl_context: :param str | None server_hostname: :param callable on_done: """ check_callback_arg(protocol_factory, 'protocol_factory') check_callback_arg(on_done, 'on_done') if not isinstance(ssl_context, (type(None), ssl.SSLContext)): raise ValueError('Expected ssl_context=None | ssl.SSLContext, but ' 'got {!r}'.format(ssl_context)) if server_hostname is not None and ssl_context is None: raise ValueError('Non-None server_hostname must not be passed ' 'without ssl context') # Check that the socket connection establishment had completed in order # to avoid stalling while waiting for the socket to become readable # and/or writable. try: sock.getpeername() except Exception as error: raise ValueError( 'Expected connected socket, but getpeername() failed: ' 'error={!r}; {}; '.format(error, sock)) self._nbio = nbio self._protocol_factory = protocol_factory self._sock = sock self._ssl_context = ssl_context self._server_hostname = server_hostname self._on_done = on_done self._state = self._STATE_NOT_STARTED self._watching_socket = False @_log_exceptions def _cleanup(self, close): """Cancel pending async operations, if any :param bool close: close the socket if true """ _LOGGER.debug('_AsyncStreamConnector._cleanup(%r)', close) if self._watching_socket: _LOGGER.debug( '_AsyncStreamConnector._cleanup(%r): removing RdWr; %s', close, self._sock) self._watching_socket = False self._nbio.remove_reader(self._sock.fileno()) self._nbio.remove_writer(self._sock.fileno()) try: if close: _LOGGER.debug( '_AsyncStreamConnector._cleanup(%r): closing socket; %s', close, self._sock) try: self._sock.close() except Exception as error: # pylint: disable=W0703 _LOGGER.exception('_sock.close() failed: error=%r; %s', error, self._sock) raise finally: self._sock = None self._nbio = None self._protocol_factory = None self._ssl_context = None self._server_hostname = None self._on_done = None def start(self): """Kick off the workflow :rtype: AbstractIOReference """ _LOGGER.debug('_AsyncStreamConnector.start(); %s', self._sock) assert self._state == self._STATE_NOT_STARTED, ( '_AsyncStreamConnector.start() expected ' '_STATE_NOT_STARTED', self._state) self._state = self._STATE_ACTIVE # Request callback from I/O loop to start processing so that we don't # end up making callbacks from the caller's scope self._nbio.add_callback_threadsafe(self._start_async) return _AsyncServiceAsyncHandle(self) def cancel(self): """Cancel pending connection request without calling user's completion callback. :returns: False if was already done or cancelled; True otherwise :rtype: bool """ if self._state == self._STATE_ACTIVE: self._state = self._STATE_CANCELED _LOGGER.debug('User canceled streaming linkup for %s', self._sock) # Close the socket, since we took ownership self._cleanup(close=True) return True _LOGGER.debug( '_AsyncStreamConnector cancel requested when not ACTIVE: ' 'state=%s; %s', self._state, self._sock) return False @_log_exceptions def _report_completion(self, result): """Advance to COMPLETED state, cancel async operation(s), and invoke user's completion callback. :param BaseException | tuple result: value to pass in user's callback. `tuple(transport, protocol)` on success, exception on error """ _LOGGER.debug('_AsyncStreamConnector._report_completion(%r); %s', result, self._sock) assert isinstance(result, (BaseException, tuple)), ( '_AsyncStreamConnector._report_completion() expected exception or ' 'tuple as result.', result, self._state) assert self._state == self._STATE_ACTIVE, ( '_AsyncStreamConnector._report_completion() expected ' '_STATE_ACTIVE', self._state) self._state = self._STATE_COMPLETED # Notify user try: self._on_done(result) except Exception: _LOGGER.exception('%r: _on_done(%r) failed.', self._report_completion, result) raise finally: # NOTE: Close the socket on error, since we took ownership of it self._cleanup(close=isinstance(result, BaseException)) @_log_exceptions def _start_async(self): """Called as callback from I/O loop to kick-start the workflow, so it's safe to call user's completion callback from here if needed """ _LOGGER.debug('_AsyncStreamConnector._start_async(); %s', self._sock) if self._state != self._STATE_ACTIVE: # Must have been canceled by user before we were called _LOGGER.debug( 'Abandoning streaming linkup due to inactive state ' 'transition; state=%s; %s; .', self._state, self._sock) return # Link up protocol and transport if this is a plaintext linkup; # otherwise kick-off SSL workflow first if self._ssl_context is None: self._linkup() else: _LOGGER.debug('Starting SSL handshake on %s', self._sock) # Wrap our plain socket in ssl socket try: self._sock = self._ssl_context.wrap_socket( self._sock, server_side=False, do_handshake_on_connect=False, suppress_ragged_eofs=False, # False = error on incoming EOF server_hostname=self._server_hostname) except Exception as error: # pylint: disable=W0703 _LOGGER.exception('SSL wrap_socket(%s) failed: %r', self._sock, error) self._report_completion(error) return self._do_ssl_handshake() @_log_exceptions def _linkup(self): """Connection is ready: instantiate and link up transport and protocol, and invoke user's completion callback. """ _LOGGER.debug('_AsyncStreamConnector._linkup()') transport = None try: # Create the protocol try: protocol = self._protocol_factory() except Exception as error: _LOGGER.exception('protocol_factory() failed: error=%r; %s', error, self._sock) raise if self._ssl_context is None: # Create plaintext streaming transport try: transport = _AsyncPlaintextTransport( self._sock, protocol, self._nbio) except Exception as error: _LOGGER.exception('PlainTransport() failed: error=%r; %s', error, self._sock) raise else: # Create SSL streaming transport try: transport = _AsyncSSLTransport(self._sock, protocol, self._nbio) except Exception as error: _LOGGER.exception('SSLTransport() failed: error=%r; %s', error, self._sock) raise _LOGGER.debug('_linkup(): created transport %r', transport) # Acquaint protocol with its transport try: protocol.connection_made(transport) except Exception as error: _LOGGER.exception( 'protocol.connection_made(%r) failed: error=%r; %s', transport, error, self._sock) raise _LOGGER.debug('_linkup(): introduced transport to protocol %r; %r', transport, protocol) except Exception as error: # pylint: disable=W0703 result = error else: result = (transport, protocol) self._report_completion(result) @_log_exceptions def _do_ssl_handshake(self): """Perform asynchronous SSL handshake on the already wrapped socket """ _LOGGER.debug('_AsyncStreamConnector._do_ssl_handshake()') if self._state != self._STATE_ACTIVE: _LOGGER.debug( '_do_ssl_handshake: Abandoning streaming linkup due ' 'to inactive state transition; state=%s; %s; .', self._state, self._sock) return done = False try: try: self._sock.do_handshake() except ssl.SSLError as error: if error.errno == ssl.SSL_ERROR_WANT_READ: _LOGGER.debug('SSL handshake wants read; %s.', self._sock) self._watching_socket = True self._nbio.set_reader(self._sock.fileno(), self._do_ssl_handshake) self._nbio.remove_writer(self._sock.fileno()) elif error.errno == ssl.SSL_ERROR_WANT_WRITE: _LOGGER.debug('SSL handshake wants write. %s', self._sock) self._watching_socket = True self._nbio.set_writer(self._sock.fileno(), self._do_ssl_handshake) self._nbio.remove_reader(self._sock.fileno()) else: # Outer catch will report it raise else: done = True _LOGGER.info('SSL handshake completed successfully: %s', self._sock) except Exception as error: # pylint: disable=W0703 _LOGGER.exception('SSL do_handshake failed: error=%r; %s', error, self._sock) self._report_completion(error) return if done: # Suspend I/O and link up transport with protocol _LOGGER.debug( '_do_ssl_handshake: removing watchers ahead of linkup: %s', self._sock) self._nbio.remove_reader(self._sock.fileno()) self._nbio.remove_writer(self._sock.fileno()) # So that our `_cleanup()` won't interfere with the transport's # socket watcher configuration. self._watching_socket = False _LOGGER.debug( '_do_ssl_handshake: pre-linkup removal of watchers is done; %s', self._sock) self._linkup() class _AsyncTransportBase( # pylint: disable=W0223 AbstractStreamTransport): """Base class for `_AsyncPlaintextTransport` and `_AsyncSSLTransport`. """ _STATE_ACTIVE = 1 _STATE_FAILED = 2 # connection failed _STATE_ABORTED_BY_USER = 3 # cancel() called _STATE_COMPLETED = 4 # done with connection _MAX_RECV_BYTES = 4096 # per socket.recv() documentation recommendation # Max per consume call to prevent event starvation _MAX_CONSUME_BYTES = 1024 * 100 class RxEndOfFile(OSError): """We raise this internally when EOF (empty read) is detected on input. """ def __init__(self): super(_AsyncTransportBase.RxEndOfFile, self).__init__( -1, 'End of input stream (EOF)') def __init__(self, sock, protocol, nbio): """ :param socket.socket | ssl.SSLSocket sock: connected socket :param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol: corresponding protocol in this transport/protocol pairing; the protocol already had its `connection_made()` method called. :param AbstractIOServices | AbstractFileDescriptorServices nbio: """ _LOGGER.debug('_AsyncTransportBase.__init__: %s', sock) self._sock = sock self._protocol = protocol self._nbio = nbio self._state = self._STATE_ACTIVE self._tx_buffers = collections.deque() self._tx_buffered_byte_count = 0 def abort(self): """Close connection abruptly without waiting for pending I/O to complete. Will invoke the corresponding protocol's `connection_lost()` method asynchronously (not in context of the abort() call). :raises Exception: Exception-based exception on error """ _LOGGER.info('Aborting transport connection: state=%s; %s', self._state, self._sock) self._initiate_abort(None) def get_protocol(self): """Return the protocol linked to this transport. :rtype: pika.adapters.utils.nbio_interface.AbstractStreamProtocol """ return self._protocol def get_write_buffer_size(self): """ :returns: Current size of output data buffered by the transport :rtype: int """ return self._tx_buffered_byte_count def _buffer_tx_data(self, data): """Buffer the given data until it can be sent asynchronously. :param bytes data: :raises ValueError: if called with empty data """ if not data: _LOGGER.error('write() called with empty data: state=%s; %s', self._state, self._sock) raise ValueError('write() called with empty data {!r}'.format(data)) if self._state != self._STATE_ACTIVE: _LOGGER.debug( 'Ignoring write() called during inactive state: ' 'state=%s; %s', self._state, self._sock) return self._tx_buffers.append(data) self._tx_buffered_byte_count += len(data) def _consume(self): """Utility method for use by subclasses to ingest data from socket and dispatch it to protocol's `data_received()` method socket-specific "try again" exception, per-event data consumption limit is reached, transport becomes inactive, or a fatal failure. Consumes up to `self._MAX_CONSUME_BYTES` to prevent event starvation or until state becomes inactive (e.g., `protocol.data_received()` callback aborts the transport) :raises: Whatever the corresponding `sock.recv()` raises except the socket error with errno.EINTR :raises: Whatever the `protocol.data_received()` callback raises :raises _AsyncTransportBase.RxEndOfFile: upon shutdown of input stream """ bytes_consumed = 0 while (self._state == self._STATE_ACTIVE and bytes_consumed < self._MAX_CONSUME_BYTES): data = self._sigint_safe_recv(self._sock, self._MAX_RECV_BYTES) bytes_consumed += len(data) # Empty data, should disconnect if not data: _LOGGER.error('Socket EOF; %s', self._sock) raise self.RxEndOfFile() # Pass the data to the protocol try: self._protocol.data_received(data) except Exception as error: _LOGGER.exception( 'protocol.data_received() failed: error=%r; %s', error, self._sock) raise def _produce(self): """Utility method for use by subclasses to emit data from tx_buffers. This method sends chunks from `tx_buffers` until all chunks are exhausted or sending is interrupted by an exception. Maintains integrity of `self.tx_buffers`. :raises: whatever the corresponding `sock.send()` raises except the socket error with errno.EINTR """ while self._tx_buffers: num_bytes_sent = self._sigint_safe_send(self._sock, self._tx_buffers[0]) chunk = self._tx_buffers.popleft() if num_bytes_sent < len(chunk): _LOGGER.debug('Partial send, requeing remaining data; %s of %s', num_bytes_sent, len(chunk)) self._tx_buffers.appendleft(chunk[num_bytes_sent:]) self._tx_buffered_byte_count -= num_bytes_sent assert self._tx_buffered_byte_count >= 0, ( '_AsyncTransportBase._produce() tx buffer size underflow', self._tx_buffered_byte_count, self._state) @staticmethod @_retry_on_sigint def _sigint_safe_recv(sock, max_bytes): """Receive data from socket, retrying on SIGINT. :param sock: stream or SSL socket :param max_bytes: maximum number of bytes to receive :returns: received data or empty bytes uppon end of file :rtype: bytes :raises: whatever the corresponding `sock.recv()` raises except socket error with errno.EINTR """ return sock.recv(max_bytes) @staticmethod @_retry_on_sigint def _sigint_safe_send(sock, data): """Send data to socket, retrying on SIGINT. :param sock: stream or SSL socket :param data: data bytes to send :returns: number of bytes actually sent :rtype: int :raises: whatever the corresponding `sock.send()` raises except socket error with errno.EINTR """ return sock.send(data) @_log_exceptions def _deactivate(self): """Unregister the transport from I/O events """ if self._state == self._STATE_ACTIVE: _LOGGER.info('Deactivating transport: state=%s; %s', self._state, self._sock) self._nbio.remove_reader(self._sock.fileno()) self._nbio.remove_writer(self._sock.fileno()) self._tx_buffers.clear() @_log_exceptions def _close_and_finalize(self): """Close the transport's socket and unlink the transport it from references to other assets (protocol, etc.) """ if self._state != self._STATE_COMPLETED: _LOGGER.info('Closing transport socket and unlinking: state=%s; %s', self._state, self._sock) try: self._sock.shutdown(socket.SHUT_RDWR) except pika.compat.SOCKET_ERROR: pass self._sock.close() self._sock = None self._protocol = None self._nbio = None self._state = self._STATE_COMPLETED @_log_exceptions def _initiate_abort(self, error): """Initiate asynchronous abort of the transport that concludes with a call to the protocol's `connection_lost()` method. No flushing of output buffers will take place. :param BaseException | None error: None if being canceled by user, including via falsie return value from protocol.eof_received; otherwise the exception corresponding to the the failed connection. """ _LOGGER.info( '_AsyncTransportBase._initate_abort(): Initiating abrupt ' 'asynchronous transport shutdown: state=%s; error=%r; %s', self._state, error, self._sock) assert self._state != self._STATE_COMPLETED, ( '_AsyncTransportBase._initate_abort() expected ' 'non-_STATE_COMPLETED', self._state) if self._state == self._STATE_COMPLETED: return self._deactivate() # Update state if error is None: # Being aborted by user if self._state == self._STATE_ABORTED_BY_USER: # Abort by user already pending _LOGGER.debug('_AsyncTransportBase._initiate_abort(): ' 'ignoring - user-abort already pending.') return # Notification priority is given to user-initiated abort over # failed connection self._state = self._STATE_ABORTED_BY_USER else: # Connection failed if self._state != self._STATE_ACTIVE: assert self._state == self._STATE_ABORTED_BY_USER, ( '_AsyncTransportBase._initate_abort() expected ' '_STATE_ABORTED_BY_USER', self._state) return self._state = self._STATE_FAILED # Schedule callback from I/O loop to avoid potential reentry into user # code self._nbio.add_callback_threadsafe( functools.partial(self._connection_lost_notify_async, error)) @_log_exceptions def _connection_lost_notify_async(self, error): """Handle aborting of transport either due to socket error or user- initiated `abort()` call. Must be called from an I/O loop callback owned by us in order to avoid reentry into user code from user's API call into the transport. :param BaseException | None error: None if being canceled by user; otherwise the exception corresponding to the the failed connection. """ _LOGGER.debug('Concluding transport shutdown: state=%s; error=%r', self._state, error) if self._state == self._STATE_COMPLETED: return if error is not None and self._state != self._STATE_FAILED: # Priority is given to user-initiated abort notification assert self._state == self._STATE_ABORTED_BY_USER, ( '_AsyncTransportBase._connection_lost_notify_async() ' 'expected _STATE_ABORTED_BY_USER', self._state) return # Inform protocol try: self._protocol.connection_lost(error) except Exception as exc: # pylint: disable=W0703 _LOGGER.exception('protocol.connection_lost(%r) failed: exc=%r; %s', error, exc, self._sock) # Re-raise, since we've exhausted our normal failure notification # mechanism (i.e., connection_lost()) raise finally: self._close_and_finalize() class _AsyncPlaintextTransport(_AsyncTransportBase): """Implementation of `nbio_interface.AbstractStreamTransport` for a plaintext connection. """ def __init__(self, sock, protocol, nbio): """ :param socket.socket sock: non-blocking connected socket :param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol: corresponding protocol in this transport/protocol pairing; the protocol already had its `connection_made()` method called. :param AbstractIOServices | AbstractFileDescriptorServices nbio: """ super(_AsyncPlaintextTransport, self).__init__(sock, protocol, nbio) # Request to be notified of incoming data; we'll watch for writability # only when our write buffer is non-empty self._nbio.set_reader(self._sock.fileno(), self._on_socket_readable) def write(self, data): """Buffer the given data until it can be sent asynchronously. :param bytes data: :raises ValueError: if called with empty data """ if self._state != self._STATE_ACTIVE: _LOGGER.debug( 'Ignoring write() called during inactive state: ' 'state=%s; %s', self._state, self._sock) return assert data, ('_AsyncPlaintextTransport.write(): empty data from user.', data, self._state) # pika/pika#1286 # NOTE: Modify code to write data to buffer before setting writer. # Otherwise a race condition can occur where ioloop executes writer # while buffer is still empty. tx_buffer_was_empty = self.get_write_buffer_size() == 0 self._buffer_tx_data(data) if tx_buffer_was_empty: self._nbio.set_writer(self._sock.fileno(), self._on_socket_writable) _LOGGER.debug('Turned on writability watcher: %s', self._sock) @_log_exceptions def _on_socket_readable(self): """Ingest data from socket and dispatch it to protocol until exception occurs (typically EAGAIN or EWOULDBLOCK), per-event data consumption limit is reached, transport becomes inactive, or failure. """ if self._state != self._STATE_ACTIVE: _LOGGER.debug( 'Ignoring readability notification due to inactive ' 'state: state=%s; %s', self._state, self._sock) return try: self._consume() except self.RxEndOfFile: try: keep_open = self._protocol.eof_received() except Exception as error: # pylint: disable=W0703 _LOGGER.exception( 'protocol.eof_received() failed: error=%r; %s', error, self._sock) self._initiate_abort(error) else: if keep_open: _LOGGER.info( 'protocol.eof_received() elected to keep open: %s', self._sock) self._nbio.remove_reader(self._sock.fileno()) else: _LOGGER.info('protocol.eof_received() elected to close: %s', self._sock) self._initiate_abort(None) except (Exception, pika.compat.SOCKET_ERROR) as error: # pylint: disable=W0703 if (isinstance(error, pika.compat.SOCKET_ERROR) and error.errno in _TRY_IO_AGAIN_SOCK_ERROR_CODES): _LOGGER.debug('Recv would block on %s', self._sock) else: _LOGGER.exception( '_AsyncBaseTransport._consume() failed, aborting ' 'connection: error=%r; sock=%s; Caller\'s stack:\n%s', error, self._sock, ''.join( traceback.format_exception(*sys.exc_info()))) self._initiate_abort(error) else: if self._state != self._STATE_ACTIVE: # Most likely our protocol's `data_received()` aborted the # transport _LOGGER.debug( 'Leaving Plaintext consumer due to inactive ' 'state: state=%s; %s', self._state, self._sock) @_log_exceptions def _on_socket_writable(self): """Handle writable socket notification """ if self._state != self._STATE_ACTIVE: _LOGGER.debug( 'Ignoring writability notification due to inactive ' 'state: state=%s; %s', self._state, self._sock) return # We shouldn't be getting called with empty tx buffers assert self._tx_buffers, ( '_AsyncPlaintextTransport._on_socket_writable() called, ' 'but _tx_buffers is empty.', self._state) try: # Transmit buffered data to remote socket self._produce() except (Exception, pika.compat.SOCKET_ERROR) as error: # pylint: disable=W0703 if (isinstance(error, pika.compat.SOCKET_ERROR) and error.errno in _TRY_IO_AGAIN_SOCK_ERROR_CODES): _LOGGER.debug('Send would block on %s', self._sock) else: _LOGGER.exception( '_AsyncBaseTransport._produce() failed, aborting ' 'connection: error=%r; sock=%s; Caller\'s stack:\n%s', error, self._sock, ''.join( traceback.format_exception(*sys.exc_info()))) self._initiate_abort(error) else: if not self._tx_buffers: self._nbio.remove_writer(self._sock.fileno()) _LOGGER.debug('Turned off writability watcher: %s', self._sock) class _AsyncSSLTransport(_AsyncTransportBase): """Implementation of `.nbio_interface.AbstractStreamTransport` for an SSL connection. """ def __init__(self, sock, protocol, nbio): """ :param ssl.SSLSocket sock: non-blocking connected socket :param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol: corresponding protocol in this transport/protocol pairing; the protocol already had its `connection_made()` method called. :param AbstractIOServices | AbstractFileDescriptorServices nbio: """ super(_AsyncSSLTransport, self).__init__(sock, protocol, nbio) self._ssl_readable_action = self._consume self._ssl_writable_action = None # Bootstrap consumer; we'll take care of producer once data is buffered self._nbio.set_reader(self._sock.fileno(), self._on_socket_readable) # Try reading asap just in case read-ahead caused some self._nbio.add_callback_threadsafe(self._on_socket_readable) def write(self, data): """Buffer the given data until it can be sent asynchronously. :param bytes data: :raises ValueError: if called with empty data """ if self._state != self._STATE_ACTIVE: _LOGGER.debug( 'Ignoring write() called during inactive state: ' 'state=%s; %s', self._state, self._sock) return assert data, ('_AsyncSSLTransport.write(): empty data from user.', data, self._state) # pika/pika#1286 # NOTE: Modify code to write data to buffer before setting writer. # Otherwise a race condition can occur where ioloop executes writer # while buffer is still empty. tx_buffer_was_empty = self.get_write_buffer_size() == 0 self._buffer_tx_data(data) if tx_buffer_was_empty and self._ssl_writable_action is None: self._ssl_writable_action = self._produce self._nbio.set_writer(self._sock.fileno(), self._on_socket_writable) _LOGGER.debug('Turned on writability watcher: %s', self._sock) @_log_exceptions def _on_socket_readable(self): """Handle readable socket indication """ if self._state != self._STATE_ACTIVE: _LOGGER.debug( 'Ignoring readability notification due to inactive ' 'state: state=%s; %s', self._state, self._sock) return if self._ssl_readable_action: try: self._ssl_readable_action() except Exception as error: # pylint: disable=W0703 self._initiate_abort(error) else: _LOGGER.debug( 'SSL readable action was suppressed: ' 'ssl_writable_action=%r; %s', self._ssl_writable_action, self._sock) @_log_exceptions def _on_socket_writable(self): """Handle writable socket notification """ if self._state != self._STATE_ACTIVE: _LOGGER.debug( 'Ignoring writability notification due to inactive ' 'state: state=%s; %s', self._state, self._sock) return if self._ssl_writable_action: try: self._ssl_writable_action() except Exception as error: # pylint: disable=W0703 self._initiate_abort(error) else: _LOGGER.debug( 'SSL writable action was suppressed: ' 'ssl_readable_action=%r; %s', self._ssl_readable_action, self._sock) @_log_exceptions def _consume(self): """[override] Ingest data from socket and dispatch it to protocol until exception occurs (typically ssl.SSLError with SSL_ERROR_WANT_READ/WRITE), per-event data consumption limit is reached, transport becomes inactive, or failure. Update consumer/producer registration. :raises Exception: error that signals that connection needs to be aborted """ next_consume_on_readable = True try: super(_AsyncSSLTransport, self)._consume() except ssl.SSLError as error: if error.errno == ssl.SSL_ERROR_WANT_READ: _LOGGER.debug('SSL ingester wants read: %s', self._sock) elif error.errno == ssl.SSL_ERROR_WANT_WRITE: # Looks like SSL re-negotiation _LOGGER.debug('SSL ingester wants write: %s', self._sock) next_consume_on_readable = False else: _LOGGER.exception( '_AsyncBaseTransport._consume() failed, aborting ' 'connection: error=%r; sock=%s; Caller\'s stack:\n%s', error, self._sock, ''.join( traceback.format_exception(*sys.exc_info()))) raise # let outer catch block abort the transport else: if self._state != self._STATE_ACTIVE: # Most likely our protocol's `data_received()` aborted the # transport _LOGGER.debug( 'Leaving SSL consumer due to inactive ' 'state: state=%s; %s', self._state, self._sock) return # Consumer exited without exception; there may still be more, # possibly unprocessed, data records in SSL input buffers that # can be read without waiting for socket to become readable. # In case buffered input SSL data records still remain self._nbio.add_callback_threadsafe(self._on_socket_readable) # Update consumer registration if next_consume_on_readable: if not self._ssl_readable_action: self._nbio.set_reader(self._sock.fileno(), self._on_socket_readable) self._ssl_readable_action = self._consume # NOTE: can't use identity check, it fails for instance methods if self._ssl_writable_action == self._consume: # pylint: disable=W0143 self._nbio.remove_writer(self._sock.fileno()) self._ssl_writable_action = None else: # WANT_WRITE if not self._ssl_writable_action: self._nbio.set_writer(self._sock.fileno(), self._on_socket_writable) self._ssl_writable_action = self._consume if self._ssl_readable_action: self._nbio.remove_reader(self._sock.fileno()) self._ssl_readable_action = None # Update producer registration if self._tx_buffers and not self._ssl_writable_action: self._ssl_writable_action = self._produce self._nbio.set_writer(self._sock.fileno(), self._on_socket_writable) @_log_exceptions def _produce(self): """[override] Emit data from tx_buffers all chunks are exhausted or sending is interrupted by an exception (typically ssl.SSLError with SSL_ERROR_WANT_READ/WRITE). Update consumer/producer registration. :raises Exception: error that signals that connection needs to be aborted """ next_produce_on_writable = None # None means no need to produce try: super(_AsyncSSLTransport, self)._produce() except ssl.SSLError as error: if error.errno == ssl.SSL_ERROR_WANT_READ: # Looks like SSL re-negotiation _LOGGER.debug('SSL emitter wants read: %s', self._sock) next_produce_on_writable = False elif error.errno == ssl.SSL_ERROR_WANT_WRITE: _LOGGER.debug('SSL emitter wants write: %s', self._sock) next_produce_on_writable = True else: _LOGGER.exception( '_AsyncBaseTransport._produce() failed, aborting ' 'connection: error=%r; sock=%s; Caller\'s stack:\n%s', error, self._sock, ''.join( traceback.format_exception(*sys.exc_info()))) raise # let outer catch block abort the transport else: # No exception, so everything must have been written to the socket assert not self._tx_buffers, ( '_AsyncSSLTransport._produce(): no exception from parent ' 'class, but data remains in _tx_buffers.', len( self._tx_buffers)) # Update producer registration if self._tx_buffers: assert next_produce_on_writable is not None, ( '_AsyncSSLTransport._produce(): next_produce_on_writable is ' 'still None', self._state) if next_produce_on_writable: if not self._ssl_writable_action: self._nbio.set_writer(self._sock.fileno(), self._on_socket_writable) self._ssl_writable_action = self._produce # NOTE: can't use identity check, it fails for instance methods if self._ssl_readable_action == self._produce: # pylint: disable=W0143 self._nbio.remove_reader(self._sock.fileno()) self._ssl_readable_action = None else: # WANT_READ if not self._ssl_readable_action: self._nbio.set_reader(self._sock.fileno(), self._on_socket_readable) self._ssl_readable_action = self._produce if self._ssl_writable_action: self._nbio.remove_writer(self._sock.fileno()) self._ssl_writable_action = None else: # NOTE: can't use identity check, it fails for instance methods if self._ssl_readable_action == self._produce: # pylint: disable=W0143 self._nbio.remove_reader(self._sock.fileno()) self._ssl_readable_action = None assert self._ssl_writable_action != self._produce, ( # pylint: disable=W0143 '_AsyncSSLTransport._produce(): with empty tx_buffers, ' 'writable_action cannot be _produce when readable is ' '_produce', self._state) else: # NOTE: can't use identity check, it fails for instance methods assert self._ssl_writable_action == self._produce, ( # pylint: disable=W0143 '_AsyncSSLTransport._produce(): with empty tx_buffers, ' 'expected writable_action as _produce when readable_action ' 'is not _produce', 'writable_action:', self._ssl_writable_action, 'readable_action:', self._ssl_readable_action, 'state:', self._state) self._ssl_writable_action = None self._nbio.remove_writer(self._sock.fileno()) # Update consumer registration if not self._ssl_readable_action: self._ssl_readable_action = self._consume self._nbio.set_reader(self._sock.fileno(), self._on_socket_readable) # In case input SSL data records have been buffered self._nbio.add_callback_threadsafe(self._on_socket_readable) elif self._sock.pending(): self._nbio.add_callback_threadsafe(self._on_socket_readable)