Replace stream_to_be_sent with much simpler outbound_buf

We've been passing around `Stream`s of `Frame`s everywhere even though we never
made use of the async nature of those (i.e. we used future::ready everywhere).
This resulted in unnecessarily complicated an hard to read code. Instead we now
have a simple VecDeque outbound_buf which we push packets on if we want them to
be sent. No more passing around complicated return values.
This commit is contained in:
Jonas Herzig 2020-04-05 17:46:29 +02:00
parent 499c430b47
commit 995eed0a9b
3 changed files with 100 additions and 154 deletions

View file

@ -1,10 +1,7 @@
use futures::future;
use futures::future::BoxFuture; use futures::future::BoxFuture;
use futures::pin_mut; use futures::pin_mut;
use futures::ready; use futures::ready;
use futures::stream; use futures::{Future, FutureExt, Sink, Stream};
use futures::stream::BoxStream;
use futures::{Future, FutureExt, Sink, Stream, StreamExt};
use libnice::ice; use libnice::ice;
use mumble_protocol::control::msgs; use mumble_protocol::control::msgs;
use mumble_protocol::control::ControlPacket; use mumble_protocol::control::ControlPacket;
@ -25,10 +22,9 @@ use rtp::rfc3550::{
use rtp::rfc5761::{MuxPacketReader, MuxPacketWriter, MuxedPacket}; use rtp::rfc5761::{MuxPacketReader, MuxPacketWriter, MuxedPacket};
use rtp::rfc5764::DtlsSrtp; use rtp::rfc5764::DtlsSrtp;
use rtp::traits::{ReadPacket, WritePacket}; use rtp::traits::{ReadPacket, WritePacket};
use std::collections::BTreeMap; use std::collections::{BTreeMap, VecDeque};
use std::ffi::CString; use std::ffi::CString;
use std::net::IpAddr; use std::net::IpAddr;
use std::ops::DerefMut;
use std::pin::Pin; use std::pin::Pin;
use std::task::Context; use std::task::Context;
use std::task::Poll; use std::task::Poll;
@ -38,7 +34,6 @@ use tokio::time::Delay;
use webrtc_sdp::attribute_type::SdpAttribute; use webrtc_sdp::attribute_type::SdpAttribute;
use crate::error::Error; use crate::error::Error;
use crate::utils::EitherS;
use crate::Config; use crate::Config;
type SessionId = u32; type SessionId = u32;
@ -54,7 +49,7 @@ struct User {
} }
impl User { impl User {
fn set_inactive(&mut self) -> impl Stream<Item = Result<Frame, Error>> { fn set_inactive(&mut self) -> Option<Frame> {
self.timeout = None; self.timeout = None;
if self.active { if self.active {
@ -68,24 +63,24 @@ impl User {
let mut msg = msgs::TalkingState::new(); let mut msg = msgs::TalkingState::new();
msg.set_session(self.session); msg.set_session(self.session);
EitherS::A(stream::once(future::ready(Ok(Frame::Client(msg.into()))))) Some(Frame::Client(msg.into()))
} else { } else {
EitherS::B(stream::empty()) None
} }
} }
fn set_active(&mut self, target: u8) -> impl Stream<Item = Result<Frame, Error>> { fn set_active(&mut self, target: u8) -> Option<Frame> {
self.timeout = Some(tokio::time::delay_for(Duration::from_millis(400))); self.timeout = Some(tokio::time::delay_for(Duration::from_millis(400)));
if self.active { if self.active {
EitherS::A(stream::empty()) None
} else { } else {
self.active = true; self.active = true;
let mut msg = msgs::TalkingState::new(); let mut msg = msgs::TalkingState::new();
msg.set_session(self.session); msg.set_session(self.session);
msg.set_target(target.into()); msg.set_target(target.into());
EitherS::B(stream::once(future::ready(Ok(Frame::Client(msg.into()))))) Some(Frame::Client(msg.into()))
} }
} }
} }
@ -99,7 +94,7 @@ pub struct Connection {
next_clientbound_frame: Option<ControlPacket<Clientbound>>, next_clientbound_frame: Option<ControlPacket<Clientbound>>,
next_serverbound_frame: Option<ControlPacket<Serverbound>>, next_serverbound_frame: Option<ControlPacket<Serverbound>>,
next_rtp_frame: Option<Vec<u8>>, next_rtp_frame: Option<Vec<u8>>,
stream_to_be_sent: Option<BoxStream<'static, Result<Frame, Error>>>, outbound_buf: VecDeque<Frame>,
ice: Option<(ice::Agent, ice::Stream)>, ice: Option<(ice::Agent, ice::Stream)>,
candidate_gathering_done: bool, candidate_gathering_done: bool,
@ -157,7 +152,7 @@ impl Connection {
next_clientbound_frame: None, next_clientbound_frame: None,
next_serverbound_frame: None, next_serverbound_frame: None,
next_rtp_frame: None, next_rtp_frame: None,
stream_to_be_sent: None, outbound_buf: VecDeque::new(),
ice: None, ice: None,
candidate_gathering_done: false, candidate_gathering_done: false,
dtls_srtp_future: None, dtls_srtp_future: None,
@ -206,7 +201,7 @@ impl Connection {
} }
} }
fn setup_ice(&mut self) -> impl Stream<Item = Result<Frame, Error>> { fn setup_ice(&mut self) -> Result<(), Error> {
// Setup ICE agent // Setup ICE agent
let mut agent = ice::Agent::new_rfc5245(); let mut agent = ice::Agent::new_rfc5245();
agent.set_software("mumble-web-proxy"); agent.set_software("mumble-web-proxy");
@ -222,11 +217,7 @@ impl Connection {
} { } {
Ok(stream) => stream, Ok(stream) => stream,
Err(err) => { Err(err) => {
return stream::once(future::ready(Err(io::Error::new( return Err(io::Error::new(io::ErrorKind::Other, err).into());
io::ErrorKind::Other,
err,
)
.into())));
} }
}; };
let component = stream.take_components().pop().expect("one component"); let component = stream.take_components().pop().expect("one component");
@ -255,7 +246,8 @@ impl Connection {
// FIXME: verify remote fingerprint // FIXME: verify remote fingerprint
self.dtls_srtp_future = Some(DtlsSrtp::handshake(component, acceptor).boxed()); self.dtls_srtp_future = Some(DtlsSrtp::handshake(component, acceptor).boxed());
stream::once(future::ready(Ok(Frame::Client(msg.into())))) self.outbound_buf.push_back(Frame::Client(msg.into()));
Ok(())
} }
fn gather_ice_candidates(mut self: Pin<&mut Self>, cx: &mut Context) -> bool { fn gather_ice_candidates(mut self: Pin<&mut Self>, cx: &mut Context) -> bool {
@ -287,7 +279,7 @@ impl Connection {
let mut msg = msgs::IceCandidate::new(); let mut msg = msgs::IceCandidate::new();
msg.set_content(format!("candidate:{}", candidate.to_string())); msg.set_content(format!("candidate:{}", candidate.to_string()));
let frame = Frame::Client(msg.into()); let frame = Frame::Client(msg.into());
self.stream_to_be_sent = Some(Box::pin(stream::once(future::ready(Ok(frame))))); self.outbound_buf.push_back(frame);
true true
} }
Poll::Ready(None) => { Poll::Ready(None) => {
@ -298,10 +290,7 @@ impl Connection {
} }
} }
fn handle_voice_packet( fn handle_voice_packet(&mut self, packet: VoicePacket<Clientbound>) -> Result<(), Error> {
&mut self,
packet: VoicePacket<Clientbound>,
) -> impl Stream<Item = Result<Frame, Error>> {
let (target, session_id, seq_num, opus_data, last_bit) = match packet { let (target, session_id, seq_num, opus_data, last_bit) = match packet {
VoicePacket::Audio { VoicePacket::Audio {
target, target,
@ -310,7 +299,7 @@ impl Connection {
payload: VoicePacketPayload::Opus(data, last_bit), payload: VoicePacketPayload::Opus(data, last_bit),
.. ..
} => (target, session_id, seq_num, data, last_bit), } => (target, session_id, seq_num, data, last_bit),
_ => return EitherS::B(stream::empty()), _ => return Ok(()),
}; };
// NOTE: the mumble packet id increases by 1 per 10ms of audio contained // NOTE: the mumble packet id increases by 1 per 10ms of audio contained
@ -321,7 +310,7 @@ impl Connection {
let user = match self.sessions.get_mut(&(session_id as u32)) { let user = match self.sessions.get_mut(&(session_id as u32)) {
Some(s) => s, Some(s) => s,
None => return EitherS::B(stream::empty()), None => return Ok(()),
}; };
let rtp_ssrc = user.ssrc; let rtp_ssrc = user.ssrc;
@ -336,44 +325,49 @@ impl Connection {
let offset = seq_num - user.start_voice_seq_num; let offset = seq_num - user.start_voice_seq_num;
let mut rtp_seq_num = user.rtp_seq_num_offset + offset as u32; let mut rtp_seq_num = user.rtp_seq_num_offset + offset as u32;
let activity_stream = if last_bit { if last_bit {
if seq_num <= user.highest_voice_seq_num { if seq_num <= user.highest_voice_seq_num {
// Horribly delayed end packet from a previous stream, just drop it // Horribly delayed end packet from a previous stream, just drop it
// (or single packet stream which would be inaudible anyway) // (or single packet stream which would be inaudible anyway)
return EitherS::B(stream::empty()); return Ok(());
} }
// this is the last packet of this voice transmission -> reset counters // this is the last packet of this voice transmission -> reset counters
// doing that will effectively trash any delayed packets but that's just // doing that will effectively trash any delayed packets but that's just
// a flaw in the mumble protocol and there's nothing we can do about it. // a flaw in the mumble protocol and there's nothing we can do about it.
EitherS::B(user.set_inactive()) if let Some(frame) = user.set_inactive() {
} else { self.outbound_buf.push_back(frame);
EitherS::A( }
if seq_num == user.highest_voice_seq_num && seq_num != user.start_voice_seq_num { } else if seq_num == user.highest_voice_seq_num && seq_num != user.start_voice_seq_num {
// re-transmission, drop it // re-transmission, drop it
return EitherS::B(stream::empty()); return Ok(());
} else if seq_num >= user.highest_voice_seq_num } else if seq_num >= user.highest_voice_seq_num
&& seq_num < user.highest_voice_seq_num + 100 && seq_num < user.highest_voice_seq_num + 100
{ {
// probably same voice transmission (also not too far in the future) // probably same voice transmission (also not too far in the future)
user.highest_voice_seq_num = seq_num; user.highest_voice_seq_num = seq_num;
EitherS::A(user.set_active(target)) if let Some(frame) = user.set_active(target) {
} else if seq_num < user.highest_voice_seq_num self.outbound_buf.push_back(frame);
&& seq_num + 100 > user.highest_voice_seq_num }
} else if seq_num < user.highest_voice_seq_num && seq_num + 100 > user.highest_voice_seq_num
{ {
// slightly delayed but probably same voice transmission // slightly delayed but probably same voice transmission
EitherS::A(user.set_active(target)) if let Some(frame) = user.set_active(target) {
self.outbound_buf.push_back(frame);
}
} else { } else {
// Either significant jitter (>2s) or we missed the end of the last // Either significant jitter (>2s) or we missed the end of the last
// transmission. Since >2s jitter will break opus horribly anyway, // transmission. Since >2s jitter will break opus horribly anyway,
// we assume the latter and start a new transmission // we assume the latter and start a new transmission
let stream = user.set_inactive(); if let Some(frame) = user.set_inactive() {
self.outbound_buf.push_back(frame);
}
first_in_transmission = true; first_in_transmission = true;
user.start_voice_seq_num = seq_num; user.start_voice_seq_num = seq_num;
user.highest_voice_seq_num = seq_num; user.highest_voice_seq_num = seq_num;
rtp_seq_num = user.rtp_seq_num_offset; rtp_seq_num = user.rtp_seq_num_offset;
EitherS::B(stream.chain(user.set_active(target))) if let Some(frame) = user.set_active(target) {
}, self.outbound_buf.push_back(frame);
) }
}; };
let rtp_time = 480 * rtp_seq_num; let rtp_time = 480 * rtp_seq_num;
@ -393,42 +387,44 @@ impl Connection {
padding: Vec::new(), padding: Vec::new(),
}; };
let frame = Frame::Rtp(MuxedPacket::Rtp(rtp)); let frame = Frame::Rtp(MuxedPacket::Rtp(rtp));
EitherS::A(activity_stream.chain(stream::once(future::ready(Ok(frame))))) self.outbound_buf.push_back(frame);
Ok(())
} }
fn process_packet_from_server( fn process_packet_from_server(
&mut self, &mut self,
packet: ControlPacket<Clientbound>, packet: ControlPacket<Clientbound>,
) -> impl Stream<Item = Result<Frame, Error>> { ) -> Result<(), Error> {
if !self.supports_webrtc() { if !self.supports_webrtc() {
return EitherS::B(stream::once(future::ready(Ok(Frame::Client(packet))))); self.outbound_buf.push_back(Frame::Client(packet));
return Ok(());
} }
match packet { match packet {
ControlPacket::UDPTunnel(voice) => EitherS::A(self.handle_voice_packet(*voice)), ControlPacket::UDPTunnel(voice) => return self.handle_voice_packet(*voice),
ControlPacket::UserState(mut message) => { ControlPacket::UserState(mut message) => {
let session_id = message.get_session(); let session_id = message.get_session();
if !self.sessions.contains_key(&session_id) { if !self.sessions.contains_key(&session_id) {
let user = self.allocate_ssrc(session_id); let user = self.allocate_ssrc(session_id);
message.set_ssrc(user.ssrc); message.set_ssrc(user.ssrc);
} }
EitherS::B(stream::once(future::ready(Ok(Frame::Client( self.outbound_buf
(*message).into(), .push_back(Frame::Client((*message).into()));
)))))
} }
ControlPacket::UserRemove(message) => { ControlPacket::UserRemove(message) => {
self.free_ssrc(message.get_session()); self.free_ssrc(message.get_session());
EitherS::B(stream::once(future::ready(Ok(Frame::Client( self.outbound_buf
(*message).into(), .push_back(Frame::Client((*message).into()));
)))))
}
other => EitherS::B(stream::once(future::ready(Ok(Frame::Client(other))))),
} }
other => self.outbound_buf.push_back(Frame::Client(other)),
};
Ok(())
} }
fn process_packet_from_client( fn process_packet_from_client(
&mut self, &mut self,
packet: ControlPacket<Serverbound>, packet: ControlPacket<Serverbound>,
) -> Pin<Box<dyn Stream<Item = Result<Frame, Error>> + Send>> { ) -> Result<(), Error> {
match packet { match packet {
ControlPacket::Authenticate(mut message) => { ControlPacket::Authenticate(mut message) => {
println!("MSG Authenticate: {:?}", message); println!("MSG Authenticate: {:?}", message);
@ -437,17 +433,13 @@ impl Connection {
message.clear_webrtc(); message.clear_webrtc();
// and make sure opus is marked as supported // and make sure opus is marked as supported
message.set_opus(true); message.set_opus(true);
self.outbound_buf
.push_back(Frame::Server((*message).into()));
let stream = self.setup_ice(); self.setup_ice()?;
Box::pin(
stream::once(future::ready(Ok(Frame::Server((*message).into()))))
.chain(stream),
)
} else { } else {
Box::pin(stream::once(future::ready(Ok(Frame::Server( self.outbound_buf
(*message).into(), .push_back(Frame::Server((*message).into()));
)))))
} }
} }
ControlPacket::WebRTC(mut message) => { ControlPacket::WebRTC(mut message) => {
@ -462,7 +454,6 @@ impl Connection {
// FIXME trigger ICE-restart if required // FIXME trigger ICE-restart if required
// FIXME store and use remote dtls fingerprint // FIXME store and use remote dtls fingerprint
} }
Box::pin(stream::empty())
} }
ControlPacket::IceCandidate(mut message) => { ControlPacket::IceCandidate(mut message) => {
let candidate = message.take_content(); let candidate = message.take_content();
@ -474,15 +465,14 @@ impl Connection {
} }
Ok(_) => unreachable!(), Ok(_) => unreachable!(),
Err(err) => { Err(err) => {
return Box::pin(stream::once(future::ready(Err(io::Error::new( return Err(io::Error::new(
io::ErrorKind::Other, io::ErrorKind::Other,
format!("Error parsing ICE candidate: {}", err), format!("Error parsing ICE candidate: {}", err),
) )
.into())))); .into());
} }
} }
} }
Box::pin(stream::empty())
} }
ControlPacket::TalkingState(message) => { ControlPacket::TalkingState(message) => {
self.target = if message.has_target() { self.target = if message.has_target() {
@ -490,13 +480,15 @@ impl Connection {
} else { } else {
None None
}; };
Box::pin(stream::empty())
} }
other => Box::pin(stream::once(future::ready(Ok(Frame::Server(other))))), other => {
self.outbound_buf.push_back(Frame::Server(other));
} }
};
Ok(())
} }
fn process_rtp_packet(&mut self, buf: &[u8]) -> impl Stream<Item = Result<Frame, Error>> { fn process_rtp_packet(&mut self, buf: &[u8]) {
match self.rtp_reader.read_packet(&mut &buf[..]) { match self.rtp_reader.read_packet(&mut &buf[..]) {
Ok(MuxedPacket::Rtp(rtp)) => { Ok(MuxedPacket::Rtp(rtp)) => {
if let Some(target) = self.target { if let Some(target) = self.target {
@ -513,15 +505,12 @@ impl Connection {
position_info: None, position_info: None,
}; };
EitherS::A(stream::once(future::ready(Ok(Frame::Server( self.outbound_buf
voice_packet.into(), .push_back(Frame::Server(voice_packet.into()));
)))))
} else {
EitherS::B(stream::empty())
} }
} }
Ok(MuxedPacket::Rtcp(_rtcp)) => EitherS::B(stream::empty()), Ok(MuxedPacket::Rtcp(_rtcp)) => {}
Err(_err) => EitherS::B(stream::empty()), // FIXME maybe not silently drop the error? Err(_err) => {} // FIXME maybe not silently drop the error?
} }
} }
} }
@ -565,13 +554,8 @@ impl Future for Connection {
} }
// Send out all pending frames // Send out all pending frames
if self.stream_to_be_sent.is_some() { if let Some(frame) = self.outbound_buf.pop_front() {
let mut stream = self.stream_to_be_sent.as_mut().unwrap(); match frame {
let stream = stream.deref_mut();
pin_mut!(stream);
match ready!(stream.poll_next(cx)) {
Some(frame) => {
match frame? {
Frame::Server(frame) => self.next_serverbound_frame = Some(frame), Frame::Server(frame) => self.next_serverbound_frame = Some(frame),
Frame::Client(frame) => self.next_clientbound_frame = Some(frame), Frame::Client(frame) => self.next_clientbound_frame = Some(frame),
Frame::Rtp(frame) => { Frame::Rtp(frame) => {
@ -582,11 +566,6 @@ impl Future for Connection {
} }
continue 'poll; continue 'poll;
} }
None => {
self.stream_to_be_sent = None;
}
}
}
// All frames have been sent (or queued), flush any buffers in the output path // All frames have been sent (or queued), flush any buffers in the output path
let _ = self.outbound_client.as_mut().poll_flush(cx)?; let _ = self.outbound_client.as_mut().poll_flush(cx)?;
@ -607,8 +586,9 @@ impl Future for Connection {
if let Some(timeout) = &mut session.timeout { if let Some(timeout) = &mut session.timeout {
pin_mut!(timeout); pin_mut!(timeout);
if let Poll::Ready(()) = timeout.poll(cx) { if let Poll::Ready(()) = timeout.poll(cx) {
let stream = session.set_inactive(); if let Some(frame) = session.set_inactive() {
self.stream_to_be_sent = Some(Box::pin(stream)); self.outbound_buf.push_back(frame);
}
continue 'poll; continue 'poll;
} }
} }
@ -638,8 +618,7 @@ impl Future for Connection {
match self.inbound_server.as_mut().poll_next(cx)? { match self.inbound_server.as_mut().poll_next(cx)? {
Poll::Pending => {} Poll::Pending => {}
Poll::Ready(Some(frame)) => { Poll::Ready(Some(frame)) => {
let stream = self.process_packet_from_server(frame); self.process_packet_from_server(frame)?;
self.stream_to_be_sent = Some(Box::pin(stream));
continue 'poll; continue 'poll;
} }
Poll::Ready(None) => return Poll::Ready(Ok(())), Poll::Ready(None) => return Poll::Ready(Ok(())),
@ -647,8 +626,7 @@ impl Future for Connection {
match self.inbound_client.as_mut().poll_next(cx)? { match self.inbound_client.as_mut().poll_next(cx)? {
Poll::Pending => {} Poll::Pending => {}
Poll::Ready(Some(frame)) => { Poll::Ready(Some(frame)) => {
let stream = self.process_packet_from_client(frame); self.process_packet_from_client(frame)?;
self.stream_to_be_sent = Some(stream);
continue 'poll; continue 'poll;
} }
Poll::Ready(None) => return Poll::Ready(Ok(())), Poll::Ready(None) => return Poll::Ready(Ok(())),
@ -658,8 +636,7 @@ impl Future for Connection {
match dtls_srtp.poll_next(cx)? { match dtls_srtp.poll_next(cx)? {
Poll::Pending => {} Poll::Pending => {}
Poll::Ready(Some(frame)) => { Poll::Ready(Some(frame)) => {
let stream = self.process_rtp_packet(&frame); self.process_rtp_packet(&frame);
self.stream_to_be_sent = Some(Box::pin(stream));
continue 'poll; continue 'poll;
} }
Poll::Ready(None) => return Poll::Ready(Ok(())), Poll::Ready(None) => return Poll::Ready(Ok(())),

View file

@ -28,7 +28,6 @@ use tungstenite::protocol::WebSocketConfig;
mod connection; mod connection;
mod error; mod error;
mod utils;
use connection::Connection; use connection::Connection;
use error::Error; use error::Error;

View file

@ -1,30 +0,0 @@
use futures::pin_mut;
use futures::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Like `futures::future::Either` but for Streams
pub enum EitherS<A, B> {
A(A),
B(B),
}
impl<A, B> Stream for EitherS<A, B>
where
A: Stream + Unpin,
B: Stream<Item = A::Item> + Unpin,
{
type Item = A::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
match self.get_mut() {
EitherS::A(s) => {
pin_mut!(s);
s.poll_next(cx)
}
EitherS::B(s) => {
pin_mut!(s);
s.poll_next(cx)
}
}
}
}