Replace "nice" crate with futures-based "libnice" crate
This commit is contained in:
parent
459b538794
commit
353de4ec2f
7 changed files with 375 additions and 686 deletions
|
@ -1,5 +1,6 @@
|
|||
use futures::stream;
|
||||
use futures::{Future, Sink, Stream};
|
||||
use libnice::ice;
|
||||
use openssl::asn1::Asn1Time;
|
||||
use openssl::hash::MessageDigest;
|
||||
use openssl::pkey::{PKey, Private};
|
||||
|
@ -15,13 +16,14 @@ use rtp::rfc5761::{MuxPacketReader, MuxPacketWriter, MuxedPacket};
|
|||
use rtp::rfc5764::{DtlsSrtp, DtlsSrtpHandshakeResult};
|
||||
use rtp::traits::{ReadPacket, WritePacket};
|
||||
use std::collections::BTreeMap;
|
||||
use std::ffi::CString;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::io;
|
||||
use tokio::prelude::*;
|
||||
use tokio::timer::Delay;
|
||||
use webrtc_sdp::attribute_type::SdpAttribute;
|
||||
|
||||
use error::Error;
|
||||
use ice::{IceAgent, IceStream};
|
||||
use mumble;
|
||||
use mumble::MumbleFrame;
|
||||
use protos::Mumble;
|
||||
|
@ -93,11 +95,10 @@ pub struct Connection {
|
|||
next_rtp_frame: Option<Vec<u8>>,
|
||||
stream_to_be_sent: Option<Box<Stream<Item = Frame, Error = Error>>>,
|
||||
|
||||
ice_future: Option<Box<Future<Item = (IceAgent, IceStream), Error = Error>>>,
|
||||
ice: Option<IceAgent>,
|
||||
ice: Option<(ice::Agent, ice::Stream)>,
|
||||
|
||||
dtls_srtp_future: Option<DtlsSrtpHandshakeResult<IceStream, SslAcceptorBuilder>>,
|
||||
dtls_srtp: Option<DtlsSrtp<IceStream, SslAcceptorBuilder>>,
|
||||
dtls_srtp_future: Option<DtlsSrtpHandshakeResult<ice::StreamComponent, SslAcceptorBuilder>>,
|
||||
dtls_srtp: Option<DtlsSrtp<ice::StreamComponent, SslAcceptorBuilder>>,
|
||||
dtls_key: PKey<Private>,
|
||||
dtls_cert: X509,
|
||||
|
||||
|
@ -146,7 +147,6 @@ impl Connection {
|
|||
next_serverbound_frame: None,
|
||||
next_rtp_frame: None,
|
||||
stream_to_be_sent: None,
|
||||
ice_future: None,
|
||||
ice: None,
|
||||
dtls_srtp_future: None,
|
||||
dtls_srtp: None,
|
||||
|
@ -190,11 +190,20 @@ impl Connection {
|
|||
}
|
||||
}
|
||||
|
||||
fn setup_ice(
|
||||
&mut self,
|
||||
agent: IceAgent,
|
||||
stream: IceStream,
|
||||
) -> impl Stream<Item = Frame, Error = Error> {
|
||||
fn setup_ice(&mut self) -> impl Stream<Item = Frame, Error = Error> {
|
||||
// Setup ICE agent
|
||||
let mut agent = ice::Agent::new_rfc5245();
|
||||
agent.set_software("mumble-web-proxy");
|
||||
|
||||
// Setup ICE stream
|
||||
let mut stream = match agent.stream_builder(1).build() {
|
||||
Ok(stream) => stream,
|
||||
Err(err) => {
|
||||
return stream::once(Err(io::Error::new(io::ErrorKind::Other, err).into()));
|
||||
}
|
||||
};
|
||||
let component = stream.take_components().pop().expect("one component");
|
||||
|
||||
// Send WebRTC details to the client
|
||||
let mut msg = Mumble::WebRTC::new();
|
||||
msg.set_dtls_fingerprint(
|
||||
|
@ -206,40 +215,24 @@ impl Connection {
|
|||
.collect::<Vec<_>>()
|
||||
.join(":"),
|
||||
);
|
||||
msg.set_ice_pwd(agent.pwd().to_owned());
|
||||
msg.set_ice_ufrag(agent.ufrag().to_owned());
|
||||
let webrtc_msg = Frame::Client(MumbleFrame {
|
||||
msg.set_ice_pwd(stream.get_local_pwd().to_owned());
|
||||
msg.set_ice_ufrag(stream.get_local_ufrag().to_owned());
|
||||
let webrtc_msg = MumbleFrame {
|
||||
id: mumble::MSG_WEBRTC,
|
||||
bytes: msg.write_to_bytes().unwrap().into(),
|
||||
});
|
||||
};
|
||||
|
||||
// Parse ICE candidates and send them to the client
|
||||
let candidate_msgs = agent
|
||||
.sdp()
|
||||
.lines()
|
||||
.filter(|line| line.starts_with("a=candidate"))
|
||||
.map(|line| line[2..].to_owned())
|
||||
.map(move |candidate| {
|
||||
let mut msg = Mumble::IceCandidate::new();
|
||||
msg.set_content(candidate);
|
||||
Frame::Client(MumbleFrame {
|
||||
id: mumble::MSG_ICE_CANDIDATE,
|
||||
bytes: msg.write_to_bytes().unwrap().into(),
|
||||
})
|
||||
})
|
||||
.collect::<Vec<Frame>>();
|
||||
|
||||
// Store ice agent for later use
|
||||
self.ice = Some(agent);
|
||||
// Store ice agent and stream for later use
|
||||
self.ice = Some((agent, stream));
|
||||
|
||||
// Prepare to accept the DTLS connection
|
||||
let mut acceptor = SslAcceptor::mozilla_modern(SslMethod::dtls()).unwrap();
|
||||
acceptor.set_certificate(&self.dtls_cert).unwrap();
|
||||
acceptor.set_private_key(&self.dtls_key).unwrap();
|
||||
// FIXME: verify remote fingerprint
|
||||
self.dtls_srtp_future = Some(DtlsSrtp::handshake(stream, acceptor));
|
||||
self.dtls_srtp_future = Some(DtlsSrtp::handshake(component, acceptor));
|
||||
|
||||
stream::iter_ok(Some(webrtc_msg).into_iter().chain(candidate_msgs))
|
||||
stream::once(Ok(Frame::Client(webrtc_msg)))
|
||||
}
|
||||
|
||||
fn handle_voice_packet(&mut self, buf: &[u8]) -> impl Stream<Item = Frame, Error = Error> {
|
||||
|
@ -384,7 +377,7 @@ impl Connection {
|
|||
fn process_packet_from_client(
|
||||
&mut self,
|
||||
mut frame: MumbleFrame,
|
||||
) -> impl Stream<Item = Frame, Error = Error> {
|
||||
) -> Box<Stream<Item = Frame, Error = Error>> {
|
||||
match frame.id {
|
||||
mumble::MSG_AUTHENTICATE => {
|
||||
let mut message: Mumble::Authenticate =
|
||||
|
@ -396,52 +389,50 @@ impl Connection {
|
|||
// and make sure opus is marked as supported
|
||||
message.set_opus(true);
|
||||
|
||||
self.ice_future = Some(Box::new(IceAgent::bind()));
|
||||
}
|
||||
let stream = self.setup_ice();
|
||||
|
||||
frame.bytes = message.write_to_bytes().unwrap().as_slice().into();
|
||||
EitherS::A(EitherS::A(stream::once(Ok(Frame::Server(frame)))))
|
||||
frame.bytes = message.write_to_bytes().unwrap().as_slice().into();
|
||||
Box::new(stream::once(Ok(Frame::Server(frame))).chain(stream))
|
||||
} else {
|
||||
Box::new(stream::once(Ok(Frame::Server(frame))))
|
||||
}
|
||||
}
|
||||
mumble::MSG_WEBRTC => {
|
||||
let mut message: Mumble::WebRTC = protobuf::parse_from_bytes(&frame.bytes).unwrap();
|
||||
println!("Got WebRTC: {:?}", message);
|
||||
if let Some(ref mut agent) = self.ice {
|
||||
let f1 = agent.set_remote_pwd(message.take_ice_pwd());
|
||||
let f2 = agent.set_remote_ufrag(message.take_ice_ufrag());
|
||||
if let Some((_, stream)) = &mut self.ice {
|
||||
if let (Ok(ufrag), Ok(pwd)) = (
|
||||
CString::new(message.take_ice_ufrag()),
|
||||
CString::new(message.take_ice_pwd()),
|
||||
) {
|
||||
stream.set_remote_credentials(ufrag, pwd);
|
||||
}
|
||||
// FIXME trigger ICE-restart if required
|
||||
// FIXME store and use remote dtls fingerprint
|
||||
EitherS::B(EitherS::A(
|
||||
f1.join(f2)
|
||||
.map(|_| stream::empty())
|
||||
.map_err(|_| {
|
||||
io::Error::new(io::ErrorKind::Other, "failed to set ice creds")
|
||||
})
|
||||
.from_err()
|
||||
.flatten_stream(),
|
||||
))
|
||||
} else {
|
||||
EitherS::A(EitherS::B(stream::empty()))
|
||||
}
|
||||
Box::new(stream::empty())
|
||||
}
|
||||
mumble::MSG_ICE_CANDIDATE => {
|
||||
let mut message: Mumble::IceCandidate =
|
||||
protobuf::parse_from_bytes(&frame.bytes).unwrap();
|
||||
let candidate = message.take_content();
|
||||
println!("Got ice candidate: {:?}", candidate);
|
||||
if let Some(ref mut agent) = self.ice {
|
||||
EitherS::B(EitherS::B(
|
||||
agent
|
||||
.add_remote_ice_candidate(candidate)
|
||||
.map(|_| stream::empty())
|
||||
.map_err(|_| {
|
||||
io::Error::new(io::ErrorKind::Other, "failed to add ice candidate")
|
||||
})
|
||||
.from_err()
|
||||
.flatten_stream(),
|
||||
))
|
||||
} else {
|
||||
EitherS::A(EitherS::B(stream::empty()))
|
||||
if let Some((_, stream)) = &mut self.ice {
|
||||
match format!("candidate:{}", candidate).parse() {
|
||||
Ok(SdpAttribute::Candidate(candidate)) => {
|
||||
stream.add_remote_candidate(candidate)
|
||||
}
|
||||
Ok(_) => unreachable!(),
|
||||
Err(err) => {
|
||||
return Box::new(stream::once(Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
format!("Error parsing ICE candidate: {}", err),
|
||||
)
|
||||
.into())));
|
||||
}
|
||||
}
|
||||
}
|
||||
Box::new(stream::empty())
|
||||
}
|
||||
mumble::MSG_TALKING_STATE => {
|
||||
let mut message: Mumble::TalkingState =
|
||||
|
@ -451,9 +442,9 @@ impl Connection {
|
|||
} else {
|
||||
None
|
||||
};
|
||||
EitherS::A(EitherS::B(stream::empty()))
|
||||
Box::new(stream::empty())
|
||||
}
|
||||
_ => EitherS::A(EitherS::A(stream::once(Ok(Frame::Server(frame))))),
|
||||
_ => Box::new(stream::once(Ok(Frame::Server(frame)))),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -492,6 +483,10 @@ impl Future for Connection {
|
|||
|
||||
fn poll(&mut self) -> Poll<(), Error> {
|
||||
'poll: loop {
|
||||
if let Some((agent, _)) = &mut self.ice {
|
||||
agent.poll()?;
|
||||
}
|
||||
|
||||
// If there's a frame pending to be sent, sent it before everything else
|
||||
if let Some(frame) = self.next_serverbound_frame.take() {
|
||||
match self.outbound_server.start_send(frame)? {
|
||||
|
@ -570,20 +565,20 @@ impl Future for Connection {
|
|||
}
|
||||
}
|
||||
|
||||
// Poll ice future if required
|
||||
if self.ice_future.is_some() {
|
||||
if let Async::Ready((agent, stream)) = self.ice_future.as_mut().unwrap().poll()? {
|
||||
self.ice_future = None;
|
||||
|
||||
println!("ICE ready.");
|
||||
|
||||
let stream = self.setup_ice(agent, stream);
|
||||
self.stream_to_be_sent = Some(Box::new(stream));
|
||||
// Poll ice stream for new candidates
|
||||
if let Some((_, stream)) = &mut self.ice {
|
||||
if let Async::Ready(Some(candidate)) = stream.poll()? {
|
||||
let candidate = format!("candidate:{}", candidate.to_string());
|
||||
println!("Local ice candidate: {}", candidate);
|
||||
// Got a new candidate, send it to the client
|
||||
let mut msg = Mumble::IceCandidate::new();
|
||||
msg.set_content(candidate.to_string());
|
||||
let frame = Frame::Client(MumbleFrame {
|
||||
id: mumble::MSG_ICE_CANDIDATE,
|
||||
bytes: msg.write_to_bytes().unwrap().into(),
|
||||
});
|
||||
self.stream_to_be_sent = Some(Box::new(stream::once(Ok(frame))));
|
||||
continue 'poll;
|
||||
} else {
|
||||
// wait for ice before processing futher packets to ensure
|
||||
// that the WebRTC init message isn't sent too late
|
||||
return Ok(Async::NotReady);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
202
src/ice.rs
202
src/ice.rs
|
@ -1,202 +0,0 @@
|
|||
// FIXME replace with proper libnice bindings or pure-rust ICE/STUN lib
|
||||
use error::Error;
|
||||
use future::Either;
|
||||
use futures::sync::{mpsc, oneshot};
|
||||
use futures::{Future, Sink, Stream};
|
||||
use nice::api_agent::Agent;
|
||||
use nice::api_gobject::GMainLoop;
|
||||
use nice::bindings_agent as ice_ffi;
|
||||
use std::clone::Clone;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread;
|
||||
use tokio::io;
|
||||
use tokio::prelude::*;
|
||||
|
||||
pub struct IceAgent {
|
||||
ufrag: String,
|
||||
pwd: String,
|
||||
sdp: String,
|
||||
remote_sdp_sender: mpsc::Sender<String>,
|
||||
remote_ice_candidates: Vec<String>,
|
||||
remote_ufrag: Option<String>,
|
||||
remote_pwd: Option<String>,
|
||||
}
|
||||
|
||||
pub struct IceStream {
|
||||
receiver: mpsc::Receiver<Vec<u8>>,
|
||||
sender: mpsc::Sender<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl IceAgent {
|
||||
pub fn bind() -> impl Future<Item = (Self, IceStream), Error = Error> {
|
||||
let (tokio_sender, thread_receiver) = mpsc::channel(10);
|
||||
let (thread_sender, tokio_receiver) = mpsc::channel(10);
|
||||
let (cred_sender, cred_receiver) = oneshot::channel();
|
||||
let (remote_sdp_sender, remote_sdp_receiver) = mpsc::channel(10);
|
||||
|
||||
thread::spawn(move || {
|
||||
let thread_sender = Mutex::new(thread_sender);
|
||||
let mut recv_callback = Box::new(move |buf: &[u8]| {
|
||||
let result = thread_sender.lock().unwrap().try_send(buf.to_vec());
|
||||
if let Err(err) = result {
|
||||
eprintln!("Failed to queue packet: {:?} {:?}", buf, err);
|
||||
}
|
||||
});
|
||||
|
||||
let main_loop = GMainLoop::new();
|
||||
let context = main_loop.get_context();
|
||||
thread::spawn(move || {
|
||||
// FIXME stop at some point
|
||||
main_loop.run();
|
||||
});
|
||||
|
||||
let agent = Arc::new(Agent::new(&context, ice_ffi::NICE_COMPATIBILITY_RFC5245));
|
||||
agent.set_software("mumble-web-proxy");
|
||||
|
||||
let stream_id = agent.add_stream(1).unwrap();
|
||||
let (ufrag, pwd) = agent.get_local_credentials(stream_id).unwrap();
|
||||
|
||||
agent.set_stream_name(stream_id, "audio");
|
||||
agent.gather_candidates(stream_id);
|
||||
agent.attach_recv(stream_id, 1, &context, &mut recv_callback);
|
||||
|
||||
// ok, here's the thing: libnice.rs is a giant train wreck.
|
||||
// It doesn't require any of the closures which it takes to be Send
|
||||
// even though, considering they're called from the GMainLoop,
|
||||
// they really should be. That's going to explode sooner or later.
|
||||
// I have no clue how many other things are broken but one of them
|
||||
// is on_candidate_gathering_done which just segfaults.
|
||||
// Since I can neither be bothered to debug that mess nor to write
|
||||
// new bindings or a pure-rust ice lib (yet), we'll work around the
|
||||
// issue by periodically polling. FIXME
|
||||
// It turns out attach_recv is also broken, so I had to go in and fix
|
||||
// that but now I've already written this workaround so it's here to
|
||||
// stay (at least until there's a better libnice binding).
|
||||
// This will probably only give non-turn candidates which should
|
||||
// be enough for our use-case.
|
||||
loop {
|
||||
let maybe_sdp = agent.generate_local_stream_sdp(stream_id, false);
|
||||
if let Some(sdp) = maybe_sdp {
|
||||
cred_sender.send((sdp, ufrag, pwd)).unwrap(); // FIXME handle shutdown
|
||||
break;
|
||||
}
|
||||
::std::thread::sleep(::std::time::Duration::from_millis(100));
|
||||
}
|
||||
|
||||
let remote_sdp_handler = remote_sdp_receiver.for_each(|remote_sdp: String| {
|
||||
// FIXME do we need to handle invalid sdp?
|
||||
agent.parse_remote_sdp(&remote_sdp).unwrap();
|
||||
Ok(())
|
||||
});
|
||||
let packet_send_handler = thread_receiver.for_each(|packet: Vec<u8>| {
|
||||
agent.send(stream_id, 1, &packet[..]);
|
||||
Ok(())
|
||||
});
|
||||
remote_sdp_handler.join(packet_send_handler).wait().unwrap();
|
||||
});
|
||||
|
||||
cred_receiver
|
||||
.map(|(sdp, ufrag, pwd): (String, String, String)| {
|
||||
(
|
||||
Self {
|
||||
ufrag,
|
||||
pwd,
|
||||
sdp,
|
||||
remote_sdp_sender,
|
||||
remote_ice_candidates: Vec::new(),
|
||||
remote_ufrag: None,
|
||||
remote_pwd: None,
|
||||
},
|
||||
IceStream {
|
||||
receiver: tokio_receiver,
|
||||
sender: tokio_sender,
|
||||
},
|
||||
)
|
||||
})
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
|
||||
.from_err()
|
||||
}
|
||||
|
||||
pub fn pwd(&self) -> &str {
|
||||
&self.pwd
|
||||
}
|
||||
|
||||
pub fn ufrag(&self) -> &str {
|
||||
&self.ufrag
|
||||
}
|
||||
|
||||
pub fn sdp(&self) -> &str {
|
||||
&self.sdp
|
||||
}
|
||||
|
||||
pub fn set_remote_pwd(&mut self, pwd: String) -> impl Future<Item = (), Error = ()> {
|
||||
self.remote_pwd = Some(pwd);
|
||||
self.update_remote_sdp()
|
||||
}
|
||||
|
||||
pub fn set_remote_ufrag(&mut self, ufrag: String) -> impl Future<Item = (), Error = ()> {
|
||||
self.remote_ufrag = Some(ufrag);
|
||||
self.update_remote_sdp()
|
||||
}
|
||||
|
||||
pub fn add_remote_ice_candidate(
|
||||
&mut self,
|
||||
candidate: String,
|
||||
) -> impl Future<Item = (), Error = ()> {
|
||||
self.remote_ice_candidates.push(candidate);
|
||||
self.update_remote_sdp()
|
||||
}
|
||||
|
||||
pub fn update_remote_sdp(&self) -> impl Future<Item = (), Error = ()> {
|
||||
if let (Some(pwd), Some(ufrag)) = (&self.remote_pwd, &self.remote_ufrag) {
|
||||
let mut sdp = Vec::new();
|
||||
sdp.push("a=ice-options:trickle".to_owned());
|
||||
sdp.push("m=audio".to_owned());
|
||||
for candidate in &self.remote_ice_candidates {
|
||||
sdp.push("a=".to_owned() + candidate);
|
||||
}
|
||||
sdp.push("a=ice-pwd:".to_owned() + pwd);
|
||||
sdp.push("a=ice-ufrag:".to_owned() + ufrag);
|
||||
let f = self.remote_sdp_sender.clone().send(sdp.join("\n"));
|
||||
Either::A(f.map(|_| ()).map_err(|_| ()))
|
||||
} else {
|
||||
Either::B(future::ok(()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Read for IceStream {
|
||||
fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
|
||||
match self.receiver.poll() {
|
||||
Ok(Async::Ready(Some(buf))) => (&buf[..]).read(dst),
|
||||
Ok(Async::Ready(None)) => Ok(0),
|
||||
Ok(Async::NotReady) => Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
|
||||
Err(err) => panic!(err), // FIXME should we really panic here? when can this happen?
|
||||
}
|
||||
}
|
||||
}
|
||||
impl AsyncRead for IceStream {}
|
||||
|
||||
impl Write for IceStream {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
match self.sender.start_send(buf.to_vec()) {
|
||||
Ok(AsyncSink::Ready) => Ok(buf.len()),
|
||||
Ok(AsyncSink::NotReady(_)) => Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
|
||||
Err(err) => Err(io::Error::new(io::ErrorKind::Other, err)),
|
||||
}
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
match self.sender.poll_complete() {
|
||||
Ok(Async::Ready(())) => Ok(()),
|
||||
Ok(Async::NotReady) => Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
|
||||
Err(err) => Err(io::Error::new(io::ErrorKind::Other, err)),
|
||||
}
|
||||
}
|
||||
}
|
||||
impl AsyncWrite for IceStream {
|
||||
fn shutdown(&mut self) -> io::Result<Async<()>> {
|
||||
Ok(Async::Ready(())) // FIXME actually shutdown ice
|
||||
}
|
||||
}
|
||||
|
|
@ -4,8 +4,8 @@ extern crate argparse;
|
|||
extern crate byteorder;
|
||||
extern crate bytes;
|
||||
extern crate futures;
|
||||
extern crate libnice;
|
||||
extern crate native_tls;
|
||||
extern crate nice;
|
||||
extern crate openssl;
|
||||
extern crate protobuf;
|
||||
extern crate rtp;
|
||||
|
@ -13,6 +13,7 @@ extern crate tokio;
|
|||
extern crate tokio_codec;
|
||||
extern crate tokio_core;
|
||||
extern crate tokio_tls;
|
||||
extern crate webrtc_sdp;
|
||||
extern crate websocket;
|
||||
|
||||
use argparse::{ArgumentParser, Store};
|
||||
|
@ -22,7 +23,6 @@ use futures::{Future, Sink, Stream};
|
|||
use std::convert::Into;
|
||||
use std::net::ToSocketAddrs;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::prelude::*;
|
||||
use tokio_codec::Decoder;
|
||||
use tokio_core::reactor::Core;
|
||||
use tokio_tls::TlsConnector;
|
||||
|
@ -32,7 +32,6 @@ use websocket::server::InvalidConnection;
|
|||
|
||||
mod connection;
|
||||
mod error;
|
||||
mod ice;
|
||||
mod mumble;
|
||||
mod utils;
|
||||
mod protos {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue