From c2ebc191349456f40296ebbc12a01ca24fd37f0a Mon Sep 17 00:00:00 2001 From: Jonas Herzig Date: Sun, 5 Apr 2020 18:17:48 +0200 Subject: [PATCH] Remove intermediate per-destination outbound packet buffers With futures 0.3, we can check whether a sink can accept items without actually sending them to it, which used to be the reason why we needed to per-destination buffers. --- src/connection.rs | 101 +++++++++++++++++++++------------------------- 1 file changed, 47 insertions(+), 54 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index b9e6e6f..12d1e62 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -91,9 +91,6 @@ pub struct Connection { outbound_client: Pin, Error = Error> + Send>>, inbound_server: Pin, Error>> + Send>>, outbound_server: Pin, Error = Error> + Send>>, - next_clientbound_frame: Option>, - next_serverbound_frame: Option>, - next_rtp_frame: Option>, outbound_buf: VecDeque, ice: Option<(ice::Agent, ice::Stream)>, @@ -149,9 +146,6 @@ impl Connection { outbound_client: Box::pin(client_sink), inbound_server: Box::pin(server_stream), outbound_server: Box::pin(server_sink), - next_clientbound_frame: None, - next_serverbound_frame: None, - next_rtp_frame: None, outbound_buf: VecDeque::new(), ice: None, candidate_gathering_done: false, @@ -290,6 +284,52 @@ impl Connection { } } + fn dispatch_outbound_frames( + mut self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll> { + // Make sure we can send any pending frames before trying to do so + ready!(self.outbound_server.as_mut().poll_ready(cx)?); + ready!(self.outbound_client.as_mut().poll_ready(cx)?); + if let Some(ref mut dtls_srtp) = self.dtls_srtp { + ready!(Pin::new(dtls_srtp).poll_ready(cx)?); + } + + // Send out all pending frames + while let Some(frame) = self.outbound_buf.pop_front() { + match frame { + Frame::Server(frame) => { + self.outbound_server.as_mut().start_send(frame)?; + ready!(self.outbound_server.as_mut().poll_ready(cx)?); + } + Frame::Client(frame) => { + self.outbound_client.as_mut().start_send(frame)?; + ready!(self.outbound_client.as_mut().poll_ready(cx)?); + } + Frame::Rtp(frame) => { + let mut buf = Vec::new(); + self.rtp_writer.write_packet(&mut buf, &frame)?; + if let Some(ref mut dtls_srtp) = self.dtls_srtp { + pin_mut!(dtls_srtp); + dtls_srtp.as_mut().start_send(&buf)?; + ready!(dtls_srtp.poll_ready(cx)?); + } else { + // RTP not yet setup, just drop the frame + } + } + } + } + + // All frames have been sent (or queued), flush any buffers in the output path + let _ = self.outbound_client.as_mut().poll_flush(cx)?; + let _ = self.outbound_server.as_mut().poll_flush(cx)?; + if let Some(ref mut dtls_srtp) = self.dtls_srtp { + let _ = Pin::new(dtls_srtp).poll_flush(cx)?; + } + + Poll::Ready(Ok(())) + } + fn handle_voice_packet(&mut self, packet: VoicePacket) -> Result<(), Error> { let (target, session_id, seq_num, opus_data, last_bit) = match packet { VoicePacket::Audio { @@ -525,54 +565,7 @@ impl Future for Connection { let _ = agent.poll(cx); } - // If there's a frame pending to be sent, sent it before everything else - if self.next_serverbound_frame.is_some() { - ready!(self.outbound_server.as_mut().poll_ready(cx)?); - let frame = self.next_serverbound_frame.take().unwrap(); - self.outbound_server.as_mut().start_send(frame)?; - } - if self.next_clientbound_frame.is_some() { - ready!(self.outbound_client.as_mut().poll_ready(cx)?); - let frame = self.next_clientbound_frame.take().unwrap(); - self.outbound_client.as_mut().start_send(frame)?; - } - if let Some(frame) = self.next_rtp_frame.take() { - if let Some(ref mut dtls_srtp) = self.dtls_srtp { - pin_mut!(dtls_srtp); - match dtls_srtp.as_mut().poll_ready(cx)? { - Poll::Pending => { - self.next_rtp_frame = Some(frame); - } - Poll::Ready(()) => { - dtls_srtp.as_mut().start_send(&frame)?; - } - } - } else { - // RTP not yet setup, just drop the frame - self.next_rtp_frame = None; - } - } - - // Send out all pending frames - if let Some(frame) = self.outbound_buf.pop_front() { - match frame { - Frame::Server(frame) => self.next_serverbound_frame = Some(frame), - Frame::Client(frame) => self.next_clientbound_frame = Some(frame), - Frame::Rtp(frame) => { - let mut buf = Vec::new(); - self.rtp_writer.write_packet(&mut buf, &frame)?; - self.next_rtp_frame = Some(buf) - } - } - continue 'poll; - } - - // All frames have been sent (or queued), flush any buffers in the output path - let _ = self.outbound_client.as_mut().poll_flush(cx)?; - let _ = self.outbound_server.as_mut().poll_flush(cx)?; - if let Some(ref mut dtls_srtp) = self.dtls_srtp { - let _ = Pin::new(dtls_srtp).poll_flush(cx)?; - } + ready!(self.as_mut().dispatch_outbound_frames(cx))?; // Check/register voice timeouts // Note that this must be ran if any new sessions are added or timeouts are