Use async-await instead of combinators in main.rs
This commit is contained in:
parent
c2ebc19134
commit
936d12f4a0
90
src/main.rs
90
src/main.rs
|
@ -156,54 +156,50 @@ async fn main() -> Result<(), Error> {
|
||||||
|
|
||||||
// Once both are done, begin proxy duty
|
// Once both are done, begin proxy duty
|
||||||
let config = config.clone();
|
let config = config.clone();
|
||||||
let f = future::try_join(client, server)
|
tokio::spawn(async move {
|
||||||
.and_then(move |(client, server)| {
|
let (client, server) = future::try_join(client, server).await?;
|
||||||
let (client_sink, client_stream) = client.split();
|
let (client_sink, client_stream) = client.split();
|
||||||
let client_sink = client_sink.with(|m: ControlPacket<Clientbound>| {
|
let client_sink = client_sink.with(|m: ControlPacket<Clientbound>| {
|
||||||
let m = RawControlPacket::from(m);
|
let m = RawControlPacket::from(m);
|
||||||
let mut header = BytesMut::with_capacity(6);
|
let mut header = BytesMut::with_capacity(6);
|
||||||
header.put_u16(m.id);
|
header.put_u16(m.id);
|
||||||
header.put_u32(m.bytes.len() as u32);
|
header.put_u32(m.bytes.len() as u32);
|
||||||
let mut buf = Vec::new();
|
let mut buf = Vec::new();
|
||||||
buf.extend(header);
|
buf.extend(header);
|
||||||
buf.extend(m.bytes);
|
buf.extend(m.bytes);
|
||||||
future::ready(Ok::<_, Error>(Message::Binary(buf)))
|
future::ready(Ok::<_, Error>(Message::Binary(buf)))
|
||||||
});
|
});
|
||||||
let client_stream = client_stream.err_into().try_filter_map(|m| {
|
let client_stream = client_stream.err_into().try_filter_map(|m| {
|
||||||
future::ok(match m {
|
future::ok(match m {
|
||||||
Message::Binary(b) if b.len() >= 6 => {
|
Message::Binary(b) if b.len() >= 6 => {
|
||||||
let id = BigEndian::read_u16(&b);
|
let id = BigEndian::read_u16(&b);
|
||||||
// b[2..6] is length which is implicit in websocket msgs
|
// b[2..6] is length which is implicit in websocket msgs
|
||||||
let bytes = Bytes::from(b).slice(6..);
|
let bytes = Bytes::from(b).slice(6..);
|
||||||
RawControlPacket { id, bytes }.try_into().ok()
|
RawControlPacket { id, bytes }.try_into().ok()
|
||||||
}
|
|
||||||
_ => None,
|
|
||||||
})
|
|
||||||
});
|
|
||||||
|
|
||||||
let (server_sink, server_stream) = server.split();
|
|
||||||
let server_sink = server_sink.sink_err_into();
|
|
||||||
let server_stream = server_stream.err_into();
|
|
||||||
|
|
||||||
Connection::new(
|
|
||||||
config,
|
|
||||||
client_sink,
|
|
||||||
client_stream,
|
|
||||||
server_sink,
|
|
||||||
server_stream,
|
|
||||||
)
|
|
||||||
})
|
|
||||||
.or_else(move |err| {
|
|
||||||
future::ready({
|
|
||||||
if err.is_connection_closed() {
|
|
||||||
Ok(())
|
|
||||||
} else {
|
|
||||||
println!("Error on connection {}: {:?}", addr, err);
|
|
||||||
Err(())
|
|
||||||
}
|
}
|
||||||
|
_ => None,
|
||||||
})
|
})
|
||||||
})
|
});
|
||||||
.map_ok(move |()| println!("Client connection closed: {}", addr));
|
|
||||||
tokio::spawn(f);
|
let (server_sink, server_stream) = server.split();
|
||||||
|
let server_sink = server_sink.sink_err_into();
|
||||||
|
let server_stream = server_stream.err_into();
|
||||||
|
|
||||||
|
Connection::new(
|
||||||
|
config,
|
||||||
|
client_sink,
|
||||||
|
client_stream,
|
||||||
|
server_sink,
|
||||||
|
server_stream,
|
||||||
|
).await?;
|
||||||
|
|
||||||
|
println!("Client connection closed: {}", addr);
|
||||||
|
|
||||||
|
Ok::<_, Error>(())
|
||||||
|
}.unwrap_or_else(move |err| {
|
||||||
|
if !err.is_connection_closed() {
|
||||||
|
println!("Error on connection {}: {:?}", addr, err);
|
||||||
|
}
|
||||||
|
}));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue