//! The 'exec' module lets you run a command in another container through the apiserver. A
//! WebSocket is used for communication with the server. Process input and output are sent back
//! and forth directly as binary messages, and control messages are sent as JSON-encoded text
//! messages over the same connection.
// Implementation note: the main job of this module is managing communication to and from the
// server through a WebSocket. This is accomplished mainly with threads and channels - a thread is
// started to manage each particular resource, like input, output, signals, heartbeat, etc. If it
// needs to send messages to the server, it's given a channel to the server. If the caller needs to
// hear back from the thread, it's given back a channel.
//
// This behavior is encapsulated in structs. For example, there's a Heartbeat struct; you create
// it and give it a channel it can use to send to the server, it starts a thread, and you get back
// the struct, which contains a channel that tells you if the heartbeat dies.
use futures::{Future, FutureExt, Stream, StreamExt, TryStream, TryStreamExt};
use futures_channel::{mpsc, oneshot};
use libc::{ioctl, winsize as WinSize, STDOUT_FILENO, TIOCGWINSZ as GetWinSize};
use log::{debug, error, trace, warn};
use model::exec::{ClientMessage, Initialize, ServerMessage, Size};
use retry_read::RetryRead;
use signal_hook::{consts::signal, iterator::Signals};
use snafu::{OptionExt, ResultExt};
use std::ffi::OsString;
use std::io::Read;
use std::os::unix::io::RawFd;
use std::path::Path;
use std::pin::Pin;
use std::process;
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc, Mutex,
};
use std::thread::{self, sleep};
use std::time::{Duration, Instant};
use tokio::io::AsyncWriteExt;
use tokio_tungstenite::tungstenite::{
protocol::{frame::coding::CloseCode, CloseFrame, Message},
Error as WsError,
};
mod connect;
mod terminal;
use connect::websocket_connect;
use terminal::Terminal;
/// To guard against stale connections, we send ping and pong messages through the channel
/// regularly as a 'heartbeat'; this is how often we send them. The heartbeat thread also wakes at
/// this interval to check how long it has been since we last heard from the server.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(2);
/// If we haven't heard from the server in this much time, we consider it gone and we stop. The
/// check only happens once per HEARTBEAT_INTERVAL, so detection can lag by up to one interval.
const SERVER_TIMEOUT: Duration = Duration::from_secs(10);
/// This is the main entry point. We start a connection with the server, request a command be run,
/// set up helper threads to manage communication, and wait for a result.
///
/// Parameters:
/// * socket_path: Passed to `websocket_connect` to reach the server's "/exec" endpoint.
///
/// * command: The command to run; sent to the server in the Initialize message.
///
/// * target: Sent to the server in the Initialize message alongside the command.
///
/// * tty: Passed to `Terminal::new`; the resulting terminal's TTY value is what is sent to the
/// server in the Initialize message.
///
/// Returns: this function only returns through an error path (a setup failure, or a failure
/// reading user input). Once the wait for completion finishes for any other reason, every branch
/// ends the process with `process::exit`: the server-reported exit code on a normal close,
/// 128 + signal number for a terminal signal, and 1 otherwise.
pub async fn exec
(
socket_path: P,
command: Vec,
target: String,
tty: Option,
) -> Result<()>
where
P: AsRef,
{
// We want to send the user's input to the server untouched; for interactive use cases, this
// means we need to set the terminal to 'raw mode' so that certain keystrokes aren't
// interpreted and turned into signals, etc. The Terminal type manages that for us, and resets
// the terminal when it's dropped later. We set this up first so that we don't unnecessarily
// talk to the server if it fails.
let terminal = Terminal::new(tty).context(error::TerminalSnafu)?;
// Connect to the server over the Unix-domain socket and upgrade to a WebSocket.
let ws_stream = websocket_connect(socket_path, "/exec")
.await
.context(error::ConnectSnafu)?;
// We're going to split the stream into write and read halves so we can manage them with
// separate threads, which simplifies the use of blocking calls, not requiring a totally new
// async infrastructure.
let (write, read) = ws_stream.split();
// We make a multi-producer channel that forwards anything it receives to the WebSocket; we can
// share the transmission end of the channel with any number of threads that need to send
// messages to the server.
let (ws_tx, ws_rx) = mpsc::unbounded();
let forward_to_ws = ws_rx.map(Ok).forward(write);
debug!("Spawning task to write to WebSocket");
// The task is detached; we never await its handle, so write failures are not observed here.
tokio::spawn(forward_to_ws);
// The first thing we want to send is an initialize message that tells the server what program
// we want to run, what container to run it in, and whether we want a TTY. (It's important
// not to send other types of messages first or the server won't have a process to act on and
// will reject us. It'd be nice to send initialization parameters in the HTTP request body,
// but not all WebSocket clients support it.)
debug!(
"Sending initialize request for target '{}' with tty: {} and command: {:?}",
target,
terminal.tty().is_some(),
command
);
let init = Initialize {
command,
target,
tty: terminal.tty().clone(),
};
// Control messages go to the server in a text channel, so we serialize to JSON before sending.
let msg =
serde_json::to_string(&ClientMessage::Initialize(init)).context(error::SerializeSnafu)?;
ws_tx
.unbounded_send(Message::Text(msg))
.context(error::SendMessageSnafu {
kind: "initialization",
})?;
// Now that the server knows what we want, we set up helper threads to manage communication.
// First, a heartbeat type that regularly pings the server and keeps track of responses.
let mut heartbeat = Heartbeat::new(ws_tx.clone());
// Next, a type that watches for signals to the local process, and either forwards them to the
// server (e.g. if you change your window size) or ends communication (e.g. for SIGTERM).
let mut signal_handler = HandleSignals::new(ws_tx.clone())?;
// We don't want to overload the server with our process input. It sends us capacity updates
// to let us know how many more messages we can send before we should wait. We keep track of
// that capacity in an AtomicCapacity that we can share across threads - the one where we
// receive messages from the server, and the one that's reading input to send to the server.
let capacity = Arc::new(AtomicCapacity::default());
let capacity_reader = Arc::clone(&capacity);
// Start a thread that reads input from the user and sends it across the WebSocket, waiting for
// capacity between reads if necessary.
let mut read_from_user =
ReadFromUser::new(ws_tx.clone(), capacity_reader, terminal.tty().is_some());
// Start a future that reads the stream of messages from the server.
let mut read_from_server = ReadFromServer::new(read, heartbeat.setter, capacity);
// We're all set up! Wait for something that indicates we're done.
debug!("Waiting for completion: server, signal, heartbeat, or read error");
// Store the signal number, if that's why we stop.
let mut signal_ret = None;
// We drop Terminal early in each branch so we can print results cleanly.
tokio::select! {
// This is the normal case; the server finishes running the program.
res = &mut read_from_server.future => {
drop(terminal);
debug!("Server read completed");
// If our ReadFromServer future hit an error, log it, except for the special case of a
// Close error, which is just an empty marker that we're done.
if let Err(e) = res {
let msg = e.to_string();
if !msg.is_empty() {
error!("{}", e);
}
}
}
// Stop if we fail to read input.
// Match against Ok(err) because the Err case means the other end of the channel was
// dropped; that would imply the ReadFromUser thread was dropped, but that doesn't mean the
// process is done, just that our input is done.
Ok(err) = &mut read_from_user.error_rx => {
drop(terminal);
// This is the only post-setup path that returns to the caller rather than exiting.
Err(err)?;
}
// Stop if we receive a terminal signal.
signal = &mut signal_handler.signal_rx => {
drop(terminal);
debug!("Received signal: {:?}", signal);
signal_ret = Some(signal);
}
// Stop if the server heartbeat dies.
_ = &mut heartbeat.finished_rx => {
drop(terminal);
warn!("Server heartbeat died");
}
}
// Determine how to exit based on the information we got back from the server, or from a
// local signal.
// now_or_never polls the channel exactly once without waiting, so we only see a close frame
// that has already been queued by the time we get here.
if let Some(Some(ret)) = read_from_server.ret_rx.next().now_or_never() {
match ret.code {
// The connection is closing normally, we expect the process exit code in the reason message.
CloseCode::Normal => {
if !ret.reason.is_empty() {
// This is the normal case where the server gives us the exit code of the process.
if let Ok(exit_code) = ret.reason.parse::() {
process::exit(i32::from(exit_code))
}
}
// If there is no exit code in the reason message, we assume the worst and exit 1.
warn!("Connection close reason: {}", ret.reason);
process::exit(1)
}
// We don't expect any other CloseCode in normal operation. The server will send
// specific CloseCodes if the client disobeyed protocol, but we obey. The server can
// also send a generic Error if it's unhealthy.
_ => {
if !ret.reason.is_empty() {
warn!("Connection close reason: {}", ret.reason);
}
process::exit(1)
}
}
} else if let Some(Ok(signal)) = signal_ret {
// Use shell-style return codes for signals.
process::exit(128 + signal);
} else {
warn!("Didn't receive a return code or signal; unsure what happened");
process::exit(1)
}
}
/// ReadFromServer is responsible for handling WebSocket messages received from the server.
///
/// Constructing it does not start any work by itself; the caller drives `future` (exec polls it
/// in its select!) and reads any close information from `ret_rx` afterward.
struct ReadFromServer {
/// Represents the task that handles the stream of server messages; when it completes, either
/// the server has closed the connection or we've hit an error.
future: Pin>>>,
/// If the server sends us a reason for closing the connection (which normally includes the
/// return code of the command) we'll forward it on this channel.
ret_rx: mpsc::UnboundedReceiver>,
}
impl ReadFromServer {
/// Builds the future that processes server messages, along with the channel on which that
/// future reports the server's close frame.
///
/// Parameters:
/// * read: The stream of messages from the server.
///
/// * heartbeat_setter: An atomic handle to a timestamp; it is refreshed whenever a ping or pong
/// arrives from the server so staleness of the connection can be detected.
///
/// * capacity: Overwritten whenever the server sends a capacity update, so the input side can
/// avoid sending (or even reading) data the server can't handle.
fn new(
    read: impl Stream- > + 'static,
    heartbeat_setter: Arc>,
    capacity: Arc,
) -> Self {
    // The sending half goes to the future; the receiving half is handed back to the caller so
    // it can learn the return value the server gives us, if any.
    let (ret_tx, ret_rx) = mpsc::unbounded();
    Self {
        future: Self::read_from_server(read, heartbeat_setter, ret_tx, capacity),
        ret_rx,
    }
}
/// Builds (but does not start) the future that consumes the server's message stream.
///
/// Each message is handled by type: Binary data is written to stdout, Ping/Pong refreshes the
/// heartbeat timestamp, Text is decoded as a ServerMessage (capacity updates are stored into
/// `capacity`), and Close forwards the close frame, if present, on `ret_tx`.
///
/// The future resolves to Ok(()) if the stream ends without a Close message, and to an error
/// otherwise. A Close message is reported as the `Close` error variant purely to stop
/// processing; callers should treat that variant as a normal end rather than a failure.
fn read_from_server(
read: impl TryStream + 'static,
heartbeat_setter: Arc>,
ret_tx: mpsc::UnboundedSender>,
capacity: Arc,
) -> Pin>>> {
// Turn tungstenite errors into our own error type.
read.err_into::()
// Process each message from the server, stopping on Close or error.
.try_for_each(move |ws_msg| {
// For ownership reasons, make copies of the atomic handles that can be passed into
// the async closure.
let heartbeat_setter = heartbeat_setter.clone();
let capacity = capacity.clone();
let ret_tx = ret_tx.clone();
async move {
match ws_msg {
// Binary messages represent process output, not encoded in any way. Write
// it to stdout.
Message::Binary(data) => {
trace!("Received {} bytes of output from server", data.len());
let mut stdout = tokio::io::stdout();
stdout.write_all(&data).await.context(error::WriteOutputSnafu)?;
// May not be a full line of output, so flush any bytes we got. Failure here
// isn't worthy of stopping the whole process.
let _ = stdout.flush().await;
}
// tokio-tungstenite replies to ping with pong; we just update our heartbeat.
Message::Ping(_) | Message::Pong(_) => {
// If we fail to get the mutex, the heartbeat thread has panicked, which means
// we'll no longer send pings to the server, and it'll disconnect us at some
// point. Might as well try to finish our processing in the meantime.
if let Ok(mut hb) = heartbeat_setter.lock() {
trace!("Got ping/pong from server, updating heartbeat");
*hb = Instant::now();
}
}
// The server requested we close the connection, so we stop processing.
// Usually it includes the return code of the requested process.
Message::Close(c) => {
if let Some(ret) = c {
// If we fail to send the return code, there's nothing we can do to rectify
// the situation, and this is a Close so we definitely want to return below
// anyway.
let _ = ret_tx.unbounded_send(ret);
}
// Returning an error is what makes try_for_each stop; Close displays as an
// empty string so the caller can tell it apart from real errors.
return error::CloseSnafu.fail();
}
// Text messages represent encoded control messages from the server.
Message::Text(raw_msg) => {
let server_message =
serde_json::from_str(&raw_msg).context(error::DeserializeSnafu)?;
match server_message {
// Capacity messages tell us how many messages the server is
// willing to receive before it rejects us.
ServerMessage::Capacity(new) => {
debug!(
"Received capacity update from server: {} max outstanding, {} written",
new.max_messages_outstanding,
new.messages_written
);
// The two values are stored separately, so a reader on another thread
// may briefly observe one updated and the other not.
capacity
.max_messages_outstanding
.store(new.max_messages_outstanding, Ordering::SeqCst);
capacity
.messages_written
.store(new.messages_written, Ordering::SeqCst);
}
}
}
}
Ok(())
}
})
// This puts the future in a Pin; we use Box so we don't have to name the exact
// future type, and Pin is required for tokio to select! it.
.boxed_local()
}
}
/// ReadFromUser is responsible for reading user input from stdin and sending it to the given
/// channel so it can be forwarded to the server.
struct ReadFromUser {
/// If we fail to read input, we'll return the error on this channel so the client can be
/// stopped. If the reading thread finishes without an error, nothing is sent and the sending
/// half is simply dropped, so the receiver completes with a cancellation instead of a value.
error_rx: oneshot::Receiver,
}
impl ReadFromUser {
/// Starts the thread that reads stdin and returns a handle holding its error channel.
///
/// Parameters:
/// * stdin_tx: The channel to which we should send messages containing user input.
///
/// * capacity_reader: Input is only read while the server has capacity according to this
/// value, so that we don't unnecessarily fill buffers or overwhelm the server.
///
/// * is_tty: whether input is coming from a TTY; think of it as whether the command is
/// interactive. If so, we read a byte at a time and send it immediately to the server so that
/// things like tab completion work; otherwise input is read in bulk.
fn new(
    stdin_tx: mpsc::UnboundedSender,
    capacity_reader: Arc,
    is_tty: bool,
) -> Self {
    // The thread reports a read failure to the caller through this channel.
    let (error_tx, error_rx) = oneshot::channel();
    debug!("Spawning thread to read from user");
    thread::spawn(move || {
        // Interactive input uses the low-latency reader; everything else uses the bulk reader.
        let outcome = if is_tty {
            Self::read_stdin_tty(stdin_tx, capacity_reader)
        } else {
            Self::read_stdin(stdin_tx, capacity_reader)
        };
        if let Err(read_err) = outcome {
            // Nothing more can be done if the caller has already gone away.
            let _ = error_tx.send(read_err);
        }
    });
    Self { error_rx }
}
/// Read from stdin with the lowest possible latency, sending each byte to the server.
fn read_stdin_tty(
tx: mpsc::UnboundedSender,
capacity: Arc,
) -> Result<()> {
let mut stdin = std::io::stdin();
// Keep track of the number of messages we've read. We compare this to the number of
// messages the server has written, as received in its regular capacity update messages, so
// that we don't overwhelm the server.
let mut messages_read = 0u64;
loop {
// Wait for server to have capacity for writes before reading; don't give "false hope" to
// whatever's writing to our stdin that it'll be read until there's room for it.
// (Note: we're unlikely to hit this interactively, which is the primary use for TTY.)
Self::wait_for_capacity(messages_read, &capacity)?;
// Read a byte at a time.
let mut buf = [0; 1];
match stdin.read_exact(&mut buf) {
Ok(()) => {
messages_read += 1;
// Send the data to the server in a Binary message without encoding.
tx.unbounded_send(Message::Binary(Vec::from(buf)))
.context(error::SendMessageSnafu { kind: "user input" })?;
}
// We don't normally get Err, since the server will close connection first, but for
// completeness...
Err(e) => {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
debug!("Finished reading input");
// If we can, send a ContentComplete message to the server so we can exit more
// cleanly, but either way we're done. (Again, we shouldn't get here.)
match serde_json::to_string(&ClientMessage::ContentComplete) {
Ok(msg) => {
if let Err(e) = tx.unbounded_send(Message::Text(msg)) {
warn!("Unable to send ContentComplete to server, may hang if process doesn't exit: {}", e);
}
}
Err(e) => warn!("Unable to serialize ContentComplete, may hang if process doesn't exit: {}", e),
}
return Ok(());
} else {
// Any error other than EOF is a real read error.
Err(e).context(error::ReadFromUserSnafu)?
}
}
}
}
}
/// Read from stdin in bulk, sending larger batches of data at a time.
fn read_stdin(tx: mpsc::UnboundedSender, capacity: Arc) -> Result<()> {
let mut stdin = std::io::stdin();
// Keep track of the number of messages we've read. We compare this to the number of
// messages the server has written, as received in its regular capacity update messages, so
// that we don't overwhelm the server.
let mut messages_read = 0u64;
loop {
// Wait for server to have capacity for writes before reading; don't give "false hope" to
// whatever's writing to our stdin that it'll be read until there's room for it.
Self::wait_for_capacity(messages_read, &capacity)?;
// Read a batch of data at a time; 4k is a balanced number for small and large jobs.
let mut buf = [0; 4096];
let count = stdin
.retry_read(&mut buf)
.context(error::ReadFromUserSnafu)?;
// A read of 0 indicates EOF, so we're done.
if count == 0 {
break;
}
messages_read += 1;
// Send the data to the server in a Binary message without encoding.
let msg = Vec::from(&buf[..count]);
tx.unbounded_send(Message::Binary(msg))
.context(error::SendMessageSnafu { kind: "user input" })?;
}
debug!("Finished reading input");
// Send a ContentComplete message to the server so it can exit the process more cleanly.
// This is more important than the TTY case; interactive use typically has users typing
// exit, or quit, or ctrl-d... noninteractive programs typically wait for EOF.
let msg = serde_json::to_string(&ClientMessage::ContentComplete)
.context(error::SerializeSnafu)?;
tx.unbounded_send(Message::Text(msg))
.context(error::SendMessageSnafu {
kind: "content complete",
})?;
Ok(())
}
/// Sleeps until the server has capacity to receive one more message of process input.
///
/// We know how many messages we've read from user input, and the AtomicCapacity is updated any
/// time the server sends us a capacity update. We compare read count to written count to know
/// how many messages the server has yet to write. The caller is about to read and send one more
/// message, so we wait until the outstanding count is strictly below the maximum the server
/// allows; that way the count never exceeds the maximum after the send. (The server will
/// terminate us otherwise.) This also means nothing is sent before the first capacity update
/// arrives, since the maximum starts at zero.
///
/// Parameters:
/// * messages_read: How many input messages the caller has sent to the server so far.
///
/// * capacity: The shared capacity counters, as last reported by the server.
///
/// Returns an error if the server claims to have written more messages than we've sent.
fn wait_for_capacity(messages_read: u64, capacity: &Arc) -> Result<()> {
    let mut waited = 0u64;
    loop {
        let max_outstanding = capacity.max_messages_outstanding.load(Ordering::SeqCst);
        let messages_written = capacity.messages_written.load(Ordering::SeqCst);
        // Check how many messages are currently waiting to be written; read - written.
        // If the server has written more than we've read, something is quite wrong!
        let messages_outstanding =
            messages_read
                .checked_sub(messages_written)
                .context(error::ServerCountSnafu {
                    messages_read,
                    messages_written,
                })?;
        // There's capacity only if one more message still fits under the maximum. Using `<=`
        // here would let us send while the server is already at its limit.
        if messages_outstanding < max_outstanding {
            break;
        }
        // Occasionally log that we're still waiting, if someone is watching at trace level.
        waited += 1;
        if waited % 100 == 0 {
            trace!("Waiting for server capacity...");
        }
        sleep(Duration::from_millis(10));
    }
    trace!("Server capacity OK, reading input");
    Ok(())
}
}
/// AtomicCapacity holds the numbers we receive in capacity updates from the server, in a form
/// that can be shared across our threads.
///
/// The derived default starts both counters at zero: we assume no capacity until the server
/// tells us otherwise, so that we don't send data the server isn't ready to process.
#[derive(Default)]
struct AtomicCapacity {
    /// The server will reject us if we have more than this number of input messages outstanding.
    max_messages_outstanding: AtomicU64,
    /// The number of messages that the server has confirmed it's written. Messages are always
    /// handled in order, so we can directly compare this to the number of inputs we've read.
    messages_written: AtomicU64,
}
/// Heartbeat is responsible for confirming our connection to the server isn't stale. We ping the
/// server regularly so it knows we're alive, and we confirm that the server has pinged us recently
/// so we know it's alive.
///
/// The heartbeat thread only reads the shared timestamp; whoever holds `setter` is responsible
/// for refreshing it when the server is heard from.
struct Heartbeat {
/// An atomic handle to a timestamp; this should be updated whenever we receive a ping or pong
/// from the server so we can make sure the connection isn't stale.
setter: Arc>,
/// If the heartbeat dies, we send a message on this channel so the client can stop.
finished_rx: oneshot::Receiver<()>,
}
impl Heartbeat {
/// Starts the heartbeat thread and returns the handles the caller needs to interact with it.
///
/// Parameters:
/// * ping_tx: The channel to which we should send ping messages.
fn new(ping_tx: mpsc::UnboundedSender) -> Self {
    // The thread uses this channel to tell the caller when the heartbeat dies.
    let (finished_tx, finished_rx) = oneshot::channel();
    // The timestamp of when we last heard from the server, starting from now. The caller keeps
    // one handle to update it; the thread gets another handle to read it.
    let setter = Arc::new(Mutex::new(Instant::now()));
    let getter = Arc::clone(&setter);
    debug!("Spawning heartbeat thread");
    thread::spawn(move || Self::heartbeat(ping_tx, getter, finished_tx));
    Self {
        setter,
        finished_rx,
    }
}
/// Body of the heartbeat thread: wake every HEARTBEAT_INTERVAL, confirm the server has been
/// heard from within SERVER_TIMEOUT, and send it a ping. When the loop ends, the caller is
/// notified through `finished_tx`.
fn heartbeat(
    ping_tx: mpsc::UnboundedSender,
    heartbeat_getter: Arc>,
    finished_tx: oneshot::Sender<()>,
) {
    // Runs forever, unless we don't hear from the server for longer than SERVER_TIMEOUT, or if
    // the thread that updates the heartbeat dies.
    loop {
        sleep(HEARTBEAT_INTERVAL);
        let server_is_live = match heartbeat_getter.lock() {
            Ok(last_heard) => Instant::now().duration_since(*last_heard) <= SERVER_TIMEOUT,
            // A poisoned mutex means the thread reading from the WebSocket has panicked, so
            // there's no more need for a heartbeat; we're dead.
            Err(_) => false,
        };
        if !server_is_live {
            break;
        }
        // A failed ping isn't handled specially; without replies we progress toward the
        // timeout anyway, so we'll naturally do the right thing.
        let _ = ping_tx.unbounded_send(Message::Ping(vec![]));
    }
    // Tell the caller the heartbeat died.
    let _ = finished_tx.send(());
}
}
/// HandleSignals is responsible for managing non-terminal signals (like when your window changes
/// size) and alerting the caller for terminal signals.
struct HandleSignals {
/// If a terminal signal is received, its value is sent over this channel. The signal thread
/// stops after sending, so only the first terminal signal is ever reported here.
signal_rx: oneshot::Receiver,
}
impl HandleSignals {
/// Parameters:
/// * winch_tx: The channel to which we should send window size change messages.
fn new(winch_tx: mpsc::UnboundedSender) -> Result {
// Create a channel we use to tell the caller when we receive a terminal signal.
let (signal_tx, signal_rx) = oneshot::channel();
// Set up the signal handler; do this before starting a thread so we can die quickly on
// failure.
use signal::*;
let signals = Signals::new([SIGWINCH, SIGTERM, SIGINT, SIGQUIT])
.context(error::HandleSignalsSnafu)?;
debug!("Spawning thread to manage signals");
thread::spawn(move || {
if let Err(e) = Self::handle_signals(signals, winch_tx, signal_tx) {
error!("Signal manager failed: {}", e);
}
});
Ok(Self { signal_rx })
}
fn handle_signals(
mut signals: Signals,
winch_tx: mpsc::UnboundedSender,
signal_tx: oneshot::Sender,
) -> Result<()> {
use signal::*;
loop {
// Block until our process receives a signal.
for signal in signals.wait() {
if signal == SIGWINCH {
// Window size changes can happen any number of times; send an update to the
// server and wait for more signals.
Self::send_winch(&winch_tx);
} else {
// Anything else is terminal; notify the caller and exit.
signal_tx
.send(signal)
.ok()
.context(error::SendSignalSnafu { signal })?;
// The signal and our handler have done their job, it's not an error.
return Ok(());
}
}
}
}
/// Best-effort delivery of the current window size to the server. Window size updates aren't
/// critical, since the program is still functioning, so every failure along the way (no size
/// available, serialization, send) is silently ignored rather than returned.
fn send_winch(tx: &mpsc::UnboundedSender) {
    let winsize = match get_winsize(STDOUT_FILENO) {
        Some(current) => current,
        None => return,
    };
    debug!(
        "Sending new window size to server: {} cols {} rows",
        winsize.cols, winsize.rows
    );
    let encoded = serde_json::to_string(&ClientMessage::Winch(winsize));
    if let Ok(msg) = encoded {
        let _ = tx.unbounded_send(Message::Text(msg));
    }
}
}
/// Look up the current window size of the terminal behind `fd`, if possible. Window size isn't
/// critical, since the program is still functioning, so a failed lookup yields None rather than
/// an error.
fn get_winsize(fd: RawFd) -> Option {
    // Zeroed structure for the ioctl to fill in; the pixel fields are unused.
    let mut raw = WinSize {
        ws_row: 0,
        ws_col: 0,
        ws_xpixel: 0,
        ws_ypixel: 0,
    };
    // unsafe because ioctls can do any number of crazy things and this is a libc call, but it's
    // about as safe an ioctl as there is.
    let status = unsafe { ioctl(fd, GetWinSize, &mut raw) };
    if status == 0 {
        // Convert to our type that we can serialize for the server.
        Some(Size::from(raw))
    } else {
        debug!("Failed to get window size");
        None
    }
}
// Error type for this module. Each variant's user-facing text comes from its snafu display
// attribute; the derive also generates the `...Snafu` context selectors used with `.context()`
// and `.fail()` throughout the module.
mod error {
use super::{connect, mpsc, terminal, Message};
use snafu::{IntoError, Snafu};
#[derive(Debug, Snafu)]
#[snafu(visibility(pub(super)))]
pub enum Error {
// This is used as a sort of marker; the user doesn't need wording about the connection
// being closed because the process will end, and if because of an error, they'll see that.
// Callers detect it by its empty display string.
#[snafu(display(""))]
Close,
// This is from our own module which includes enough context.
#[snafu(display("{}", source))]
Connect { source: connect::Error },
// A text message from the server couldn't be parsed as a control message.
#[snafu(display("Failed to deserialize message from server: {}", source))]
Deserialize { source: serde_json::Error },
// Registering for signals failed, before the signal thread was started.
#[snafu(display("Failed to set up signal handler: {}", source))]
HandleSignals { source: std::io::Error },
// Reading from stdin failed for a reason other than end of input.
#[snafu(display("Failed to read input: {}", source))]
ReadFromUser { source: std::io::Error },
// The WebSocket stream itself returned an error while we were reading from it.
#[snafu(display("Failed to read from WebSocket: {}", source))]
ReadWebSocket {
source: tokio_tungstenite::tungstenite::Error,
},
// Queueing a message for the server failed; `kind` names what we were trying to send.
#[snafu(display("Failed to send {} message to server: {}", kind, source))]
SendMessage {
kind: String,
source: mpsc::TrySendError,
},
// A terminal signal arrived but couldn't be passed along to the caller.
#[snafu(display("Received signal {}", signal))]
SendSignal { signal: i32 },
// The server reported writing more input messages than we have read and sent.
#[snafu(display(
"Server said {} messages written, but we've only read {}? Logic error!",
messages_written,
messages_read
))]
ServerCount {
messages_read: u64,
messages_written: u64,
},
// A control message for the server couldn't be encoded as JSON.
#[snafu(display("Failed to serialize message to server: {}", source))]
Serialize { source: serde_json::Error },
// This is from our own module which includes enough context.
#[snafu(display("{}", source))]
Terminal { source: terminal::Error },
// Writing process output to stdout failed.
#[snafu(display("Failed to write output: {}", source))]
WriteOutput { source: std::io::Error },
}
// This allows for the nice usage of err_into() on our WebSocket stream.
impl From for Error {
fn from(e: tokio_tungstenite::tungstenite::Error) -> Self {
ReadWebSocketSnafu.into_error(e)
}
}
}
// Expose the module's error type to callers of `exec`.
pub use error::Error;
// Shorthand result type used by the functions in this module.
type Result = std::result::Result;