diff --git a/src/main.rs b/src/main.rs index db65187..ac248a4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -156,54 +156,50 @@ async fn main() -> Result<(), Error> { // Once both are done, begin proxy duty let config = config.clone(); - let f = future::try_join(client, server) - .and_then(move |(client, server)| { - let (client_sink, client_stream) = client.split(); - let client_sink = client_sink.with(|m: ControlPacket| { - let m = RawControlPacket::from(m); - let mut header = BytesMut::with_capacity(6); - header.put_u16(m.id); - header.put_u32(m.bytes.len() as u32); - let mut buf = Vec::new(); - buf.extend(header); - buf.extend(m.bytes); - future::ready(Ok::<_, Error>(Message::Binary(buf))) - }); - let client_stream = client_stream.err_into().try_filter_map(|m| { - future::ok(match m { - Message::Binary(b) if b.len() >= 6 => { - let id = BigEndian::read_u16(&b); - // b[2..6] is length which is implicit in websocket msgs - let bytes = Bytes::from(b).slice(6..); - RawControlPacket { id, bytes }.try_into().ok() - } - _ => None, - }) - }); - - let (server_sink, server_stream) = server.split(); - let server_sink = server_sink.sink_err_into(); - let server_stream = server_stream.err_into(); - - Connection::new( - config, - client_sink, - client_stream, - server_sink, - server_stream, - ) - }) - .or_else(move |err| { - future::ready({ - if err.is_connection_closed() { - Ok(()) - } else { - println!("Error on connection {}: {:?}", addr, err); - Err(()) + tokio::spawn(async move { + let (client, server) = future::try_join(client, server).await?; + let (client_sink, client_stream) = client.split(); + let client_sink = client_sink.with(|m: ControlPacket| { + let m = RawControlPacket::from(m); + let mut header = BytesMut::with_capacity(6); + header.put_u16(m.id); + header.put_u32(m.bytes.len() as u32); + let mut buf = Vec::new(); + buf.extend(header); + buf.extend(m.bytes); + future::ready(Ok::<_, Error>(Message::Binary(buf))) + }); + let client_stream = client_stream.err_into().try_filter_map(|m| { + future::ok(match m { + Message::Binary(b) if b.len() >= 6 => { + let id = BigEndian::read_u16(&b); + // b[2..6] is length which is implicit in websocket msgs + let bytes = Bytes::from(b).slice(6..); + RawControlPacket { id, bytes }.try_into().ok() } + _ => None, }) - }) - .map_ok(move |()| println!("Client connection closed: {}", addr)); - tokio::spawn(f); + }); + + let (server_sink, server_stream) = server.split(); + let server_sink = server_sink.sink_err_into(); + let server_stream = server_stream.err_into(); + + Connection::new( + config, + client_sink, + client_stream, + server_sink, + server_stream, + ).await?; + + println!("Client connection closed: {}", addr); + + Ok::<_, Error>(()) + }.unwrap_or_else(move |err| { + if !err.is_connection_closed() { + println!("Error on connection {}: {:?}", addr, err); + } + })); } }