Remove intermediate per-destination outbound packet buffers

With futures 0.3, we can check whether a sink can accept items without actually
sending them to it, which used to be the reason why we needed to per-destination
buffers.
This commit is contained in:
Jonas Herzig 2020-04-05 18:17:48 +02:00
parent 995eed0a9b
commit c2ebc19134

View file

@ -91,9 +91,6 @@ pub struct Connection {
outbound_client: Pin<Box<dyn Sink<ControlPacket<Clientbound>, Error = Error> + Send>>, outbound_client: Pin<Box<dyn Sink<ControlPacket<Clientbound>, Error = Error> + Send>>,
inbound_server: Pin<Box<dyn Stream<Item = Result<ControlPacket<Clientbound>, Error>> + Send>>, inbound_server: Pin<Box<dyn Stream<Item = Result<ControlPacket<Clientbound>, Error>> + Send>>,
outbound_server: Pin<Box<dyn Sink<ControlPacket<Serverbound>, Error = Error> + Send>>, outbound_server: Pin<Box<dyn Sink<ControlPacket<Serverbound>, Error = Error> + Send>>,
next_clientbound_frame: Option<ControlPacket<Clientbound>>,
next_serverbound_frame: Option<ControlPacket<Serverbound>>,
next_rtp_frame: Option<Vec<u8>>,
outbound_buf: VecDeque<Frame>, outbound_buf: VecDeque<Frame>,
ice: Option<(ice::Agent, ice::Stream)>, ice: Option<(ice::Agent, ice::Stream)>,
@ -149,9 +146,6 @@ impl Connection {
outbound_client: Box::pin(client_sink), outbound_client: Box::pin(client_sink),
inbound_server: Box::pin(server_stream), inbound_server: Box::pin(server_stream),
outbound_server: Box::pin(server_sink), outbound_server: Box::pin(server_sink),
next_clientbound_frame: None,
next_serverbound_frame: None,
next_rtp_frame: None,
outbound_buf: VecDeque::new(), outbound_buf: VecDeque::new(),
ice: None, ice: None,
candidate_gathering_done: false, candidate_gathering_done: false,
@ -290,6 +284,52 @@ impl Connection {
} }
} }
fn dispatch_outbound_frames(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Result<(), Error>> {
// Make sure we can send any pending frames before trying to do so
ready!(self.outbound_server.as_mut().poll_ready(cx)?);
ready!(self.outbound_client.as_mut().poll_ready(cx)?);
if let Some(ref mut dtls_srtp) = self.dtls_srtp {
ready!(Pin::new(dtls_srtp).poll_ready(cx)?);
}
// Send out all pending frames
while let Some(frame) = self.outbound_buf.pop_front() {
match frame {
Frame::Server(frame) => {
self.outbound_server.as_mut().start_send(frame)?;
ready!(self.outbound_server.as_mut().poll_ready(cx)?);
}
Frame::Client(frame) => {
self.outbound_client.as_mut().start_send(frame)?;
ready!(self.outbound_client.as_mut().poll_ready(cx)?);
}
Frame::Rtp(frame) => {
let mut buf = Vec::new();
self.rtp_writer.write_packet(&mut buf, &frame)?;
if let Some(ref mut dtls_srtp) = self.dtls_srtp {
pin_mut!(dtls_srtp);
dtls_srtp.as_mut().start_send(&buf)?;
ready!(dtls_srtp.poll_ready(cx)?);
} else {
// RTP not yet setup, just drop the frame
}
}
}
}
// All frames have been sent (or queued), flush any buffers in the output path
let _ = self.outbound_client.as_mut().poll_flush(cx)?;
let _ = self.outbound_server.as_mut().poll_flush(cx)?;
if let Some(ref mut dtls_srtp) = self.dtls_srtp {
let _ = Pin::new(dtls_srtp).poll_flush(cx)?;
}
Poll::Ready(Ok(()))
}
fn handle_voice_packet(&mut self, packet: VoicePacket<Clientbound>) -> Result<(), Error> { fn handle_voice_packet(&mut self, packet: VoicePacket<Clientbound>) -> Result<(), Error> {
let (target, session_id, seq_num, opus_data, last_bit) = match packet { let (target, session_id, seq_num, opus_data, last_bit) = match packet {
VoicePacket::Audio { VoicePacket::Audio {
@ -525,54 +565,7 @@ impl Future for Connection {
let _ = agent.poll(cx); let _ = agent.poll(cx);
} }
// If there's a frame pending to be sent, sent it before everything else ready!(self.as_mut().dispatch_outbound_frames(cx))?;
if self.next_serverbound_frame.is_some() {
ready!(self.outbound_server.as_mut().poll_ready(cx)?);
let frame = self.next_serverbound_frame.take().unwrap();
self.outbound_server.as_mut().start_send(frame)?;
}
if self.next_clientbound_frame.is_some() {
ready!(self.outbound_client.as_mut().poll_ready(cx)?);
let frame = self.next_clientbound_frame.take().unwrap();
self.outbound_client.as_mut().start_send(frame)?;
}
if let Some(frame) = self.next_rtp_frame.take() {
if let Some(ref mut dtls_srtp) = self.dtls_srtp {
pin_mut!(dtls_srtp);
match dtls_srtp.as_mut().poll_ready(cx)? {
Poll::Pending => {
self.next_rtp_frame = Some(frame);
}
Poll::Ready(()) => {
dtls_srtp.as_mut().start_send(&frame)?;
}
}
} else {
// RTP not yet setup, just drop the frame
self.next_rtp_frame = None;
}
}
// Send out all pending frames
if let Some(frame) = self.outbound_buf.pop_front() {
match frame {
Frame::Server(frame) => self.next_serverbound_frame = Some(frame),
Frame::Client(frame) => self.next_clientbound_frame = Some(frame),
Frame::Rtp(frame) => {
let mut buf = Vec::new();
self.rtp_writer.write_packet(&mut buf, &frame)?;
self.next_rtp_frame = Some(buf)
}
}
continue 'poll;
}
// All frames have been sent (or queued), flush any buffers in the output path
let _ = self.outbound_client.as_mut().poll_flush(cx)?;
let _ = self.outbound_server.as_mut().poll_flush(cx)?;
if let Some(ref mut dtls_srtp) = self.dtls_srtp {
let _ = Pin::new(dtls_srtp).poll_flush(cx)?;
}
// Check/register voice timeouts // Check/register voice timeouts
// Note that this must be ran if any new sessions are added or timeouts are // Note that this must be ran if any new sessions are added or timeouts are