Update dependencies (most notably futures to 0.3)

This commit is contained in:
Jonas Herzig 2020-04-05 16:13:45 +02:00
parent 8bdd654952
commit cda7f0cd7a
6 changed files with 1055 additions and 1071 deletions

1631
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -6,20 +6,18 @@ edition = "2018"
[dependencies] [dependencies]
argparse = "0.2.2" argparse = "0.2.2"
bytes = "0.4" bytes = "0.5"
byteorder = "1.2" byteorder = "1.2"
futures = "0.1" futures = { version = "0.3", features = ["compat", "io-compat"] }
tokio = "0.1" tokio = { version = "0.2", features = ["full"] }
tokio-core = "0.1" tokio-util = { version = "0.3", features = ["codec"] }
tokio-codec = "0.1" tokio-tls = "0.3"
tokio-tls = "0.2"
native-tls = "0.2" native-tls = "0.2"
mumble-protocol = { version = "0.2", features = ["webrtc-extensions"] } mumble-protocol = { version = "0.3", features = ["webrtc-extensions"] }
tokio-tungstenite = "0.6" tokio-tungstenite = "0.10"
tungstenite = "0.6" http = "0.2"
rtp = { git = "https://github.com/johni0702/rtp", rev = "ee8be93", features = ["openssl", "tokio"] } tungstenite = "0.10"
# libnice = "0.2" rtp = { git = "https://github.com/johni0702/rtp", rev = "1444b3c", features = ["rfc5764-openssl"] }
libnice = { git = "https://github.com/johni0702/rust-libnice", rev = "1053c81" } libnice = "0.2"
# webrtc-sdp = "0.2" webrtc-sdp = "0.3"
webrtc-sdp = { git = "https://github.com/nils-ohlmeier/rsdparsa/", rev = "ccf6249" }
openssl = "0.10" openssl = "0.10"

View file

@ -1,5 +1,10 @@
use futures::future;
use futures::future::BoxFuture;
use futures::pin_mut;
use futures::ready;
use futures::stream; use futures::stream;
use futures::{Future, Sink, Stream}; use futures::stream::BoxStream;
use futures::{Future, FutureExt, Sink, Stream, StreamExt};
use libnice::ice; use libnice::ice;
use mumble_protocol::control::msgs; use mumble_protocol::control::msgs;
use mumble_protocol::control::ControlPacket; use mumble_protocol::control::ControlPacket;
@ -18,15 +23,18 @@ use rtp::rfc3550::{
RtpPacketReader, RtpPacketWriter, RtpPacketReader, RtpPacketWriter,
}; };
use rtp::rfc5761::{MuxPacketReader, MuxPacketWriter, MuxedPacket}; use rtp::rfc5761::{MuxPacketReader, MuxPacketWriter, MuxedPacket};
use rtp::rfc5764::{DtlsSrtp, DtlsSrtpHandshakeResult}; use rtp::rfc5764::DtlsSrtp;
use rtp::traits::{ReadPacket, WritePacket}; use rtp::traits::{ReadPacket, WritePacket};
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::ffi::CString; use std::ffi::CString;
use std::net::IpAddr; use std::net::IpAddr;
use std::time::{Duration, Instant}; use std::ops::DerefMut;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use tokio::io; use tokio::io;
use tokio::prelude::*; use tokio::time::Delay;
use tokio::timer::Delay;
use webrtc_sdp::attribute_type::SdpAttribute; use webrtc_sdp::attribute_type::SdpAttribute;
use crate::error::Error; use crate::error::Error;
@ -46,7 +54,7 @@ struct User {
} }
impl User { impl User {
fn set_inactive(&mut self) -> impl Stream<Item = Frame, Error = Error> { fn set_inactive(&mut self) -> impl Stream<Item = Result<Frame, Error>> {
self.timeout = None; self.timeout = None;
if self.active { if self.active {
@ -60,15 +68,14 @@ impl User {
let mut msg = msgs::TalkingState::new(); let mut msg = msgs::TalkingState::new();
msg.set_session(self.session); msg.set_session(self.session);
EitherS::A(stream::once(Ok(Frame::Client(msg.into())))) EitherS::A(stream::once(future::ready(Ok(Frame::Client(msg.into())))))
} else { } else {
EitherS::B(stream::empty()) EitherS::B(stream::empty())
} }
} }
fn set_active(&mut self, target: u8) -> impl Stream<Item = Frame, Error = Error> { fn set_active(&mut self, target: u8) -> impl Stream<Item = Result<Frame, Error>> {
let when = Instant::now() + Duration::from_millis(400); self.timeout = Some(tokio::time::delay_for(Duration::from_millis(400)));
self.timeout = Some(Delay::new(when));
if self.active { if self.active {
EitherS::A(stream::empty()) EitherS::A(stream::empty())
@ -78,25 +85,28 @@ impl User {
let mut msg = msgs::TalkingState::new(); let mut msg = msgs::TalkingState::new();
msg.set_session(self.session); msg.set_session(self.session);
msg.set_target(target.into()); msg.set_target(target.into());
EitherS::B(stream::once(Ok(Frame::Client(msg.into())))) EitherS::B(stream::once(future::ready(Ok(Frame::Client(msg.into())))))
} }
} }
} }
pub struct Connection { pub struct Connection {
config: Config, config: Config,
inbound_client: Box<dyn Stream<Item = ControlPacket<Serverbound>, Error = Error>>, inbound_client: Pin<Box<dyn Stream<Item = Result<ControlPacket<Serverbound>, Error>> + Send>>,
outbound_client: Box<dyn Sink<SinkItem = ControlPacket<Clientbound>, SinkError = Error>>, outbound_client: Pin<Box<dyn Sink<ControlPacket<Clientbound>, Error = Error> + Send>>,
inbound_server: Box<dyn Stream<Item = ControlPacket<Clientbound>, Error = Error>>, inbound_server: Pin<Box<dyn Stream<Item = Result<ControlPacket<Clientbound>, Error>> + Send>>,
outbound_server: Box<dyn Sink<SinkItem = ControlPacket<Serverbound>, SinkError = Error>>, outbound_server: Pin<Box<dyn Sink<ControlPacket<Serverbound>, Error = Error> + Send>>,
next_clientbound_frame: Option<ControlPacket<Clientbound>>, next_clientbound_frame: Option<ControlPacket<Clientbound>>,
next_serverbound_frame: Option<ControlPacket<Serverbound>>, next_serverbound_frame: Option<ControlPacket<Serverbound>>,
next_rtp_frame: Option<Vec<u8>>, next_rtp_frame: Option<Vec<u8>>,
stream_to_be_sent: Option<Box<dyn Stream<Item = Frame, Error = Error>>>, stream_to_be_sent: Option<BoxStream<'static, Result<Frame, Error>>>,
ice: Option<(ice::Agent, ice::Stream)>, ice: Option<(ice::Agent, ice::Stream)>,
candidate_gathering_done: bool,
dtls_srtp_future: Option<DtlsSrtpHandshakeResult<ice::StreamComponent, SslAcceptorBuilder>>, dtls_srtp_future: Option<
BoxFuture<'static, Result<DtlsSrtp<ice::StreamComponent, SslAcceptorBuilder>, io::Error>>,
>,
dtls_srtp: Option<DtlsSrtp<ice::StreamComponent, SslAcceptorBuilder>>, dtls_srtp: Option<DtlsSrtp<ice::StreamComponent, SslAcceptorBuilder>>,
dtls_key: PKey<Private>, dtls_key: PKey<Private>,
dtls_cert: X509, dtls_cert: X509,
@ -119,10 +129,10 @@ impl Connection {
server_stream: SSt, server_stream: SSt,
) -> Self ) -> Self
where where
CSi: Sink<SinkItem = ControlPacket<Clientbound>, SinkError = Error> + 'static, CSi: Sink<ControlPacket<Clientbound>, Error = Error> + 'static + Send,
CSt: Stream<Item = ControlPacket<Serverbound>, Error = Error> + 'static, CSt: Stream<Item = Result<ControlPacket<Serverbound>, Error>> + 'static + Send,
SSi: Sink<SinkItem = ControlPacket<Serverbound>, SinkError = Error> + 'static, SSi: Sink<ControlPacket<Serverbound>, Error = Error> + 'static + Send,
SSt: Stream<Item = ControlPacket<Clientbound>, Error = Error> + 'static, SSt: Stream<Item = Result<ControlPacket<Clientbound>, Error>> + 'static + Send,
{ {
let rsa = Rsa::generate(2048).unwrap(); let rsa = Rsa::generate(2048).unwrap();
let key = PKey::from_rsa(rsa).unwrap(); let key = PKey::from_rsa(rsa).unwrap();
@ -140,15 +150,16 @@ impl Connection {
Self { Self {
config, config,
inbound_client: Box::new(client_stream), inbound_client: Box::pin(client_stream),
outbound_client: Box::new(client_sink), outbound_client: Box::pin(client_sink),
inbound_server: Box::new(server_stream), inbound_server: Box::pin(server_stream),
outbound_server: Box::new(server_sink), outbound_server: Box::pin(server_sink),
next_clientbound_frame: None, next_clientbound_frame: None,
next_serverbound_frame: None, next_serverbound_frame: None,
next_rtp_frame: None, next_rtp_frame: None,
stream_to_be_sent: None, stream_to_be_sent: None,
ice: None, ice: None,
candidate_gathering_done: false,
dtls_srtp_future: None, dtls_srtp_future: None,
dtls_srtp: None, dtls_srtp: None,
dtls_key: key, dtls_key: key,
@ -195,7 +206,7 @@ impl Connection {
} }
} }
fn setup_ice(&mut self) -> impl Stream<Item = Frame, Error = Error> { fn setup_ice(&mut self) -> impl Stream<Item = Result<Frame, Error>> {
// Setup ICE agent // Setup ICE agent
let mut agent = ice::Agent::new_rfc5245(); let mut agent = ice::Agent::new_rfc5245();
agent.set_software("mumble-web-proxy"); agent.set_software("mumble-web-proxy");
@ -211,7 +222,11 @@ impl Connection {
} { } {
Ok(stream) => stream, Ok(stream) => stream,
Err(err) => { Err(err) => {
return stream::once(Err(io::Error::new(io::ErrorKind::Other, err).into())); return stream::once(future::ready(Err(io::Error::new(
io::ErrorKind::Other,
err,
)
.into())));
} }
}; };
let component = stream.take_components().pop().expect("one component"); let component = stream.take_components().pop().expect("one component");
@ -238,15 +253,55 @@ impl Connection {
acceptor.set_certificate(&self.dtls_cert).unwrap(); acceptor.set_certificate(&self.dtls_cert).unwrap();
acceptor.set_private_key(&self.dtls_key).unwrap(); acceptor.set_private_key(&self.dtls_key).unwrap();
// FIXME: verify remote fingerprint // FIXME: verify remote fingerprint
self.dtls_srtp_future = Some(DtlsSrtp::handshake(component, acceptor)); self.dtls_srtp_future = Some(DtlsSrtp::handshake(component, acceptor).boxed());
stream::once(Ok(Frame::Client(msg.into()))) stream::once(future::ready(Ok(Frame::Client(msg.into()))))
}
fn gather_ice_candidates(mut self: Pin<&mut Self>, cx: &mut Context) -> bool {
if self.candidate_gathering_done {
return false;
}
let stream = match self.ice {
Some((_, ref mut stream)) => stream,
None => return false,
};
pin_mut!(stream);
match stream.poll_next(cx) {
Poll::Ready(Some(mut candidate)) => {
println!("Local ice candidate: {}", candidate.to_string());
// Map to public addresses (if configured)
let config = &self.config;
match (&mut candidate.address, config.public_v4, config.public_v6) {
(webrtc_sdp::address::Address::Ip(IpAddr::V4(addr)), Some(public), _) => {
*addr = public;
}
(webrtc_sdp::address::Address::Ip(IpAddr::V6(addr)), _, Some(public)) => {
*addr = public;
}
_ => {} // non configured
};
// Got a new candidate, send it to the client
let mut msg = msgs::IceCandidate::new();
msg.set_content(format!("candidate:{}", candidate.to_string()));
let frame = Frame::Client(msg.into());
self.stream_to_be_sent = Some(Box::pin(stream::once(future::ready(Ok(frame)))));
true
}
Poll::Ready(None) => {
self.candidate_gathering_done = true;
false
}
_ => false,
}
} }
fn handle_voice_packet( fn handle_voice_packet(
&mut self, &mut self,
packet: VoicePacket<Clientbound>, packet: VoicePacket<Clientbound>,
) -> impl Stream<Item = Frame, Error = Error> { ) -> impl Stream<Item = Result<Frame, Error>> {
let (target, session_id, seq_num, opus_data, last_bit) = match packet { let (target, session_id, seq_num, opus_data, last_bit) = match packet {
VoicePacket::Audio { VoicePacket::Audio {
target, target,
@ -338,15 +393,15 @@ impl Connection {
padding: Vec::new(), padding: Vec::new(),
}; };
let frame = Frame::Rtp(MuxedPacket::Rtp(rtp)); let frame = Frame::Rtp(MuxedPacket::Rtp(rtp));
EitherS::A(activity_stream.chain(stream::once(Ok(frame)))) EitherS::A(activity_stream.chain(stream::once(future::ready(Ok(frame)))))
} }
fn process_packet_from_server( fn process_packet_from_server(
&mut self, &mut self,
packet: ControlPacket<Clientbound>, packet: ControlPacket<Clientbound>,
) -> impl Stream<Item = Frame, Error = Error> { ) -> impl Stream<Item = Result<Frame, Error>> {
if !self.supports_webrtc() { if !self.supports_webrtc() {
return EitherS::B(stream::once(Ok(Frame::Client(packet)))); return EitherS::B(stream::once(future::ready(Ok(Frame::Client(packet)))));
} }
match packet { match packet {
ControlPacket::UDPTunnel(voice) => EitherS::A(self.handle_voice_packet(*voice)), ControlPacket::UDPTunnel(voice) => EitherS::A(self.handle_voice_packet(*voice)),
@ -356,20 +411,24 @@ impl Connection {
let user = self.allocate_ssrc(session_id); let user = self.allocate_ssrc(session_id);
message.set_ssrc(user.ssrc); message.set_ssrc(user.ssrc);
} }
EitherS::B(stream::once(Ok(Frame::Client((*message).into())))) EitherS::B(stream::once(future::ready(Ok(Frame::Client(
(*message).into(),
)))))
} }
ControlPacket::UserRemove(message) => { ControlPacket::UserRemove(message) => {
self.free_ssrc(message.get_session()); self.free_ssrc(message.get_session());
EitherS::B(stream::once(Ok(Frame::Client((*message).into())))) EitherS::B(stream::once(future::ready(Ok(Frame::Client(
(*message).into(),
)))))
} }
other => EitherS::B(stream::once(Ok(Frame::Client(other)))), other => EitherS::B(stream::once(future::ready(Ok(Frame::Client(other))))),
} }
} }
fn process_packet_from_client( fn process_packet_from_client(
&mut self, &mut self,
packet: ControlPacket<Serverbound>, packet: ControlPacket<Serverbound>,
) -> Box<dyn Stream<Item = Frame, Error = Error>> { ) -> Pin<Box<dyn Stream<Item = Result<Frame, Error>> + Send>> {
match packet { match packet {
ControlPacket::Authenticate(mut message) => { ControlPacket::Authenticate(mut message) => {
println!("MSG Authenticate: {:?}", message); println!("MSG Authenticate: {:?}", message);
@ -381,9 +440,14 @@ impl Connection {
let stream = self.setup_ice(); let stream = self.setup_ice();
Box::new(stream::once(Ok(Frame::Server((*message).into()))).chain(stream)) Box::pin(
stream::once(future::ready(Ok(Frame::Server((*message).into()))))
.chain(stream),
)
} else { } else {
Box::new(stream::once(Ok(Frame::Server((*message).into())))) Box::pin(stream::once(future::ready(Ok(Frame::Server(
(*message).into(),
)))))
} }
} }
ControlPacket::WebRTC(mut message) => { ControlPacket::WebRTC(mut message) => {
@ -398,7 +462,7 @@ impl Connection {
// FIXME trigger ICE-restart if required // FIXME trigger ICE-restart if required
// FIXME store and use remote dtls fingerprint // FIXME store and use remote dtls fingerprint
} }
Box::new(stream::empty()) Box::pin(stream::empty())
} }
ControlPacket::IceCandidate(mut message) => { ControlPacket::IceCandidate(mut message) => {
let candidate = message.take_content(); let candidate = message.take_content();
@ -410,15 +474,15 @@ impl Connection {
} }
Ok(_) => unreachable!(), Ok(_) => unreachable!(),
Err(err) => { Err(err) => {
return Box::new(stream::once(Err(io::Error::new( return Box::pin(stream::once(future::ready(Err(io::Error::new(
io::ErrorKind::Other, io::ErrorKind::Other,
format!("Error parsing ICE candidate: {}", err), format!("Error parsing ICE candidate: {}", err),
) )
.into()))); .into()))));
} }
} }
} }
Box::new(stream::empty()) Box::pin(stream::empty())
} }
ControlPacket::TalkingState(message) => { ControlPacket::TalkingState(message) => {
self.target = if message.has_target() { self.target = if message.has_target() {
@ -426,14 +490,14 @@ impl Connection {
} else { } else {
None None
}; };
Box::new(stream::empty()) Box::pin(stream::empty())
} }
other => Box::new(stream::once(Ok(Frame::Server(other)))), other => Box::pin(stream::once(future::ready(Ok(Frame::Server(other))))),
} }
} }
fn process_rtp_packet(&mut self, buf: &[u8]) -> impl Stream<Item = Frame, Error = Error> { fn process_rtp_packet(&mut self, buf: &[u8]) -> impl Stream<Item = Result<Frame, Error>> {
stream::iter_result(match self.rtp_reader.read_packet(&mut &buf[..]) { match self.rtp_reader.read_packet(&mut &buf[..]) {
Ok(MuxedPacket::Rtp(rtp)) => { Ok(MuxedPacket::Rtp(rtp)) => {
if let Some(target) = self.target { if let Some(target) = self.target {
// FIXME derive mumble seq_num from rtp timestamp to properly handle // FIXME derive mumble seq_num from rtp timestamp to properly handle
@ -449,66 +513,65 @@ impl Connection {
position_info: None, position_info: None,
}; };
Some(Ok(Frame::Server(voice_packet.into()))) EitherS::A(stream::once(future::ready(Ok(Frame::Server(
voice_packet.into(),
)))))
} else { } else {
None EitherS::B(stream::empty())
} }
} }
Ok(MuxedPacket::Rtcp(_rtcp)) => None, Ok(MuxedPacket::Rtcp(_rtcp)) => EitherS::B(stream::empty()),
Err(_err) => None, // FIXME maybe not silently drop the error? Err(_err) => EitherS::B(stream::empty()), // FIXME maybe not silently drop the error?
}) }
} }
} }
impl Future for Connection { impl Future for Connection {
type Item = (); type Output = Result<(), Error>;
type Error = Error;
fn poll(&mut self) -> Poll<(), Error> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> {
'poll: loop { 'poll: loop {
if let Some((agent, _)) = &mut self.ice { if let Some((ref mut agent, _)) = self.ice {
agent.poll()?; pin_mut!(agent);
let _ = agent.poll(cx);
} }
// If there's a frame pending to be sent, sent it before everything else // If there's a frame pending to be sent, sent it before everything else
if let Some(frame) = self.next_serverbound_frame.take() { if self.next_serverbound_frame.is_some() {
match self.outbound_server.start_send(frame)? { ready!(self.outbound_server.as_mut().poll_ready(cx)?);
AsyncSink::NotReady(frame) => { let frame = self.next_serverbound_frame.take().unwrap();
self.next_serverbound_frame = Some(frame); self.outbound_server.as_mut().start_send(frame)?;
return Ok(Async::NotReady);
}
AsyncSink::Ready => {}
}
}
if let Some(frame) = self.next_clientbound_frame.take() {
match self.outbound_client.start_send(frame)? {
AsyncSink::NotReady(frame) => {
self.next_clientbound_frame = Some(frame);
return Ok(Async::NotReady);
}
AsyncSink::Ready => {}
} }
if self.next_clientbound_frame.is_some() {
ready!(self.outbound_client.as_mut().poll_ready(cx)?);
let frame = self.next_clientbound_frame.take().unwrap();
self.outbound_client.as_mut().start_send(frame)?;
} }
if let Some(frame) = self.next_rtp_frame.take() { if let Some(frame) = self.next_rtp_frame.take() {
if let Some(ref mut dtls_srtp) = self.dtls_srtp { if let Some(ref mut dtls_srtp) = self.dtls_srtp {
match dtls_srtp.start_send(frame)? { pin_mut!(dtls_srtp);
AsyncSink::NotReady(frame) => { match dtls_srtp.as_mut().poll_ready(cx)? {
Poll::Pending => {
self.next_rtp_frame = Some(frame); self.next_rtp_frame = Some(frame);
return Ok(Async::NotReady);
} }
AsyncSink::Ready => {} Poll::Ready(()) => {
dtls_srtp.as_mut().start_send(&frame)?;
}
} }
} else { } else {
// RTP not yet setup, just drop the frame // RTP not yet setup, just drop the frame
self.next_rtp_frame = None;
} }
} }
// Send out all pending frames // Send out all pending frames
if self.stream_to_be_sent.is_some() { if self.stream_to_be_sent.is_some() {
match self.stream_to_be_sent.as_mut().unwrap().poll()? { let mut stream = self.stream_to_be_sent.as_mut().unwrap();
Async::NotReady => return Ok(Async::NotReady), let stream = stream.deref_mut();
Async::Ready(Some(frame)) => { pin_mut!(stream);
match frame { match ready!(stream.poll_next(cx)) {
Some(frame) => {
match frame? {
Frame::Server(frame) => self.next_serverbound_frame = Some(frame), Frame::Server(frame) => self.next_serverbound_frame = Some(frame),
Frame::Client(frame) => self.next_clientbound_frame = Some(frame), Frame::Client(frame) => self.next_clientbound_frame = Some(frame),
Frame::Rtp(frame) => { Frame::Rtp(frame) => {
@ -519,17 +582,17 @@ impl Future for Connection {
} }
continue 'poll; continue 'poll;
} }
Async::Ready(None) => { None => {
self.stream_to_be_sent = None; self.stream_to_be_sent = None;
} }
} }
} }
// All frames have been sent (or queued), flush any buffers in the output path // All frames have been sent (or queued), flush any buffers in the output path
self.outbound_client.poll_complete()?; let _ = self.outbound_client.as_mut().poll_flush(cx)?;
self.outbound_server.poll_complete()?; let _ = self.outbound_server.as_mut().poll_flush(cx)?;
if let Some(ref mut dtls_srtp) = self.dtls_srtp { if let Some(ref mut dtls_srtp) = self.dtls_srtp {
dtls_srtp.poll_complete()?; let _ = Pin::new(dtls_srtp).poll_flush(cx)?;
} }
// Check/register voice timeouts // Check/register voice timeouts
@ -541,41 +604,25 @@ impl Future for Connection {
// anyway), hence this being positioned above the code for incoming packets below. // anyway), hence this being positioned above the code for incoming packets below.
// (same applies to the other futures directly below it) // (same applies to the other futures directly below it)
for session in self.sessions.values_mut() { for session in self.sessions.values_mut() {
if let Async::Ready(Some(())) = session.timeout.poll()? { if let Some(timeout) = &mut session.timeout {
pin_mut!(timeout);
if let Poll::Ready(()) = timeout.poll(cx) {
let stream = session.set_inactive(); let stream = session.set_inactive();
self.stream_to_be_sent = Some(Box::new(stream)); self.stream_to_be_sent = Some(Box::pin(stream));
continue 'poll; continue 'poll;
} }
} }
}
// Poll ice stream for new candidates // Poll ice stream for new candidates
if let Some((_, stream)) = &mut self.ice { if self.as_mut().gather_ice_candidates(cx) {
if let Async::Ready(Some(mut candidate)) = stream.poll()? {
println!("Local ice candidate: {}", candidate.to_string());
// Map to public addresses (if configured)
let config = &self.config;
match (&mut candidate.address, config.public_v4, config.public_v6) {
(IpAddr::V4(addr), Some(public), _) => {
*addr = public;
}
(IpAddr::V6(addr), _, Some(public)) => {
*addr = public;
}
_ => {} // non configured
};
// Got a new candidate, send it to the client
let mut msg = msgs::IceCandidate::new();
msg.set_content(format!("candidate:{}", candidate.to_string()));
let frame = Frame::Client(msg.into());
self.stream_to_be_sent = Some(Box::new(stream::once(Ok(frame))));
continue 'poll; continue 'poll;
} }
}
// Poll dtls_srtp future if required // Poll dtls_srtp future if required
if let Async::Ready(Some(mut dtls_srtp)) = self.dtls_srtp_future.poll()? { if let Some(ref mut future) = self.dtls_srtp_future {
pin_mut!(future);
if let Poll::Ready(mut dtls_srtp) = future.poll(cx)? {
self.dtls_srtp_future = None; self.dtls_srtp_future = None;
println!("DTLS-SRTP connection established."); println!("DTLS-SRTP connection established.");
@ -585,39 +632,41 @@ impl Future for Connection {
self.dtls_srtp = Some(dtls_srtp); self.dtls_srtp = Some(dtls_srtp);
} }
}
// Finally check for incoming packets // Finally check for incoming packets
match self.inbound_server.poll()? { match self.inbound_server.as_mut().poll_next(cx)? {
Async::NotReady => {} Poll::Pending => {}
Async::Ready(Some(frame)) => { Poll::Ready(Some(frame)) => {
let stream = self.process_packet_from_server(frame); let stream = self.process_packet_from_server(frame);
self.stream_to_be_sent = Some(Box::new(stream)); self.stream_to_be_sent = Some(Box::pin(stream));
continue 'poll; continue 'poll;
} }
Async::Ready(None) => return Ok(Async::Ready(())), Poll::Ready(None) => return Poll::Ready(Ok(())),
} }
match self.inbound_client.poll()? { match self.inbound_client.as_mut().poll_next(cx)? {
Async::NotReady => {} Poll::Pending => {}
Async::Ready(Some(frame)) => { Poll::Ready(Some(frame)) => {
let stream = self.process_packet_from_client(frame); let stream = self.process_packet_from_client(frame);
self.stream_to_be_sent = Some(Box::new(stream)); self.stream_to_be_sent = Some(stream);
continue 'poll; continue 'poll;
} }
Async::Ready(None) => return Ok(Async::Ready(())), Poll::Ready(None) => return Poll::Ready(Ok(())),
} }
if self.dtls_srtp.is_some() { if let Some(ref mut dtls_srtp) = self.dtls_srtp {
match self.dtls_srtp.as_mut().unwrap().poll()? { pin_mut!(dtls_srtp);
Async::NotReady => {} match dtls_srtp.poll_next(cx)? {
Async::Ready(Some(frame)) => { Poll::Pending => {}
Poll::Ready(Some(frame)) => {
let stream = self.process_rtp_packet(&frame); let stream = self.process_rtp_packet(&frame);
self.stream_to_be_sent = Some(Box::new(stream)); self.stream_to_be_sent = Some(Box::pin(stream));
continue 'poll; continue 'poll;
} }
Async::Ready(None) => return Ok(Async::Ready(())), Poll::Ready(None) => return Poll::Ready(Ok(())),
} }
} }
return Ok(Async::NotReady); return Poll::Pending;
} }
} }
} }

View file

@ -1,4 +1,4 @@
use futures::sync::mpsc; use futures::channel::mpsc;
// FIXME clean this up // FIXME clean this up
@ -7,13 +7,13 @@ pub enum Error {
Io(std::io::Error), Io(std::io::Error),
ServerTls(native_tls::Error), ServerTls(native_tls::Error),
ClientConnection(tungstenite::Error), ClientConnection(tungstenite::Error),
Misc(Box<dyn std::error::Error>), Misc(Box<dyn std::error::Error + Send>),
} }
impl Error { impl Error {
pub fn is_connection_closed(&self) -> bool { pub fn is_connection_closed(&self) -> bool {
match self { match self {
Error::ClientConnection(tungstenite::Error::ConnectionClosed(_)) => true, Error::ClientConnection(tungstenite::Error::ConnectionClosed) => true,
_ => false, _ => false,
} }
} }
@ -37,8 +37,8 @@ impl From<native_tls::Error> for Error {
} }
} }
impl From<tokio::timer::Error> for Error { impl From<tokio::time::Error> for Error {
fn from(e: tokio::timer::Error) -> Self { fn from(e: tokio::time::Error) -> Self {
Error::Misc(Box::new(e)) Error::Misc(Box::new(e))
} }
} }
@ -55,8 +55,8 @@ impl From<()> for Error {
} }
} }
impl<T> From<mpsc::SendError<T>> for Error { impl From<mpsc::SendError> for Error {
fn from(_: mpsc::SendError<T>) -> Self { fn from(_: mpsc::SendError) -> Self {
panic!(); panic!();
} }
} }

View file

@ -5,8 +5,9 @@ use argparse::StoreOption;
use argparse::StoreTrue; use argparse::StoreTrue;
use argparse::{ArgumentParser, Store}; use argparse::{ArgumentParser, Store};
use byteorder::{BigEndian, ByteOrder}; use byteorder::{BigEndian, ByteOrder};
use bytes::{Buf, BufMut, BytesMut, IntoBuf}; use bytes::{BufMut, Bytes, BytesMut};
use futures::{Future, Sink, Stream}; use futures::{future, SinkExt, StreamExt, TryFutureExt, TryStreamExt};
use http::HeaderValue;
use mumble_protocol::control::ClientControlCodec; use mumble_protocol::control::ClientControlCodec;
use mumble_protocol::control::ControlPacket; use mumble_protocol::control::ControlPacket;
use mumble_protocol::control::RawControlPacket; use mumble_protocol::control::RawControlPacket;
@ -18,11 +19,10 @@ use std::net::Ipv6Addr;
use std::net::ToSocketAddrs; use std::net::ToSocketAddrs;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio_codec::Decoder;
use tokio_core::reactor::Core;
use tokio_tls::TlsConnector; use tokio_tls::TlsConnector;
use tokio_tungstenite::accept_hdr_async_with_config; use tokio_tungstenite::accept_hdr_async_with_config;
use tungstenite::handshake::server::Request; use tokio_util::codec::Decoder;
use tungstenite::handshake::server::{ErrorResponse, Request, Response};
use tungstenite::protocol::Message; use tungstenite::protocol::Message;
use tungstenite::protocol::WebSocketConfig; use tungstenite::protocol::WebSocketConfig;
@ -40,7 +40,8 @@ pub struct Config {
pub public_v6: Option<Ipv6Addr>, pub public_v6: Option<Ipv6Addr>,
} }
fn main() { #[tokio::main]
async fn main() -> Result<(), Error> {
let mut ws_port = 0_u16; let mut ws_port = 0_u16;
let mut upstream = "".to_string(); let mut upstream = "".to_string();
let mut accept_invalid_certs = false; let mut accept_invalid_certs = false;
@ -117,26 +118,24 @@ fn main() {
.next() .next()
.expect("Failed to resolve upstream address"); .expect("Failed to resolve upstream address");
let mut core = Core::new().unwrap(); let socket_addr = (Ipv6Addr::from(0), ws_port);
let handle = core.handle(); let mut server = TcpListener::bind(&socket_addr).await?;
let socket_addr = (Ipv6Addr::from(0), ws_port).into(); loop {
let server = TcpListener::bind(&socket_addr).unwrap(); let (client, _) = server.accept().await?;
let f = server.incoming().for_each(move |client| {
let addr = client.peer_addr().expect("peer to have an address"); let addr = client.peer_addr().expect("peer to have an address");
println!("New connection from {}", addr); println!("New connection from {}", addr);
// Connect to server // Connect to server
let server = TcpStream::connect(&upstream_addr) let server = async move {
.from_err() let stream = TcpStream::connect(&upstream_addr).await?;
.and_then(move |stream| {
let connector: TlsConnector = native_tls::TlsConnector::builder() let connector: TlsConnector = native_tls::TlsConnector::builder()
.danger_accept_invalid_certs(accept_invalid_certs) .danger_accept_invalid_certs(accept_invalid_certs)
.build() .build()
.unwrap() .unwrap()
.into(); .into();
connector.connect(upstream_host, stream).from_err() let stream = connector.connect(upstream_host, stream).await?;
}) Ok::<_, Error>(ClientControlCodec::new().framed(stream))
.map(|stream| ClientControlCodec::new().framed(stream)); };
// Accept client // Accept client
let websocket_config = WebSocketConfig { let websocket_config = WebSocketConfig {
@ -146,42 +145,46 @@ fn main() {
}; };
fn header_callback( fn header_callback(
_req: &Request, _req: &Request,
) -> tungstenite::error::Result<Option<Vec<(String, String)>>> { mut response: Response,
Ok(Some(vec![( ) -> Result<Response, ErrorResponse> {
"Sec-WebSocket-Protocol".to_string(), response
"binary".to_string(), .headers_mut()
)])) .insert("Sec-WebSocket-Protocol", HeaderValue::from_static("binary"));
Ok(response)
} }
let client = accept_hdr_async_with_config(client, header_callback, Some(websocket_config)) let client = accept_hdr_async_with_config(client, header_callback, Some(websocket_config))
.from_err(); .err_into();
// Once both are done, begin proxy duty // Once both are done, begin proxy duty
let config = config.clone(); let config = config.clone();
let f = client let f = future::try_join(client, server)
.join(server)
.and_then(move |(client, server)| { .and_then(move |(client, server)| {
let (client_sink, client_stream) = client.split(); let (client_sink, client_stream) = client.split();
let client_sink = client_sink.with(|m: ControlPacket<Clientbound>| { let client_sink = client_sink.with(|m: ControlPacket<Clientbound>| {
let m = RawControlPacket::from(m); let m = RawControlPacket::from(m);
let mut header = BytesMut::with_capacity(6); let mut header = BytesMut::with_capacity(6);
header.put_u16_be(m.id); header.put_u16(m.id);
header.put_u32_be(m.bytes.len() as u32); header.put_u32(m.bytes.len() as u32);
let buf = header.into_buf().chain(m.bytes); let mut buf = Vec::new();
Ok::<_, Error>(Message::Binary(buf.collect())) buf.extend(header);
buf.extend(m.bytes);
future::ready(Ok::<_, Error>(Message::Binary(buf)))
}); });
let client_stream = client_stream.from_err().filter_map(|m| match m { let client_stream = client_stream.err_into().try_filter_map(|m| {
Message::Binary(ref b) if b.len() >= 6 => { future::ok(match m {
let id = BigEndian::read_u16(b); Message::Binary(b) if b.len() >= 6 => {
let id = BigEndian::read_u16(&b);
// b[2..6] is length which is implicit in websocket msgs // b[2..6] is length which is implicit in websocket msgs
let bytes = b[6..].into(); let bytes = Bytes::from(b).slice(6..);
RawControlPacket { id, bytes }.try_into().ok() RawControlPacket { id, bytes }.try_into().ok()
} }
_ => None, _ => None,
})
}); });
let (server_sink, server_stream) = server.split(); let (server_sink, server_stream) = server.split();
let server_sink = server_sink.sink_from_err(); let server_sink = server_sink.sink_err_into();
let server_stream = server_stream.from_err(); let server_stream = server_stream.err_into();
Connection::new( Connection::new(
config, config,
@ -192,6 +195,7 @@ fn main() {
) )
}) })
.or_else(move |err| { .or_else(move |err| {
future::ready({
if err.is_connection_closed() { if err.is_connection_closed() {
Ok(()) Ok(())
} else { } else {
@ -199,9 +203,8 @@ fn main() {
Err(()) Err(())
} }
}) })
.map(move |()| println!("Client connection closed: {}", addr)); })
handle.spawn(f); .map_ok(move |()| println!("Client connection closed: {}", addr));
Ok(()) tokio::spawn(f);
}); }
core.run(f).unwrap();
} }

View file

@ -1,5 +1,7 @@
use futures::pin_mut;
use futures::Stream; use futures::Stream;
use tokio::prelude::*; use std::pin::Pin;
use std::task::{Context, Poll};
/// Like `futures::future::Either` but for Streams /// Like `futures::future::Either` but for Streams
pub enum EitherS<A, B> { pub enum EitherS<A, B> {
@ -9,15 +11,20 @@ pub enum EitherS<A, B> {
impl<A, B> Stream for EitherS<A, B> impl<A, B> Stream for EitherS<A, B>
where where
A: Stream, A: Stream + Unpin,
B: Stream<Item = A::Item, Error = A::Error>, B: Stream<Item = A::Item> + Unpin,
{ {
type Item = A::Item; type Item = A::Item;
type Error = A::Error; fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> { match self.get_mut() {
match self { EitherS::A(s) => {
EitherS::A(s) => s.poll(), pin_mut!(s);
EitherS::B(s) => s.poll(), s.poll_next(cx)
}
EitherS::B(s) => {
pin_mut!(s);
s.poll_next(cx)
}
} }
} }
} }