//! The 'connect' module provides a function for connecting to a WebSocket over a Unix-domain
//! socket, which is a bit more finicky than normal.
use hyper::service::Service;
use hyper_unix_connector::{UnixClient, Uri, UDS};
use log::debug;
use snafu::{ensure, ResultExt};
use std::path::Path;
use tokio_tungstenite::{client_async, tungstenite::http::StatusCode, WebSocketStream};
/// Connects to a WebSocket over the given Unix-domain socket. 'path' is an HTTP request path on
/// the server that allows for WebSocket upgrades, like "/exec".
pub(crate) async fn websocket_connect
(socket_path: P, path: &str) -> Result>
where
P: AsRef,
{
// To talk over a Unix socket, we use hyper-unix-connector, which needs a different type of URI
// than our WebSocket client, tokio-tungstenite. This URI can't contain the schema/host. This
// initially has to be constructed as a hyper-unix-connector URI so we can use the socket, then
// transformed into a hyper URI that the client can accept.
let raw_uri = Uri::new(socket_path.as_ref(), path);
let uri: hyper::Uri = raw_uri.into();
debug!(
"Connecting to {} over {}",
uri,
socket_path.as_ref().display()
);
// We start with a plain HTTP request over the Unix-domain socket so we can upgrade it to a
// WebSocket afterward.
let response = UnixClient.call(uri).await.map_err(|e| {
// hyper-unix-connector doesn't have its own error type; not worth bringing in 'anyhow'
error::ConnectSnafu {
socket: socket_path.as_ref(),
message: e.to_string(),
}
.build()
})?;
// Create a request object that tokio-tungstenite understands, pointed at a local WebSocket
// URI. This is used to create the WebSocket client.
let ws_uri = format!("ws://localhost{}", path);
let ws_request = httparse::Request {
method: Some("GET"),
path: Some(&ws_uri),
version: Some(1), // HTTP/1.1
headers: &mut [],
};
// Now we can use tokio-tungstenite to upgrade the connection to a WebSocket. We get back a
// WebSocket stream that we can use to talk to the server, and the HTTP response.
let (ws_stream, resp) = client_async(ws_request, response)
.await
.context(error::UpgradeSnafu)?;
// We only use the HTTP response to confirm that we switched protocols correctly.
ensure!(
resp.status() == StatusCode::SWITCHING_PROTOCOLS,
error::ProtocolSnafu {
code: resp.status()
}
);
Ok(ws_stream)
}
pub(crate) mod error {
use super::StatusCode;
use snafu::Snafu;
use std::path::PathBuf;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub(super)))]
pub enum Error {
#[snafu(display("Failed to connect to server at {}: {}", socket.display(), message))]
Connect { socket: PathBuf, message: String },
#[snafu(display(
"Server did not upgrade to WebSocket; expected 101 Switching Protocols, got {}",
code
))]
Protocol { code: StatusCode },
#[snafu(display("Failed to request upgrade to WebSocket: {}", source))]
Upgrade {
source: tokio_tungstenite::tungstenite::Error,
},
}
}
pub(crate) use error::Error;
type Result = std::result::Result;