From 995eed0a9b09ab3c8dcf4f38df65210c5239d738 Mon Sep 17 00:00:00 2001 From: Jonas Herzig Date: Sun, 5 Apr 2020 17:46:29 +0200 Subject: [PATCH] Replace stream_to_be_sent with much simpler outbound_buf We've been passing around `Stream`s of `Frame`s everywhere even though we never made use of the async nature of those (i.e. we used future::ready everywhere). This resulted in unnecessarily complicated an hard to read code. Instead we now have a simple VecDeque outbound_buf which we push packets on if we want them to be sent. No more passing around complicated return values. --- src/connection.rs | 223 +++++++++++++++++++++------------------------- src/main.rs | 1 - src/utils.rs | 30 ------- 3 files changed, 100 insertions(+), 154 deletions(-) delete mode 100644 src/utils.rs diff --git a/src/connection.rs b/src/connection.rs index 4d741a1..b9e6e6f 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -1,10 +1,7 @@ -use futures::future; use futures::future::BoxFuture; use futures::pin_mut; use futures::ready; -use futures::stream; -use futures::stream::BoxStream; -use futures::{Future, FutureExt, Sink, Stream, StreamExt}; +use futures::{Future, FutureExt, Sink, Stream}; use libnice::ice; use mumble_protocol::control::msgs; use mumble_protocol::control::ControlPacket; @@ -25,10 +22,9 @@ use rtp::rfc3550::{ use rtp::rfc5761::{MuxPacketReader, MuxPacketWriter, MuxedPacket}; use rtp::rfc5764::DtlsSrtp; use rtp::traits::{ReadPacket, WritePacket}; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, VecDeque}; use std::ffi::CString; use std::net::IpAddr; -use std::ops::DerefMut; use std::pin::Pin; use std::task::Context; use std::task::Poll; @@ -38,7 +34,6 @@ use tokio::time::Delay; use webrtc_sdp::attribute_type::SdpAttribute; use crate::error::Error; -use crate::utils::EitherS; use crate::Config; type SessionId = u32; @@ -54,7 +49,7 @@ struct User { } impl User { - fn set_inactive(&mut self) -> impl Stream> { + fn set_inactive(&mut self) -> Option { self.timeout = None; if self.active { @@ -68,24 +63,24 @@ impl User { let mut msg = msgs::TalkingState::new(); msg.set_session(self.session); - EitherS::A(stream::once(future::ready(Ok(Frame::Client(msg.into()))))) + Some(Frame::Client(msg.into())) } else { - EitherS::B(stream::empty()) + None } } - fn set_active(&mut self, target: u8) -> impl Stream> { + fn set_active(&mut self, target: u8) -> Option { self.timeout = Some(tokio::time::delay_for(Duration::from_millis(400))); if self.active { - EitherS::A(stream::empty()) + None } else { self.active = true; let mut msg = msgs::TalkingState::new(); msg.set_session(self.session); msg.set_target(target.into()); - EitherS::B(stream::once(future::ready(Ok(Frame::Client(msg.into()))))) + Some(Frame::Client(msg.into())) } } } @@ -99,7 +94,7 @@ pub struct Connection { next_clientbound_frame: Option>, next_serverbound_frame: Option>, next_rtp_frame: Option>, - stream_to_be_sent: Option>>, + outbound_buf: VecDeque, ice: Option<(ice::Agent, ice::Stream)>, candidate_gathering_done: bool, @@ -157,7 +152,7 @@ impl Connection { next_clientbound_frame: None, next_serverbound_frame: None, next_rtp_frame: None, - stream_to_be_sent: None, + outbound_buf: VecDeque::new(), ice: None, candidate_gathering_done: false, dtls_srtp_future: None, @@ -206,7 +201,7 @@ impl Connection { } } - fn setup_ice(&mut self) -> impl Stream> { + fn setup_ice(&mut self) -> Result<(), Error> { // Setup ICE agent let mut agent = ice::Agent::new_rfc5245(); agent.set_software("mumble-web-proxy"); @@ -222,11 +217,7 @@ impl Connection { } { Ok(stream) => stream, Err(err) => { - return stream::once(future::ready(Err(io::Error::new( - io::ErrorKind::Other, - err, - ) - .into()))); + return Err(io::Error::new(io::ErrorKind::Other, err).into()); } }; let component = stream.take_components().pop().expect("one component"); @@ -255,7 +246,8 @@ impl Connection { // FIXME: verify remote fingerprint self.dtls_srtp_future = Some(DtlsSrtp::handshake(component, acceptor).boxed()); - stream::once(future::ready(Ok(Frame::Client(msg.into())))) + self.outbound_buf.push_back(Frame::Client(msg.into())); + Ok(()) } fn gather_ice_candidates(mut self: Pin<&mut Self>, cx: &mut Context) -> bool { @@ -287,7 +279,7 @@ impl Connection { let mut msg = msgs::IceCandidate::new(); msg.set_content(format!("candidate:{}", candidate.to_string())); let frame = Frame::Client(msg.into()); - self.stream_to_be_sent = Some(Box::pin(stream::once(future::ready(Ok(frame))))); + self.outbound_buf.push_back(frame); true } Poll::Ready(None) => { @@ -298,10 +290,7 @@ impl Connection { } } - fn handle_voice_packet( - &mut self, - packet: VoicePacket, - ) -> impl Stream> { + fn handle_voice_packet(&mut self, packet: VoicePacket) -> Result<(), Error> { let (target, session_id, seq_num, opus_data, last_bit) = match packet { VoicePacket::Audio { target, @@ -310,7 +299,7 @@ impl Connection { payload: VoicePacketPayload::Opus(data, last_bit), .. } => (target, session_id, seq_num, data, last_bit), - _ => return EitherS::B(stream::empty()), + _ => return Ok(()), }; // NOTE: the mumble packet id increases by 1 per 10ms of audio contained @@ -321,7 +310,7 @@ impl Connection { let user = match self.sessions.get_mut(&(session_id as u32)) { Some(s) => s, - None => return EitherS::B(stream::empty()), + None => return Ok(()), }; let rtp_ssrc = user.ssrc; @@ -336,44 +325,49 @@ impl Connection { let offset = seq_num - user.start_voice_seq_num; let mut rtp_seq_num = user.rtp_seq_num_offset + offset as u32; - let activity_stream = if last_bit { + if last_bit { if seq_num <= user.highest_voice_seq_num { // Horribly delayed end packet from a previous stream, just drop it // (or single packet stream which would be inaudible anyway) - return EitherS::B(stream::empty()); + return Ok(()); } // this is the last packet of this voice transmission -> reset counters // doing that will effectively trash any delayed packets but that's just // a flaw in the mumble protocol and there's nothing we can do about it. - EitherS::B(user.set_inactive()) + if let Some(frame) = user.set_inactive() { + self.outbound_buf.push_back(frame); + } + } else if seq_num == user.highest_voice_seq_num && seq_num != user.start_voice_seq_num { + // re-transmission, drop it + return Ok(()); + } else if seq_num >= user.highest_voice_seq_num + && seq_num < user.highest_voice_seq_num + 100 + { + // probably same voice transmission (also not too far in the future) + user.highest_voice_seq_num = seq_num; + if let Some(frame) = user.set_active(target) { + self.outbound_buf.push_back(frame); + } + } else if seq_num < user.highest_voice_seq_num && seq_num + 100 > user.highest_voice_seq_num + { + // slightly delayed but probably same voice transmission + if let Some(frame) = user.set_active(target) { + self.outbound_buf.push_back(frame); + } } else { - EitherS::A( - if seq_num == user.highest_voice_seq_num && seq_num != user.start_voice_seq_num { - // re-transmission, drop it - return EitherS::B(stream::empty()); - } else if seq_num >= user.highest_voice_seq_num - && seq_num < user.highest_voice_seq_num + 100 - { - // probably same voice transmission (also not too far in the future) - user.highest_voice_seq_num = seq_num; - EitherS::A(user.set_active(target)) - } else if seq_num < user.highest_voice_seq_num - && seq_num + 100 > user.highest_voice_seq_num - { - // slightly delayed but probably same voice transmission - EitherS::A(user.set_active(target)) - } else { - // Either significant jitter (>2s) or we missed the end of the last - // transmission. Since >2s jitter will break opus horribly anyway, - // we assume the latter and start a new transmission - let stream = user.set_inactive(); - first_in_transmission = true; - user.start_voice_seq_num = seq_num; - user.highest_voice_seq_num = seq_num; - rtp_seq_num = user.rtp_seq_num_offset; - EitherS::B(stream.chain(user.set_active(target))) - }, - ) + // Either significant jitter (>2s) or we missed the end of the last + // transmission. Since >2s jitter will break opus horribly anyway, + // we assume the latter and start a new transmission + if let Some(frame) = user.set_inactive() { + self.outbound_buf.push_back(frame); + } + first_in_transmission = true; + user.start_voice_seq_num = seq_num; + user.highest_voice_seq_num = seq_num; + rtp_seq_num = user.rtp_seq_num_offset; + if let Some(frame) = user.set_active(target) { + self.outbound_buf.push_back(frame); + } }; let rtp_time = 480 * rtp_seq_num; @@ -393,42 +387,44 @@ impl Connection { padding: Vec::new(), }; let frame = Frame::Rtp(MuxedPacket::Rtp(rtp)); - EitherS::A(activity_stream.chain(stream::once(future::ready(Ok(frame))))) + self.outbound_buf.push_back(frame); + + Ok(()) } fn process_packet_from_server( &mut self, packet: ControlPacket, - ) -> impl Stream> { + ) -> Result<(), Error> { if !self.supports_webrtc() { - return EitherS::B(stream::once(future::ready(Ok(Frame::Client(packet))))); + self.outbound_buf.push_back(Frame::Client(packet)); + return Ok(()); } match packet { - ControlPacket::UDPTunnel(voice) => EitherS::A(self.handle_voice_packet(*voice)), + ControlPacket::UDPTunnel(voice) => return self.handle_voice_packet(*voice), ControlPacket::UserState(mut message) => { let session_id = message.get_session(); if !self.sessions.contains_key(&session_id) { let user = self.allocate_ssrc(session_id); message.set_ssrc(user.ssrc); } - EitherS::B(stream::once(future::ready(Ok(Frame::Client( - (*message).into(), - ))))) + self.outbound_buf + .push_back(Frame::Client((*message).into())); } ControlPacket::UserRemove(message) => { self.free_ssrc(message.get_session()); - EitherS::B(stream::once(future::ready(Ok(Frame::Client( - (*message).into(), - ))))) + self.outbound_buf + .push_back(Frame::Client((*message).into())); } - other => EitherS::B(stream::once(future::ready(Ok(Frame::Client(other))))), - } + other => self.outbound_buf.push_back(Frame::Client(other)), + }; + Ok(()) } fn process_packet_from_client( &mut self, packet: ControlPacket, - ) -> Pin> + Send>> { + ) -> Result<(), Error> { match packet { ControlPacket::Authenticate(mut message) => { println!("MSG Authenticate: {:?}", message); @@ -437,17 +433,13 @@ impl Connection { message.clear_webrtc(); // and make sure opus is marked as supported message.set_opus(true); + self.outbound_buf + .push_back(Frame::Server((*message).into())); - let stream = self.setup_ice(); - - Box::pin( - stream::once(future::ready(Ok(Frame::Server((*message).into())))) - .chain(stream), - ) + self.setup_ice()?; } else { - Box::pin(stream::once(future::ready(Ok(Frame::Server( - (*message).into(), - ))))) + self.outbound_buf + .push_back(Frame::Server((*message).into())); } } ControlPacket::WebRTC(mut message) => { @@ -462,7 +454,6 @@ impl Connection { // FIXME trigger ICE-restart if required // FIXME store and use remote dtls fingerprint } - Box::pin(stream::empty()) } ControlPacket::IceCandidate(mut message) => { let candidate = message.take_content(); @@ -474,15 +465,14 @@ impl Connection { } Ok(_) => unreachable!(), Err(err) => { - return Box::pin(stream::once(future::ready(Err(io::Error::new( + return Err(io::Error::new( io::ErrorKind::Other, format!("Error parsing ICE candidate: {}", err), ) - .into())))); + .into()); } } } - Box::pin(stream::empty()) } ControlPacket::TalkingState(message) => { self.target = if message.has_target() { @@ -490,13 +480,15 @@ impl Connection { } else { None }; - Box::pin(stream::empty()) } - other => Box::pin(stream::once(future::ready(Ok(Frame::Server(other))))), - } + other => { + self.outbound_buf.push_back(Frame::Server(other)); + } + }; + Ok(()) } - fn process_rtp_packet(&mut self, buf: &[u8]) -> impl Stream> { + fn process_rtp_packet(&mut self, buf: &[u8]) { match self.rtp_reader.read_packet(&mut &buf[..]) { Ok(MuxedPacket::Rtp(rtp)) => { if let Some(target) = self.target { @@ -513,15 +505,12 @@ impl Connection { position_info: None, }; - EitherS::A(stream::once(future::ready(Ok(Frame::Server( - voice_packet.into(), - ))))) - } else { - EitherS::B(stream::empty()) + self.outbound_buf + .push_back(Frame::Server(voice_packet.into())); } } - Ok(MuxedPacket::Rtcp(_rtcp)) => EitherS::B(stream::empty()), - Err(_err) => EitherS::B(stream::empty()), // FIXME maybe not silently drop the error? + Ok(MuxedPacket::Rtcp(_rtcp)) => {} + Err(_err) => {} // FIXME maybe not silently drop the error? } } } @@ -565,27 +554,17 @@ impl Future for Connection { } // Send out all pending frames - if self.stream_to_be_sent.is_some() { - let mut stream = self.stream_to_be_sent.as_mut().unwrap(); - let stream = stream.deref_mut(); - pin_mut!(stream); - match ready!(stream.poll_next(cx)) { - Some(frame) => { - match frame? { - Frame::Server(frame) => self.next_serverbound_frame = Some(frame), - Frame::Client(frame) => self.next_clientbound_frame = Some(frame), - Frame::Rtp(frame) => { - let mut buf = Vec::new(); - self.rtp_writer.write_packet(&mut buf, &frame)?; - self.next_rtp_frame = Some(buf) - } - } - continue 'poll; - } - None => { - self.stream_to_be_sent = None; + if let Some(frame) = self.outbound_buf.pop_front() { + match frame { + Frame::Server(frame) => self.next_serverbound_frame = Some(frame), + Frame::Client(frame) => self.next_clientbound_frame = Some(frame), + Frame::Rtp(frame) => { + let mut buf = Vec::new(); + self.rtp_writer.write_packet(&mut buf, &frame)?; + self.next_rtp_frame = Some(buf) } } + continue 'poll; } // All frames have been sent (or queued), flush any buffers in the output path @@ -607,8 +586,9 @@ impl Future for Connection { if let Some(timeout) = &mut session.timeout { pin_mut!(timeout); if let Poll::Ready(()) = timeout.poll(cx) { - let stream = session.set_inactive(); - self.stream_to_be_sent = Some(Box::pin(stream)); + if let Some(frame) = session.set_inactive() { + self.outbound_buf.push_back(frame); + } continue 'poll; } } @@ -638,8 +618,7 @@ impl Future for Connection { match self.inbound_server.as_mut().poll_next(cx)? { Poll::Pending => {} Poll::Ready(Some(frame)) => { - let stream = self.process_packet_from_server(frame); - self.stream_to_be_sent = Some(Box::pin(stream)); + self.process_packet_from_server(frame)?; continue 'poll; } Poll::Ready(None) => return Poll::Ready(Ok(())), @@ -647,8 +626,7 @@ impl Future for Connection { match self.inbound_client.as_mut().poll_next(cx)? { Poll::Pending => {} Poll::Ready(Some(frame)) => { - let stream = self.process_packet_from_client(frame); - self.stream_to_be_sent = Some(stream); + self.process_packet_from_client(frame)?; continue 'poll; } Poll::Ready(None) => return Poll::Ready(Ok(())), @@ -658,8 +636,7 @@ impl Future for Connection { match dtls_srtp.poll_next(cx)? { Poll::Pending => {} Poll::Ready(Some(frame)) => { - let stream = self.process_rtp_packet(&frame); - self.stream_to_be_sent = Some(Box::pin(stream)); + self.process_rtp_packet(&frame); continue 'poll; } Poll::Ready(None) => return Poll::Ready(Ok(())), diff --git a/src/main.rs b/src/main.rs index 2377f2a..db65187 100644 --- a/src/main.rs +++ b/src/main.rs @@ -28,7 +28,6 @@ use tungstenite::protocol::WebSocketConfig; mod connection; mod error; -mod utils; use connection::Connection; use error::Error; diff --git a/src/utils.rs b/src/utils.rs deleted file mode 100644 index b265d30..0000000 --- a/src/utils.rs +++ /dev/null @@ -1,30 +0,0 @@ -use futures::pin_mut; -use futures::Stream; -use std::pin::Pin; -use std::task::{Context, Poll}; - -/// Like `futures::future::Either` but for Streams -pub enum EitherS { - A(A), - B(B), -} - -impl Stream for EitherS -where - A: Stream + Unpin, - B: Stream + Unpin, -{ - type Item = A::Item; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - match self.get_mut() { - EitherS::A(s) => { - pin_mut!(s); - s.poll_next(cx) - } - EitherS::B(s) => { - pin_mut!(s); - s.poll_next(cx) - } - } - } -}