Replace lots of generic code with dedicated mumble-protocol crate

This commit is contained in:
Jonas Herzig 2019-02-03 23:34:09 +01:00
parent 353de4ec2f
commit 443712cb22
10 changed files with 112 additions and 1028 deletions

View file

@ -1,13 +1,18 @@
use futures::stream;
use futures::{Future, Sink, Stream};
use libnice::ice;
use mumble_protocol::control::msgs;
use mumble_protocol::control::ControlPacket;
use mumble_protocol::voice::VoicePacket;
use mumble_protocol::voice::VoicePacketPayload;
use mumble_protocol::Clientbound;
use mumble_protocol::Serverbound;
use openssl::asn1::Asn1Time;
use openssl::hash::MessageDigest;
use openssl::pkey::{PKey, Private};
use openssl::rsa::Rsa;
use openssl::ssl::{SslAcceptor, SslAcceptorBuilder, SslMethod};
use openssl::x509::X509;
use protobuf::Message;
use rtp::rfc3550::{
RtcpCompoundPacket, RtcpPacket, RtcpPacketReader, RtcpPacketWriter, RtpFixedHeader, RtpPacket,
RtpPacketReader, RtpPacketWriter,
@ -24,10 +29,7 @@ use tokio::timer::Delay;
use webrtc_sdp::attribute_type::SdpAttribute;
use error::Error;
use mumble;
use mumble::MumbleFrame;
use protos::Mumble;
use utils::{read_varint, write_varint32, EitherS};
use utils::EitherS;
type SessionId = u32;
@ -54,12 +56,9 @@ impl User {
self.start_voice_seq_num = 0;
self.highest_voice_seq_num = 0;
let mut msg = Mumble::TalkingState::new();
let mut msg = msgs::TalkingState::new();
msg.set_session(self.session);
EitherS::A(stream::once(Ok(Frame::Client(MumbleFrame {
id: mumble::MSG_TALKING_STATE,
bytes: msg.write_to_bytes().unwrap().into(),
}))))
EitherS::A(stream::once(Ok(Frame::Client(msg.into()))))
} else {
EitherS::B(stream::empty())
}
@ -74,24 +73,21 @@ impl User {
} else {
self.active = true;
let mut msg = Mumble::TalkingState::new();
let mut msg = msgs::TalkingState::new();
msg.set_session(self.session);
msg.set_target(target.into());
EitherS::B(stream::once(Ok(Frame::Client(MumbleFrame {
id: mumble::MSG_TALKING_STATE,
bytes: msg.write_to_bytes().unwrap().into(),
}))))
EitherS::B(stream::once(Ok(Frame::Client(msg.into()))))
}
}
}
pub struct Connection {
inbound_client: Box<Stream<Item = MumbleFrame, Error = Error>>,
outbound_client: Box<Sink<SinkItem = MumbleFrame, SinkError = Error>>,
inbound_server: Box<Stream<Item = MumbleFrame, Error = Error>>,
outbound_server: Box<Sink<SinkItem = MumbleFrame, SinkError = Error>>,
next_clientbound_frame: Option<MumbleFrame>,
next_serverbound_frame: Option<MumbleFrame>,
inbound_client: Box<Stream<Item = ControlPacket<Serverbound>, Error = Error>>,
outbound_client: Box<Sink<SinkItem = ControlPacket<Clientbound>, SinkError = Error>>,
inbound_server: Box<Stream<Item = ControlPacket<Clientbound>, Error = Error>>,
outbound_server: Box<Sink<SinkItem = ControlPacket<Serverbound>, SinkError = Error>>,
next_clientbound_frame: Option<ControlPacket<Clientbound>>,
next_serverbound_frame: Option<ControlPacket<Serverbound>>,
next_rtp_frame: Option<Vec<u8>>,
stream_to_be_sent: Option<Box<Stream<Item = Frame, Error = Error>>>,
@ -119,10 +115,10 @@ impl Connection {
server_stream: SSt,
) -> Self
where
CSi: Sink<SinkItem = MumbleFrame, SinkError = Error> + 'static,
CSt: Stream<Item = MumbleFrame, Error = Error> + 'static,
SSi: Sink<SinkItem = MumbleFrame, SinkError = Error> + 'static,
SSt: Stream<Item = MumbleFrame, Error = Error> + 'static,
CSi: Sink<SinkItem = ControlPacket<Clientbound>, SinkError = Error> + 'static,
CSt: Stream<Item = ControlPacket<Serverbound>, Error = Error> + 'static,
SSi: Sink<SinkItem = ControlPacket<Serverbound>, SinkError = Error> + 'static,
SSt: Stream<Item = ControlPacket<Clientbound>, Error = Error> + 'static,
{
let rsa = Rsa::generate(2048).unwrap();
let key = PKey::from_rsa(rsa).unwrap();
@ -205,7 +201,7 @@ impl Connection {
let component = stream.take_components().pop().expect("one component");
// Send WebRTC details to the client
let mut msg = Mumble::WebRTC::new();
let mut msg = msgs::WebRTC::new();
msg.set_dtls_fingerprint(
self.dtls_cert
.digest(MessageDigest::sha256())
@ -217,10 +213,6 @@ impl Connection {
);
msg.set_ice_pwd(stream.get_local_pwd().to_owned());
msg.set_ice_ufrag(stream.get_local_ufrag().to_owned());
let webrtc_msg = MumbleFrame {
id: mumble::MSG_WEBRTC,
bytes: msg.write_to_bytes().unwrap().into(),
};
// Store ice agent and stream for later use
self.ice = Some((agent, stream));
@ -232,37 +224,23 @@ impl Connection {
// FIXME: verify remote fingerprint
self.dtls_srtp_future = Some(DtlsSrtp::handshake(component, acceptor));
stream::once(Ok(Frame::Client(webrtc_msg)))
stream::once(Ok(Frame::Client(msg.into())))
}
fn handle_voice_packet(&mut self, buf: &[u8]) -> impl Stream<Item = Frame, Error = Error> {
let (header, buf) = match buf.split_first() {
Some(t) => t,
None => return EitherS::B(stream::empty()),
fn handle_voice_packet(
&mut self,
packet: VoicePacket<Clientbound>,
) -> impl Stream<Item = Frame, Error = Error> {
let (target, session_id, seq_num, opus_data, last_bit) = match packet {
VoicePacket::Audio {
target,
session_id,
seq_num,
payload: VoicePacketPayload::Opus(data, last_bit),
..
} => (target, session_id, seq_num, data, last_bit),
_ => return EitherS::B(stream::empty()),
};
if (header >> 5_u8) != 4_u8 {
// only opus
return EitherS::B(stream::empty());
}
let target = header & 0x1f;
let (session_id, buf) = match read_varint(buf) {
Some(t) => t,
None => return EitherS::B(stream::empty()),
};
let (seq_num, buf) = match read_varint(buf) {
Some(t) => t,
None => return EitherS::B(stream::empty()),
};
let (opus_header, buf) = match read_varint(buf) {
Some(t) => t,
None => return EitherS::B(stream::empty()),
};
let length = (opus_header & 0x1fff) as usize;
let last_bit = opus_header & 0x2000 != 0;
if length > buf.len() {
return EitherS::B(stream::empty());
}
let (opus_data, _) = buf.split_at(length);
// NOTE: the mumble packet id increases by 1 per 10ms of audio contained
// whereas rtp seq_num should increase by 1 per packet, regardless of audio,
@ -349,39 +327,32 @@ impl Connection {
fn process_packet_from_server(
&mut self,
mut frame: MumbleFrame,
packet: ControlPacket<Clientbound>,
) -> impl Stream<Item = Frame, Error = Error> {
match frame.id {
mumble::MSG_UDP_TUNNEL => EitherS::A(self.handle_voice_packet(&frame.bytes)),
mumble::MSG_USER_STATE => {
let mut message: Mumble::UserState =
protobuf::parse_from_bytes(&frame.bytes).unwrap();
match packet {
ControlPacket::UDPTunnel(voice) => EitherS::A(self.handle_voice_packet(*voice)),
ControlPacket::UserState(mut message) => {
let session_id = message.get_session();
if !self.sessions.contains_key(&session_id) {
let user = self.allocate_ssrc(session_id);
message.set_ssrc(user.ssrc);
}
frame.bytes = message.write_to_bytes().unwrap().as_slice().into();
EitherS::B(stream::once(Ok(Frame::Client(frame))))
EitherS::B(stream::once(Ok(Frame::Client((*message).into()))))
}
mumble::MSG_USER_REMOVE => {
let mut message: Mumble::UserRemove =
protobuf::parse_from_bytes(&frame.bytes).unwrap();
ControlPacket::UserRemove(message) => {
self.free_ssrc(message.get_session());
EitherS::B(stream::once(Ok(Frame::Client(frame))))
EitherS::B(stream::once(Ok(Frame::Client((*message).into()))))
}
_ => EitherS::B(stream::once(Ok(Frame::Client(frame)))),
other => EitherS::B(stream::once(Ok(Frame::Client(other)))),
}
}
fn process_packet_from_client(
&mut self,
mut frame: MumbleFrame,
packet: ControlPacket<Serverbound>,
) -> Box<Stream<Item = Frame, Error = Error>> {
match frame.id {
mumble::MSG_AUTHENTICATE => {
let mut message: Mumble::Authenticate =
protobuf::parse_from_bytes(&frame.bytes).unwrap();
match packet {
ControlPacket::Authenticate(mut message) => {
println!("MSG Authenticate: {:?}", message);
if message.get_webrtc() {
// strip webrtc support from the connection (we will be providing it)
@ -391,14 +362,12 @@ impl Connection {
let stream = self.setup_ice();
frame.bytes = message.write_to_bytes().unwrap().as_slice().into();
Box::new(stream::once(Ok(Frame::Server(frame))).chain(stream))
Box::new(stream::once(Ok(Frame::Server((*message).into()))).chain(stream))
} else {
Box::new(stream::once(Ok(Frame::Server(frame))))
Box::new(stream::once(Ok(Frame::Server((*message).into()))))
}
}
mumble::MSG_WEBRTC => {
let mut message: Mumble::WebRTC = protobuf::parse_from_bytes(&frame.bytes).unwrap();
ControlPacket::WebRTC(mut message) => {
println!("Got WebRTC: {:?}", message);
if let Some((_, stream)) = &mut self.ice {
if let (Ok(ufrag), Ok(pwd)) = (
@ -412,9 +381,7 @@ impl Connection {
}
Box::new(stream::empty())
}
mumble::MSG_ICE_CANDIDATE => {
let mut message: Mumble::IceCandidate =
protobuf::parse_from_bytes(&frame.bytes).unwrap();
ControlPacket::IceCandidate(mut message) => {
let candidate = message.take_content();
println!("Got ice candidate: {:?}", candidate);
if let Some((_, stream)) = &mut self.ice {
@ -434,9 +401,7 @@ impl Connection {
}
Box::new(stream::empty())
}
mumble::MSG_TALKING_STATE => {
let mut message: Mumble::TalkingState =
protobuf::parse_from_bytes(&frame.bytes).unwrap();
ControlPacket::TalkingState(message) => {
self.target = if message.has_target() {
Some(message.get_target() as u8)
} else {
@ -444,7 +409,7 @@ impl Connection {
};
Box::new(stream::empty())
}
_ => Box::new(stream::once(Ok(Frame::Server(frame)))),
other => Box::new(stream::once(Ok(Frame::Server(other)))),
}
}
@ -456,17 +421,16 @@ impl Connection {
// packet reordering and loss (done). But maybe keep it low?
let seq_num = rtp.header.timestamp / 480;
let header = 128_u8 | target;
let mut vec: Vec<u8> = Vec::new();
vec.push(header);
write_varint32(&mut vec, seq_num as u32).unwrap();
write_varint32(&mut vec, rtp.payload.len() as u32).unwrap();
vec.extend(rtp.payload);
let voice_packet = VoicePacket::Audio {
_dst: std::marker::PhantomData::<Serverbound>,
target,
session_id: (),
seq_num: seq_num.into(),
payload: VoicePacketPayload::Opus(rtp.payload.into(), false),
position_info: None,
};
Some(Ok(Frame::Server(MumbleFrame {
id: mumble::MSG_UDP_TUNNEL,
bytes: vec.into(),
})))
Some(Ok(Frame::Server(voice_packet.into())))
} else {
None
}
@ -571,12 +535,9 @@ impl Future for Connection {
let candidate = format!("candidate:{}", candidate.to_string());
println!("Local ice candidate: {}", candidate);
// Got a new candidate, send it to the client
let mut msg = Mumble::IceCandidate::new();
let mut msg = msgs::IceCandidate::new();
msg.set_content(candidate.to_string());
let frame = Frame::Client(MumbleFrame {
id: mumble::MSG_ICE_CANDIDATE,
bytes: msg.write_to_bytes().unwrap().into(),
});
let frame = Frame::Client(msg.into());
self.stream_to_be_sent = Some(Box::new(stream::once(Ok(frame))));
continue 'poll;
}
@ -632,7 +593,7 @@ impl Future for Connection {
#[derive(Clone)]
enum Frame {
Server(MumbleFrame),
Client(MumbleFrame),
Server(ControlPacket<Serverbound>),
Client(ControlPacket<Clientbound>),
Rtp(MuxedPacket<RtpPacket, RtcpCompoundPacket<RtcpPacket>>),
}

View file

@ -8,7 +8,6 @@ pub enum Error {
Io(std::io::Error),
ServerTls(native_tls::Error),
ClientConnection(websocket::result::WebSocketError),
Protobuf(protobuf::ProtobufError),
Misc(Box<std::error::Error>),
}
@ -30,12 +29,6 @@ impl From<native_tls::Error> for Error {
}
}
impl From<protobuf::ProtobufError> for Error {
fn from(e: protobuf::ProtobufError) -> Self {
Error::Protobuf(e)
}
}
impl From<tokio::timer::Error> for Error {
fn from(e: tokio::timer::Error) -> Self {
Error::Misc(Box::new(e))

View file

@ -1,3 +1,4 @@
#![feature(try_from)]
// FIXME don't just unwrap protobuf results
// FIXME for some reason, reconnecting without reloading the page fails DTLS handshake (FF)
extern crate argparse;
@ -5,9 +6,9 @@ extern crate byteorder;
extern crate bytes;
extern crate futures;
extern crate libnice;
extern crate mumble_protocol;
extern crate native_tls;
extern crate openssl;
extern crate protobuf;
extern crate rtp;
extern crate tokio;
extern crate tokio_codec;
@ -20,7 +21,12 @@ use argparse::{ArgumentParser, Store};
use byteorder::{BigEndian, ByteOrder};
use bytes::{BufMut, BytesMut};
use futures::{Future, Sink, Stream};
use mumble_protocol::control::ClientControlCodec;
use mumble_protocol::control::ControlPacket;
use mumble_protocol::control::RawControlPacket;
use mumble_protocol::Clientbound;
use std::convert::Into;
use std::convert::TryInto;
use std::net::ToSocketAddrs;
use tokio::net::TcpStream;
use tokio_codec::Decoder;
@ -32,14 +38,9 @@ use websocket::server::InvalidConnection;
mod connection;
mod error;
mod mumble;
mod utils;
mod protos {
pub mod Mumble;
}
use connection::Connection;
use error::Error;
use mumble::{MumbleCodec, MumbleFrame};
fn main() {
let mut ws_port = 0_u16;
@ -104,31 +105,35 @@ fn main() {
let (client_sink, client_stream) = client.split();
// buffered client sink to prevent temporary lag on the control
// channel from lagging the otherwise independent audio channel
let client_sink = client_sink.buffer(10).with(|m: MumbleFrame| {
let bytes = &m.bytes;
let len = bytes.len();
let mut buf = BytesMut::with_capacity(6 + len);
buf.put_u16_be(m.id);
buf.put_u32_be(len as u32);
buf.put(bytes);
Ok::<OwnedMessage, Error>(OwnedMessage::Binary(buf.freeze().to_vec()))
});
let client_sink =
client_sink
.buffer(10)
.with(|m: ControlPacket<Clientbound>| {
let m = RawControlPacket::from(m);
let bytes = &m.bytes;
let len = bytes.len();
let mut buf = BytesMut::with_capacity(6 + len);
buf.put_u16_be(m.id);
buf.put_u32_be(len as u32);
buf.put(bytes);
Ok::<OwnedMessage, Error>(OwnedMessage::Binary(
buf.freeze().to_vec(),
))
});
let client_stream = client_stream
.from_err()
.take_while(|m| Ok(!m.is_close()))
.filter_map(|m| {
match m {
OwnedMessage::Binary(ref b) if b.len() >= 6 => {
let id = BigEndian::read_u16(b);
// b[2..6] is length which is implicit in websocket msgs
let bytes = b[6..].into();
Some(MumbleFrame { id, bytes })
}
_ => None,
.filter_map(|m| match m {
OwnedMessage::Binary(ref b) if b.len() >= 6 => {
let id = BigEndian::read_u16(b);
// b[2..6] is length which is implicit in websocket msgs
let bytes = b[6..].into();
RawControlPacket { id, bytes }.try_into().ok()
}
_ => None,
});
let server = MumbleCodec::new().framed(server);
let server = ClientControlCodec::new().framed(server);
let (server_sink, server_stream) = server.split();
let server_sink = server_sink.sink_from_err();
let server_stream = server_stream.from_err();
@ -142,48 +147,3 @@ fn main() {
});
core.run(f).unwrap();
}
macro_rules! define_packet_mappings {
( $id:expr, $head:ident ) => {
#[allow(dead_code)]
const $head: u16 = $id;
};
( $id:expr, $head:ident, $( $tail:ident ),* ) => {
#[allow(dead_code)]
const $head: u16 = $id;
define_packet_mappings!($id + 1, $($tail),*);
};
}
define_packet_mappings![
0,
MSG_VERSION,
MSG_UDP_TUNNEL,
MSG_AUTHENTICATE,
MSG_PING,
MSG_REJECT,
MSG_SERVER_SYNC,
MSG_CHANNEL_REMOVE,
MSG_CHANNEL_STATE,
MSG_USER_REMOVE,
MSG_USER_STATE,
MSG_BAN_LIST,
MSG_TEXT_MESSAGE,
MSG_PERMISSION_DENIED,
MSG_ACL,
MSG_QUERY_USERS,
MSG_CRYPT_SETUP,
MSG_CONTEXT_ACTION_MODIFY,
MSG_CONTEXT_ACTION,
MSG_USER_LIST,
MSG_VOICE_TARGET,
MSG_PERMISSION_QUERY,
MSG_CODEC_VERSION,
MSG_USER_STATS,
MSG_REQUEST_BLOB,
MSG_SERVER_CONFIG,
MSG_SUGGEST_CONFIG,
MSG_WEBRTC,
MSG_ICE_CANDIDATE,
MSG_TALKING_STATE
];

View file

@ -1,103 +0,0 @@
use bytes::{Buf, BufMut, Bytes, BytesMut};
use std::io::Cursor;
use tokio::io;
use tokio_codec::{Decoder, Encoder};
#[derive(Clone, Debug)]
pub struct MumbleFrame {
pub id: u16,
pub bytes: Bytes,
}
pub struct MumbleCodec;
impl MumbleCodec {
pub fn new() -> Self {
Self {}
}
}
impl Decoder for MumbleCodec {
type Item = MumbleFrame;
type Error = io::Error;
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<MumbleFrame>, io::Error> {
let buf_len = buf.len();
if buf_len >= 6 {
let mut buf = Cursor::new(buf);
let id = buf.get_u16_be();
let len = buf.get_u32_be() as usize;
if buf_len >= 6 + len {
let mut bytes = buf.into_inner().split_to(6 + len);
bytes.advance(6);
let bytes = bytes.freeze();
Ok(Some(MumbleFrame { id, bytes }))
} else {
Ok(None)
}
} else {
Ok(None)
}
}
}
impl Encoder for MumbleCodec {
type Item = MumbleFrame;
type Error = io::Error;
fn encode(&mut self, item: MumbleFrame, dst: &mut BytesMut) -> Result<(), io::Error> {
let id = item.id;
let bytes = &item.bytes;
let len = bytes.len();
dst.reserve(6 + len);
dst.put_u16_be(id);
dst.put_u32_be(len as u32);
dst.put(bytes);
Ok(())
}
}
macro_rules! define_packet_mappings {
( $id:expr, $head:ident ) => {
#[allow(dead_code)]
pub const $head: u16 = $id;
};
( $id:expr, $head:ident, $( $tail:ident ),* ) => {
#[allow(dead_code)]
pub const $head: u16 = $id;
define_packet_mappings!($id + 1, $($tail),*);
};
}
define_packet_mappings![
0,
MSG_VERSION,
MSG_UDP_TUNNEL,
MSG_AUTHENTICATE,
MSG_PING,
MSG_REJECT,
MSG_SERVER_SYNC,
MSG_CHANNEL_REMOVE,
MSG_CHANNEL_STATE,
MSG_USER_REMOVE,
MSG_USER_STATE,
MSG_BAN_LIST,
MSG_TEXT_MESSAGE,
MSG_PERMISSION_DENIED,
MSG_ACL,
MSG_QUERY_USERS,
MSG_CRYPT_SETUP,
MSG_CONTEXT_ACTION_MODIFY,
MSG_CONTEXT_ACTION,
MSG_USER_LIST,
MSG_VOICE_TARGET,
MSG_PERMISSION_QUERY,
MSG_CODEC_VERSION,
MSG_USER_STATS,
MSG_REQUEST_BLOB,
MSG_SERVER_CONFIG,
MSG_SUGGEST_CONFIG,
MSG_WEBRTC,
MSG_ICE_CANDIDATE,
MSG_TALKING_STATE
];

View file

@ -1 +0,0 @@
/Mumble.rs

View file

@ -1,90 +1,6 @@
use futures::Stream;
use tokio::prelude::*;
pub fn read_varint(buf: &[u8]) -> Option<(u64, &[u8])> {
let (b0, buf) = buf.split_first()?;
let result = if (b0 & 0x80) == 0x00 {
(u64::from(b0 & 0x7F), buf)
} else {
let (b1, buf) = buf.split_first()?;
if (b0 & 0xC0) == 0x80 {
(u64::from(b0 & 0x3F) << 8 | u64::from(*b1), buf)
} else {
let (b2, buf) = buf.split_first()?;
if (b0 & 0xF0) == 0xF0 {
match b0 & 0xFC {
0xF0 => {
let (b3, buf) = buf.split_first()?;
let (b4, buf) = buf.split_first()?;
(
u64::from(*b1) << 24
| u64::from(*b2) << 16
| u64::from(*b3) << 8
| u64::from(*b4),
buf,
)
}
0xF4 => {
let (b3, buf) = buf.split_first()?;
let (b4, buf) = buf.split_first()?;
let (b5, buf) = buf.split_first()?;
let (b6, buf) = buf.split_first()?;
let (b7, buf) = buf.split_first()?;
let (b8, buf) = buf.split_first()?;
(
u64::from(*b1) << 56
| u64::from(*b2) << 48
| u64::from(*b3) << 40
| u64::from(*b4) << 32
| u64::from(*b5) << 24
| u64::from(*b6) << 16
| u64::from(*b7) << 8
| u64::from(*b8),
buf,
)
}
0xF8 => {
let (val, buf) = read_varint(buf)?;
(!val, buf)
}
0xFC => (!u64::from(b0 & 0x03), buf),
_ => {
return None;
}
}
} else if (b0 & 0xF0) == 0xE0 {
let (b3, buf) = buf.split_first()?;
(
u64::from(b0 & 0x0F) << 24
| u64::from(*b1) << 16
| u64::from(*b2) << 8
| u64::from(*b3),
buf,
)
} else if (b0 & 0xE0) == 0xC0 {
(
u64::from(b0 & 0x1F) << 16 | u64::from(*b1) << 8 | u64::from(*b2),
buf,
)
} else {
return None;
}
}
};
Some(result)
}
pub fn write_varint32<T: Write>(buf: &mut T, value: u32) -> std::io::Result<()> {
// FIXME: actually implement the variable part
buf.write_all(&[
240,
(value >> 24) as u8,
(value >> 16) as u8,
(value >> 8) as u8,
value as u8,
])
}
/// Like `futures::future::Either` but for Streams
pub enum EitherS<A, B> {
A(A),