Switch from websocket
to tungstenite
due to stability concerns (fixes #2)
This commit is contained in:
parent
443712cb22
commit
b630d7b584
4 changed files with 228 additions and 168 deletions
16
src/error.rs
16
src/error.rs
|
@ -1,5 +1,4 @@
|
|||
use futures::sync::mpsc;
|
||||
use websocket;
|
||||
|
||||
// FIXME clean this up
|
||||
|
||||
|
@ -7,12 +6,21 @@ use websocket;
|
|||
pub enum Error {
|
||||
Io(std::io::Error),
|
||||
ServerTls(native_tls::Error),
|
||||
ClientConnection(websocket::result::WebSocketError),
|
||||
ClientConnection(tungstenite::Error),
|
||||
Misc(Box<std::error::Error>),
|
||||
}
|
||||
|
||||
impl From<websocket::result::WebSocketError> for Error {
|
||||
fn from(e: websocket::result::WebSocketError) -> Self {
|
||||
impl Error {
|
||||
pub fn is_connection_closed(&self) -> bool {
|
||||
match self {
|
||||
Error::ClientConnection(tungstenite::Error::ConnectionClosed(_)) => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<tungstenite::Error> for Error {
|
||||
fn from(e: tungstenite::Error) -> Self {
|
||||
Error::ClientConnection(e)
|
||||
}
|
||||
}
|
||||
|
|
140
src/main.rs
140
src/main.rs
|
@ -14,12 +14,13 @@ extern crate tokio;
|
|||
extern crate tokio_codec;
|
||||
extern crate tokio_core;
|
||||
extern crate tokio_tls;
|
||||
extern crate tokio_tungstenite;
|
||||
extern crate tungstenite;
|
||||
extern crate webrtc_sdp;
|
||||
extern crate websocket;
|
||||
|
||||
use argparse::{ArgumentParser, Store};
|
||||
use byteorder::{BigEndian, ByteOrder};
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use bytes::{Buf, BufMut, BytesMut, IntoBuf};
|
||||
use futures::{Future, Sink, Stream};
|
||||
use mumble_protocol::control::ClientControlCodec;
|
||||
use mumble_protocol::control::ControlPacket;
|
||||
|
@ -27,14 +28,16 @@ use mumble_protocol::control::RawControlPacket;
|
|||
use mumble_protocol::Clientbound;
|
||||
use std::convert::Into;
|
||||
use std::convert::TryInto;
|
||||
use std::net::Ipv6Addr;
|
||||
use std::net::ToSocketAddrs;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_codec::Decoder;
|
||||
use tokio_core::reactor::Core;
|
||||
use tokio_tls::TlsConnector;
|
||||
use websocket::async::Server;
|
||||
use websocket::message::OwnedMessage;
|
||||
use websocket::server::InvalidConnection;
|
||||
use tokio_tungstenite::accept_async_with_config;
|
||||
use tungstenite::protocol::Message;
|
||||
use tungstenite::protocol::WebSocketConfig;
|
||||
|
||||
mod connection;
|
||||
mod error;
|
||||
|
@ -82,68 +85,73 @@ fn main() {
|
|||
|
||||
let mut core = Core::new().unwrap();
|
||||
let handle = core.handle();
|
||||
let server = Server::bind(("0.0.0.0", ws_port), &handle).unwrap();
|
||||
let f = server
|
||||
.incoming()
|
||||
.map_err(|InvalidConnection { error, .. }| error)
|
||||
.for_each(move |(upgrade, addr)| {
|
||||
println!("New connection from {}", addr);
|
||||
let server_sock = TcpStream::connect(&upstream_addr);
|
||||
let f = upgrade
|
||||
.use_protocol("binary") // FIXME can we be more specific? *looks at chrome*
|
||||
.accept()
|
||||
.from_err()
|
||||
.join(server_sock.from_err().and_then(move |stream| {
|
||||
let connector: TlsConnector = native_tls::TlsConnector::builder()
|
||||
//.danger_accept_invalid_certs(true)
|
||||
.build()
|
||||
.unwrap()
|
||||
.into();
|
||||
connector.connect(upstream_host, stream).from_err()
|
||||
}))
|
||||
.and_then(move |((client, _), server)| {
|
||||
let (client_sink, client_stream) = client.split();
|
||||
// buffered client sink to prevent temporary lag on the control
|
||||
// channel from lagging the otherwise independent audio channel
|
||||
let client_sink =
|
||||
client_sink
|
||||
.buffer(10)
|
||||
.with(|m: ControlPacket<Clientbound>| {
|
||||
let m = RawControlPacket::from(m);
|
||||
let bytes = &m.bytes;
|
||||
let len = bytes.len();
|
||||
let mut buf = BytesMut::with_capacity(6 + len);
|
||||
buf.put_u16_be(m.id);
|
||||
buf.put_u32_be(len as u32);
|
||||
buf.put(bytes);
|
||||
Ok::<OwnedMessage, Error>(OwnedMessage::Binary(
|
||||
buf.freeze().to_vec(),
|
||||
))
|
||||
});
|
||||
let client_stream = client_stream
|
||||
.from_err()
|
||||
.take_while(|m| Ok(!m.is_close()))
|
||||
.filter_map(|m| match m {
|
||||
OwnedMessage::Binary(ref b) if b.len() >= 6 => {
|
||||
let id = BigEndian::read_u16(b);
|
||||
// b[2..6] is length which is implicit in websocket msgs
|
||||
let bytes = b[6..].into();
|
||||
RawControlPacket { id, bytes }.try_into().ok()
|
||||
}
|
||||
_ => None,
|
||||
});
|
||||
let socket_addr = (Ipv6Addr::from(0), ws_port).into();
|
||||
let server = TcpListener::bind(&socket_addr).unwrap();
|
||||
let f = server.incoming().for_each(move |client| {
|
||||
let addr = client.peer_addr().expect("peer to have an address");
|
||||
println!("New connection from {}", addr);
|
||||
|
||||
let server = ClientControlCodec::new().framed(server);
|
||||
let (server_sink, server_stream) = server.split();
|
||||
let server_sink = server_sink.sink_from_err();
|
||||
let server_stream = server_stream.from_err();
|
||||
// Connect to server
|
||||
let server = TcpStream::connect(&upstream_addr)
|
||||
.from_err()
|
||||
.and_then(move |stream| {
|
||||
let connector: TlsConnector = native_tls::TlsConnector::builder()
|
||||
//.danger_accept_invalid_certs(true)
|
||||
.build()
|
||||
.unwrap()
|
||||
.into();
|
||||
connector.connect(upstream_host, stream).from_err()
|
||||
})
|
||||
.map(|stream| ClientControlCodec::new().framed(stream));
|
||||
|
||||
Connection::new(client_sink, client_stream, server_sink, server_stream)
|
||||
})
|
||||
.map_err(move |e: Error| println!("Error on connection {}: {:?}", addr, e))
|
||||
.map(move |_| println!("Client connection closed: {}", addr));
|
||||
handle.spawn(f);
|
||||
Ok(())
|
||||
});
|
||||
// Accept client
|
||||
let websocket_config = WebSocketConfig {
|
||||
max_send_queue: Some(10), // can be fairly small as voice is using WebRTC instead
|
||||
max_message_size: Some(0x7f_ffff), // maximum size accepted by Murmur
|
||||
max_frame_size: Some(0x7f_ffff), // maximum size accepted by Murmur
|
||||
};
|
||||
let client = accept_async_with_config(client, Some(websocket_config)).from_err();
|
||||
|
||||
// Once both are done, begin proxy duty
|
||||
let f = client
|
||||
.join(server)
|
||||
.and_then(move |(client, server)| {
|
||||
let (client_sink, client_stream) = client.split();
|
||||
let client_sink = client_sink.with(|m: ControlPacket<Clientbound>| {
|
||||
let m = RawControlPacket::from(m);
|
||||
let mut header = BytesMut::with_capacity(6);
|
||||
header.put_u16_be(m.id);
|
||||
header.put_u32_be(m.bytes.len() as u32);
|
||||
let buf = header.into_buf().chain(m.bytes);
|
||||
Ok::<_, Error>(Message::Binary(buf.collect()))
|
||||
});
|
||||
let client_stream = client_stream.from_err().filter_map(|m| match m {
|
||||
Message::Binary(ref b) if b.len() >= 6 => {
|
||||
let id = BigEndian::read_u16(b);
|
||||
// b[2..6] is length which is implicit in websocket msgs
|
||||
let bytes = b[6..].into();
|
||||
RawControlPacket { id, bytes }.try_into().ok()
|
||||
}
|
||||
_ => None,
|
||||
});
|
||||
|
||||
let (server_sink, server_stream) = server.split();
|
||||
let server_sink = server_sink.sink_from_err();
|
||||
let server_stream = server_stream.from_err();
|
||||
|
||||
Connection::new(client_sink, client_stream, server_sink, server_stream)
|
||||
})
|
||||
.or_else(move |err| {
|
||||
if err.is_connection_closed() {
|
||||
Ok(())
|
||||
} else {
|
||||
println!("Error on connection {}: {:?}", addr, err);
|
||||
Err(())
|
||||
}
|
||||
})
|
||||
.map(move |()| println!("Client connection closed: {}", addr));
|
||||
handle.spawn(f);
|
||||
Ok(())
|
||||
});
|
||||
core.run(f).unwrap();
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue