import copy
import os

import pytest

from configuration import available_ports, ALL_TEST_CURVES, ALL_TEST_CERTS, TLS13_CIPHERS
from common import ProviderOptions, Protocols, Curves, data_bytes
from fixtures import managed_process  # lgtm [py/unused-import]
from providers import Provider, S2N as S2NBase, OpenSSL as OpenSSLBase
from utils import invalid_test_parameters, get_parameter_name, to_bytes
from test_hello_retry_requests import S2N_HRR_MARKER

TICKET_FILE = 'ticket'
EARLY_DATA_FILE = 'early_data'

MAX_EARLY_DATA = 500  # Arbitrary largish number
DATA_TO_SEND = data_bytes(500)  # Arbitrary large number

NUM_RESUMES = 5  # Hardcoded for s2nc --reconnect
NUM_CONNECTIONS = NUM_RESUMES + 1  # resumes + initial

# Early data sizes shared by the parametrized tests: half of the maximum,
# one below the maximum, exactly the maximum, and a single byte.
EARLY_DATA_SIZES = [MAX_EARLY_DATA // 2, MAX_EARLY_DATA - 1, MAX_EARLY_DATA, 1]

S2N_DEFAULT_CURVE = Curves.X25519
# We have no plans to support this curve any time soon
S2N_UNSUPPORTED_CURVE = 'X448'
# Every tested curve except the S2N default; used to provoke a hello retry.
S2N_HRR_CURVES = [
    curve for curve in ALL_TEST_CURVES if curve != S2N_DEFAULT_CURVE]

S2N_EARLY_DATA_MARKER = to_bytes("WITH_EARLY_DATA")
S2N_EARLY_DATA_RECV_MARKER = "Early Data received: "
S2N_EARLY_DATA_STATUS_MARKER = "Early Data status: {status}"
S2N_EARLY_DATA_ACCEPTED_MARKER = S2N_EARLY_DATA_STATUS_MARKER.format(
    status="ACCEPTED")
S2N_EARLY_DATA_REJECTED_MARKER = S2N_EARLY_DATA_STATUS_MARKER.format(
    status="REJECTED")
S2N_EARLY_DATA_NOT_REQUESTED_MARKER = S2N_EARLY_DATA_STATUS_MARKER.format(
    status="NOT REQUESTED")


class S2N(S2NBase):
    """S2N provider that adds the early data options to the base command lines."""

    def __init__(self, options: ProviderOptions):
        super().__init__(options)

    def setup_client(self):
        """Return the base client command line, plus `--early-data <file>`
        when an early data file is configured and exists on disk."""
        cmd_line = super().setup_client()

        early_data_file = self.options.early_data_file
        if early_data_file and os.path.exists(early_data_file):
            cmd_line.extend(['--early-data', early_data_file])

        return cmd_line

    def setup_server(self):
        """Return the base server command line, plus `--max-early-data <limit>`."""
        cmd_line = super().setup_server()
        # NOTE(review): the limit is appended as-is (an int in these tests);
        # presumably the process harness stringifies arguments -- confirm.
        cmd_line.extend(['--max-early-data', self.options.max_early_data])
        return cmd_line


class OpenSSL(OpenSSLBase):
    """OpenSSL provider that adds early data and session file options."""

    def __init__(self, options: ProviderOptions):
        super().__init__(options)

    def setup_client(self):
        """Return the base client command line with early data / session options.

        `-early_data <file>` is added when the early data file exists.
        When a ticket file is configured, an existing file is read with
        `-sess_in`; otherwise the session is written to it with `-sess_out`.
        """
        cmd_line = super().setup_client()

        early_data_file = self.options.early_data_file
        if early_data_file and os.path.exists(early_data_file):
            cmd_line.extend(['-early_data', early_data_file])

        ticket_file = self.options.ticket_file
        if ticket_file:
            if os.path.exists(ticket_file):
                cmd_line.extend(['-sess_in', ticket_file])
            else:
                cmd_line.extend(['-sess_out', ticket_file])

        return cmd_line

    def setup_server(self):
        """Return the base server command line, adding `-early_data` when
        the configured max_early_data is greater than zero."""
        cmd_line = super().setup_server()
        if self.options.max_early_data > 0:
            cmd_line.extend(['-early_data'])
        return cmd_line


# The `-reconnect` option is broken for TLS1.3 in OpenSSL s_client: https://github.com/openssl/openssl/issues/8517
# The `-sess_in`/`-sess_out` options can be used instead, but don't have an s2nc equivalent.
# As we add more providers, we may need both a `-reconnect`-like and a `-sess_in/out`-like S2N server test,
# but for now we can just use `-sess_in/out` and cover the S2N->S2N case in the S2N client tests.
CLIENT_PROVIDERS = [OpenSSL]
SERVER_PROVIDERS = [OpenSSL, S2N]


def _client_and_server_options(options):
    """Return two shallow copies of `options`: one in client mode, one in server mode."""
    client_options = copy.copy(options)
    client_options.mode = Provider.ClientMode

    server_options = copy.copy(options)
    server_options.mode = Provider.ServerMode

    return client_options, server_options


def get_early_data_bytes(file_path, early_data_size):
    """Write `early_data_size` bytes from data_bytes() to `file_path` and return them."""
    early_data = data_bytes(early_data_size)
    with open(file_path, 'wb') as fout:
        fout.write(early_data)
    return early_data


def get_ticket_from_s2n_server(options, managed_process, provider, certificate):
    """
    Run one connection between `provider` (client) and an S2N server on a
    fresh port, using copies of `options`.

    Asserts that `options.ticket_file` does not exist beforehand, that both
    processes succeed, and that the ticket file exists afterwards.
    """
    port = next(available_ports)

    # Generally clients start checking for stdin EoF to exit as soon as they
    # finish the handshake. To make sure the client reliably receives the
    # post-handshake NST, do NOT indicate stdin EoF until after some data
    # has been received from the server.
    close_marker_bytes = data_bytes(10)

    client_options, server_options = _client_and_server_options(options)
    client_options.port = port

    server_options.port = port
    server_options.key = certificate.key
    server_options.cert = certificate.cert
    server_options.data_to_send = close_marker_bytes

    assert not os.path.exists(options.ticket_file)

    s2n_server = managed_process(
        S2N, server_options, send_marker=S2N.get_send_marker(), timeout=10)
    client = managed_process(
        provider, client_options, close_marker=str(close_marker_bytes), timeout=10)

    for results in s2n_server.get_results():
        results.assert_success()

    for results in client.get_results():
        results.assert_success()

    assert os.path.exists(options.ticket_file)


def test_nothing():
    """
    Sometimes the early data test parameters in combination with the s2n libcrypto
    results in no test cases existing. In this case, pass a nothing test to avoid
    marking the entire codebuild run as failed.
    """
    assert True


@pytest.mark.uncollect_if(func=invalid_test_parameters)
@pytest.mark.parametrize("cipher", TLS13_CIPHERS, ids=get_parameter_name)
@pytest.mark.parametrize("curve", ALL_TEST_CURVES, ids=get_parameter_name)
@pytest.mark.parametrize("certificate", ALL_TEST_CERTS, ids=get_parameter_name)
@pytest.mark.parametrize("protocol", [Protocols.TLS13], ids=get_parameter_name)
@pytest.mark.parametrize("provider", CLIENT_PROVIDERS, ids=get_parameter_name)
@pytest.mark.parametrize("other_provider", [S2N], ids=get_parameter_name)
@pytest.mark.parametrize("early_data_size", EARLY_DATA_SIZES)
def test_s2n_server_with_early_data(managed_process, tmp_path, cipher, curve, certificate, protocol, provider,
                                    other_provider, early_data_size):
    """
    Basic S2N server happy case.

    We make one full connection to get a session ticket with early data enabled,
    then another resumption connection with early data.
    """
    ticket_file = str(tmp_path / TICKET_FILE)
    early_data_file = str(tmp_path / EARLY_DATA_FILE)
    early_data = get_early_data_bytes(early_data_file, early_data_size)

    options = ProviderOptions(
        port=next(available_ports),
        cipher=cipher,
        curve=curve,
        protocol=protocol,
        insecure=True,
        use_session_ticket=True,
        data_to_send=DATA_TO_SEND,
    )
    options.ticket_file = ticket_file
    options.early_data_file = early_data_file
    options.max_early_data = MAX_EARLY_DATA

    get_ticket_from_s2n_server(options, managed_process, provider, certificate)

    client_options, server_options = _client_and_server_options(options)

    s2n_server = managed_process(S2N, server_options, timeout=10)
    client = managed_process(provider, client_options, timeout=10)

    for results in client.get_results():
        results.assert_success()

    for results in s2n_server.get_results():
        results.assert_success()
        assert S2N_EARLY_DATA_MARKER in results.stdout
        assert (to_bytes(S2N_EARLY_DATA_RECV_MARKER) + early_data) in results.stdout
        assert to_bytes(S2N_EARLY_DATA_ACCEPTED_MARKER) in results.stdout
        assert DATA_TO_SEND in results.stdout


@pytest.mark.uncollect_if(func=invalid_test_parameters)
@pytest.mark.parametrize("cipher", TLS13_CIPHERS, ids=get_parameter_name)
@pytest.mark.parametrize("certificate", ALL_TEST_CERTS, ids=get_parameter_name)
@pytest.mark.parametrize("protocol", [Protocols.TLS13], ids=get_parameter_name)
@pytest.mark.parametrize("provider", SERVER_PROVIDERS, ids=get_parameter_name)
@pytest.mark.parametrize("other_provider", [S2N], ids=get_parameter_name)
@pytest.mark.parametrize("early_data_size", EARLY_DATA_SIZES)
def test_s2n_client_with_early_data(managed_process, tmp_path, cipher, certificate, protocol, provider, other_provider,
                                    early_data_size):
    """
    Basic S2N client happy case.

    The S2N client tests session resumption by repeatedly reconnecting.
    That means we don't need to manually perform the initial full connection,
    and there is no external ticket file.
    """
    early_data_file = str(tmp_path / EARLY_DATA_FILE)
    early_data = get_early_data_bytes(early_data_file, early_data_size)

    options = ProviderOptions(
        port=next(available_ports),
        cipher=cipher,
        protocol=protocol,
        insecure=True,
        use_session_ticket=True,
        reconnect=True,
    )
    options.ticket_file = None
    options.early_data_file = early_data_file
    options.max_early_data = MAX_EARLY_DATA

    client_options, server_options = _client_and_server_options(options)
    server_options.key = certificate.key  # Required for the initial connection
    server_options.cert = certificate.cert  # Required for the initial connection
    server_options.reconnects_before_exit = NUM_CONNECTIONS

    server = managed_process(provider, server_options, timeout=10)
    s2n_client = managed_process(S2N, client_options, timeout=10)

    for results in s2n_client.get_results():
        results.assert_success()
        assert S2N_EARLY_DATA_MARKER in results.stdout
        assert results.stdout.count(
            to_bytes(S2N_EARLY_DATA_ACCEPTED_MARKER)) == NUM_RESUMES

    for results in server.get_results():
        results.assert_success()
        assert results.stdout.count(early_data) == NUM_RESUMES


@pytest.mark.uncollect_if(func=invalid_test_parameters)
@pytest.mark.parametrize("cipher", TLS13_CIPHERS, ids=get_parameter_name)
@pytest.mark.parametrize("certificate", ALL_TEST_CERTS, ids=get_parameter_name)
@pytest.mark.parametrize("protocol", [Protocols.TLS13], ids=get_parameter_name)
@pytest.mark.parametrize("provider", SERVER_PROVIDERS, ids=get_parameter_name)
@pytest.mark.parametrize("other_provider", [S2N], ids=get_parameter_name)
def test_s2n_client_without_early_data(managed_process, tmp_path, cipher, certificate, protocol, provider,
                                       other_provider):
    """
    Verify that the S2N client doesn't request early data when a server doesn't support early data.

    We repeatedly reconnect with max_early_data set to 0. This is basically a test from
    test_session_resumption but with validation that no early data is sent.
    """
    early_data_file = str(tmp_path / EARLY_DATA_FILE)
    early_data = get_early_data_bytes(early_data_file, MAX_EARLY_DATA)

    options = ProviderOptions(
        port=next(available_ports),
        cipher=cipher,
        protocol=protocol,
        insecure=True,
        use_session_ticket=True,
        reconnect=True,
    )
    options.ticket_file = None
    options.early_data_file = early_data_file
    options.max_early_data = 0

    client_options, server_options = _client_and_server_options(options)
    server_options.key = certificate.key  # Required for the initial connection
    server_options.cert = certificate.cert  # Required for the initial connection
    server_options.reconnects_before_exit = NUM_CONNECTIONS

    server = managed_process(provider, server_options, timeout=10)
    s2n_client = managed_process(S2N, client_options, timeout=10)

    for results in server.get_results():
        results.assert_success()
        assert early_data not in results.stdout

    for results in s2n_client.get_results():
        results.assert_success()
        assert S2N_EARLY_DATA_MARKER not in results.stdout
        assert results.stdout.count(
            to_bytes(S2N_EARLY_DATA_NOT_REQUESTED_MARKER)) == NUM_CONNECTIONS


@pytest.mark.flaky(reruns=5)
@pytest.mark.uncollect_if(func=invalid_test_parameters)
@pytest.mark.parametrize("cipher", TLS13_CIPHERS, ids=get_parameter_name)
@pytest.mark.parametrize("curve", ALL_TEST_CURVES, ids=get_parameter_name)
@pytest.mark.parametrize("certificate", ALL_TEST_CERTS, ids=get_parameter_name)
@pytest.mark.parametrize("protocol", [Protocols.TLS13], ids=get_parameter_name)
@pytest.mark.parametrize("provider", CLIENT_PROVIDERS, ids=get_parameter_name)
@pytest.mark.parametrize("other_provider", [S2N], ids=get_parameter_name)
@pytest.mark.parametrize("early_data_size", EARLY_DATA_SIZES)
def test_s2n_server_with_early_data_rejected(managed_process, tmp_path, cipher, curve, certificate, protocol, provider,
                                             other_provider, early_data_size):
    """
    Test the S2N server rejecting early data.

    We do this by disabling early data on the server after the ticket is issued.
    When the client attempts to use the ticket to send early data, the server rejects the attempt.

    We can't perform an S2N client version of this test because the S2N client performs its hardcoded
    reconnects automatically, without any mechanism to modify the connection in between.
    """
    ticket_file = str(tmp_path / TICKET_FILE)
    early_data_file = str(tmp_path / EARLY_DATA_FILE)
    # Only the file on disk is needed here; the returned bytes are not checked.
    get_early_data_bytes(early_data_file, early_data_size)

    options = ProviderOptions(
        port=next(available_ports),
        cipher=cipher,
        curve=curve,
        protocol=protocol,
        insecure=True,
        use_session_ticket=True,
        data_to_send=DATA_TO_SEND,
    )
    options.ticket_file = ticket_file
    options.early_data_file = early_data_file
    options.max_early_data = MAX_EARLY_DATA

    get_ticket_from_s2n_server(options, managed_process, provider, certificate)

    # Disable early data for the resumption connection.
    options.max_early_data = 0

    client_options, server_options = _client_and_server_options(options)

    s2n_server = managed_process(S2N, server_options, timeout=10)
    client = managed_process(provider, client_options, timeout=10)

    for results in client.get_results():
        results.assert_success()

    for results in s2n_server.get_results():
        results.assert_success()
        assert S2N_EARLY_DATA_MARKER not in results.stdout
        assert S2N_HRR_MARKER not in results.stdout
        assert to_bytes(S2N_EARLY_DATA_RECV_MARKER) not in results.stdout
        assert to_bytes(S2N_EARLY_DATA_REJECTED_MARKER) in results.stdout
        assert DATA_TO_SEND in results.stdout


@pytest.mark.uncollect_if(func=invalid_test_parameters)
@pytest.mark.parametrize("cipher", TLS13_CIPHERS, ids=get_parameter_name)
@pytest.mark.parametrize("curve", S2N_HRR_CURVES, ids=get_parameter_name)
@pytest.mark.parametrize("certificate", ALL_TEST_CERTS, ids=get_parameter_name)
@pytest.mark.parametrize("protocol", [Protocols.TLS13], ids=get_parameter_name)
@pytest.mark.parametrize("provider", SERVER_PROVIDERS, ids=get_parameter_name)
@pytest.mark.parametrize("other_provider", [S2N], ids=get_parameter_name)
@pytest.mark.parametrize("early_data_size", EARLY_DATA_SIZES)
def test_s2n_client_with_early_data_rejected_via_hrr(managed_process, tmp_path, cipher, curve, certificate, protocol,
                                                     provider, other_provider, early_data_size):
    """
    Test the S2N client attempting to send early data, but the server triggering a hello retry.

    We trigger the HRR by configuring the server to only accept curves that the S2N client
    does not send key shares for.
    """
    if provider == S2N:
        pytest.skip(
            "S2N does not respect ProviderOptions.curve, so does not trigger a retry")

    early_data_file = str(tmp_path / EARLY_DATA_FILE)
    early_data = get_early_data_bytes(early_data_file, early_data_size)

    options = ProviderOptions(
        port=next(available_ports),
        cipher=cipher,
        curve=curve,
        protocol=protocol,
        insecure=True,
        use_session_ticket=True,
        reconnect=True,
    )
    options.ticket_file = None
    options.early_data_file = early_data_file
    options.max_early_data = MAX_EARLY_DATA

    client_options, server_options = _client_and_server_options(options)
    server_options.key = certificate.key  # Required for the initial connection
    server_options.cert = certificate.cert  # Required for the initial connection
    server_options.reconnects_before_exit = NUM_CONNECTIONS

    server = managed_process(provider, server_options, timeout=10)
    s2n_client = managed_process(S2N, client_options, timeout=10)

    for results in s2n_client.get_results():
        results.assert_success()
        assert S2N_EARLY_DATA_MARKER not in results.stdout
        assert S2N_HRR_MARKER in results.stdout
        assert results.stdout.count(
            to_bytes(S2N_EARLY_DATA_REJECTED_MARKER)) == NUM_RESUMES

    for results in server.get_results():
        results.assert_success()
        assert early_data not in results.stdout


@pytest.mark.uncollect_if(func=invalid_test_parameters)
@pytest.mark.parametrize("cipher", TLS13_CIPHERS, ids=get_parameter_name)
@pytest.mark.parametrize("curve", ALL_TEST_CURVES, ids=get_parameter_name)
@pytest.mark.parametrize("certificate", ALL_TEST_CERTS, ids=get_parameter_name)
@pytest.mark.parametrize("protocol", [Protocols.TLS13], ids=get_parameter_name)
@pytest.mark.parametrize("provider", CLIENT_PROVIDERS, ids=get_parameter_name)
@pytest.mark.parametrize("other_provider", [S2N], ids=get_parameter_name)
@pytest.mark.parametrize("early_data_size", EARLY_DATA_SIZES)
def test_s2n_server_with_early_data_rejected_via_hrr(managed_process, tmp_path, cipher, curve, certificate, protocol,
                                                     provider, other_provider, early_data_size):
    """
    Test the S2N server rejecting early data because of a hello retry request.

    In order to trigger a successful retry, we need to force the peer to offer us a key share that
    S2N doesn't support while still supporting at least one curve S2N does support.
    """
    ticket_file = str(tmp_path / TICKET_FILE)
    early_data_file = str(tmp_path / EARLY_DATA_FILE)
    early_data = get_early_data_bytes(early_data_file, early_data_size)

    options = ProviderOptions(
        port=next(available_ports),
        cipher=cipher,
        curve=(S2N_UNSUPPORTED_CURVE + ":" + str(curve)),
        protocol=protocol,
        insecure=True,
        use_session_ticket=True,
        data_to_send=DATA_TO_SEND,
    )
    options.ticket_file = ticket_file
    options.early_data_file = early_data_file
    options.max_early_data = MAX_EARLY_DATA

    get_ticket_from_s2n_server(options, managed_process, provider, certificate)

    client_options, server_options = _client_and_server_options(options)

    s2n_server = managed_process(S2N, server_options, timeout=10)
    client = managed_process(provider, client_options, timeout=10)

    for results in client.get_results():
        results.assert_success()
        assert early_data not in results.stdout

    for results in s2n_server.get_results():
        results.assert_success()
        assert S2N_EARLY_DATA_MARKER not in results.stdout
        assert S2N_HRR_MARKER in results.stdout
        assert to_bytes(S2N_EARLY_DATA_RECV_MARKER) not in results.stdout
        assert to_bytes(S2N_EARLY_DATA_REJECTED_MARKER) in results.stdout
        assert DATA_TO_SEND in results.stdout


@pytest.mark.uncollect_if(func=invalid_test_parameters)
@pytest.mark.parametrize("cipher", TLS13_CIPHERS, ids=get_parameter_name)
@pytest.mark.parametrize("curve", ALL_TEST_CURVES, ids=get_parameter_name)
@pytest.mark.parametrize("certificate", ALL_TEST_CERTS, ids=get_parameter_name)
@pytest.mark.parametrize("protocol", [Protocols.TLS13], ids=get_parameter_name)
@pytest.mark.parametrize("provider", CLIENT_PROVIDERS, ids=get_parameter_name)
@pytest.mark.parametrize("other_provider", [S2N], ids=get_parameter_name)
@pytest.mark.parametrize("excess_early_data", [1, 10, MAX_EARLY_DATA])
def test_s2n_server_with_early_data_max_exceeded(managed_process, tmp_path, cipher, curve, certificate, protocol,
                                                 provider, other_provider, excess_early_data):
    """
    Test the S2N server fails if it receives too much early data.

    The ticket is obtained with a limit of MAX_EARLY_DATA + excess_early_data;
    the limit is then lowered to MAX_EARLY_DATA for the resumption connection.
    """
    ticket_file = str(tmp_path / TICKET_FILE)
    early_data_file = str(tmp_path / EARLY_DATA_FILE)
    early_data = get_early_data_bytes(
        early_data_file, MAX_EARLY_DATA + excess_early_data)

    options = ProviderOptions(
        port=next(available_ports),
        cipher=cipher,
        curve=curve,
        protocol=protocol,
        insecure=True,
        use_session_ticket=True,
        data_to_send=DATA_TO_SEND,
    )
    options.ticket_file = ticket_file
    options.early_data_file = early_data_file
    options.max_early_data = MAX_EARLY_DATA + excess_early_data

    get_ticket_from_s2n_server(options, managed_process, provider, certificate)

    # Lower the limit so the early data file now exceeds it.
    options.max_early_data = MAX_EARLY_DATA

    client_options, server_options = _client_and_server_options(options)

    s2n_server = managed_process(S2N, server_options, timeout=10)
    client = managed_process(provider, client_options, timeout=10)

    for results in client.get_results():
        # We can't make any assertions about the client exit_code.
        # To avoid blinding delays, s2nd doesn't call s2n_shutdown for a failed negotiation.
        # That means that instead of sending close_notify, we just close the socket.
        # Whether the peer interprets this as a failure or EoF depends on its state.
        assert results.exception is None
        assert DATA_TO_SEND not in results.stdout

    for results in s2n_server.get_results():
        assert results.exception is None
        assert results.exit_code != 0
        # Full early data should not be reported
        assert early_data not in results.stdout
        # Partial early data should be reported
        assert (to_bytes(S2N_EARLY_DATA_RECV_MARKER) + early_data[:MAX_EARLY_DATA]) in results.stdout
        assert to_bytes("Bad message encountered") in results.stderr