From beafbe162f350d9a02f512da3983be00bb411ae6 Mon Sep 17 00:00:00 2001 From: Jonas Herzig Date: Thu, 27 Sep 2018 16:57:41 +0200 Subject: [PATCH] Move mumble client and audio encoding/resampling into a web worker --- app/index.js | 21 +-- app/voice.js | 22 +-- app/worker-client.js | 371 +++++++++++++++++++++++++++++++++++++++++++ app/worker.js | 284 +++++++++++++++++++++++++++++++++ package.json | 3 +- webpack.config.js | 2 +- 6 files changed, 667 insertions(+), 36 deletions(-) create mode 100644 app/worker-client.js create mode 100644 app/worker.js diff --git a/app/index.js b/app/index.js index a04ce1b..278516b 100644 --- a/app/index.js +++ b/app/index.js @@ -2,11 +2,9 @@ import 'stream-browserify' // see https://github.com/ericgundrum/pouch-websocket import url from 'url' import ByteBuffer from 'bytebuffer' import MumbleClient from 'mumble-client' -import mumbleConnect from 'mumble-client-websocket' -import CodecsBrowser from 'mumble-client-codecs-browser' +import WorkerBasedMumbleConnector from './worker-client' import BufferQueueNode from 'web-audio-buffer-queue' import audioContext from 'audio-context' -import Resampler from 'libsamplerate.js' import ko from 'knockout' import _dompurify from 'dompurify' import keyboardjs from 'keyboardjs' @@ -263,6 +261,7 @@ class Settings { class GlobalBindings { constructor () { this.settings = new Settings() + this.connector = new WorkerBasedMumbleConnector(audioContext.sampleRate) this.client = null this.userContextMenu = new ContextMenu() this.channelContextMenu = new ContextMenu() @@ -335,10 +334,9 @@ class GlobalBindings { log('Connecting to server ', host) // TODO: token - mumbleConnect(`wss://${host}:${port}`, { + this.connector.connect(`wss://${host}:${port}`, { username: username, - password: password, - codecs: CodecsBrowser + password: password }).done(client => { log('Connected!') @@ -560,13 +558,6 @@ class GlobalBindings { }) userNode.connect(audioContext.destination) - var resampler = new Resampler({ - unsafe: true, - type: Resampler.Type.ZERO_ORDER_HOLD, - ratio: audioContext.sampleRate / 48000 - }) - resampler.pipe(userNode) - stream.on('data', data => { if (data.target === 'normal') { ui.talking('on') @@ -575,11 +566,11 @@ class GlobalBindings { } else if (data.target === 'whisper') { ui.talking('whisper') } - resampler.write(Buffer.from(data.pcm.buffer)) + userNode.write(data.buffer) }).on('end', () => { console.log(`User ${user.username} stopped takling`) ui.talking('off') - resampler.end() + userNode.end() }) }) } diff --git a/app/voice.js b/app/voice.js index b3bff80..ff538e3 100644 --- a/app/voice.js +++ b/app/voice.js @@ -1,8 +1,6 @@ -import { Writable, Transform } from 'stream' +import { Writable } from 'stream' import MicrophoneStream from 'microphone-stream' import audioContext from 'audio-context' -import chunker from 'stream-chunker' -import Resampler from 'libsamplerate.js' import getUserMedia from 'getusermedia' import keyboardjs from 'keyboardjs' import vad from 'voice-activity-detection' @@ -34,23 +32,9 @@ class VoiceHandler extends Writable { this.emit('started_talking') return this._outbound } - this._outbound = new Resampler({ - unsafe: true, - type: Resampler.Type.SINC_FASTEST, - ratio: 48000 / audioContext.sampleRate - }) - const buffer2Float32Array = new Transform({ - transform (data, _, callback) { - callback(null, new Float32Array(data.buffer, data.byteOffset, data.byteLength / 4)) - }, - readableObjectMode: true - }) - - this._outbound - .pipe(chunker(4 * this._settings.samplesPerPacket)) - .pipe(buffer2Float32Array) - .pipe(this._client.createVoiceStream()) + // Note: the samplesPerPacket argument is handled in worker.js and not passed on + this._outbound = this._client.createVoiceStream(this._settings.samplesPerPacket) this.emit('started_talking') } diff --git a/app/worker-client.js b/app/worker-client.js new file mode 100644 index 0000000..3a4ac2b --- /dev/null +++ b/app/worker-client.js @@ -0,0 +1,371 @@ +import MumbleClient from 'mumble-client' +import Promise from 'promise' +import EventEmitter from 'events' +import { Writable, PassThrough } from 'stream' +import toArrayBuffer from 'to-arraybuffer' +import ByteBuffer from 'bytebuffer' +import webworkify from 'webworkify' +import worker from './worker' + +/** + * Creates proxy MumbleClients to a real ones running on a web worker. + * Only stuff which we need in mumble-web is proxied, i.e. this is not a generic solution. + */ +class WorkerBasedMumbleConnector { + constructor (sampleRate) { + this._worker = webworkify(worker) + this._worker.addEventListener('message', this._onMessage.bind(this)) + this._reqId = 1 + this._requests = {} + this._clients = {} + this._nextVoiceId = 1 + this._voiceStreams = {} + + this._postMessage({ + method: '_init', + sampleRate: sampleRate + }) + } + + _postMessage (msg, transfer) { + try { + this._worker.postMessage(msg, transfer) + } catch (err) { + console.error('Failed to postMessage', msg) + throw err + } + } + + _call (id, method, payload, transfer) { + let reqId = this._reqId++ + console.debug(method, id, payload) + this._postMessage({ + clientId: id.client, + channelId: id.channel, + userId: id.user, + method: method, + reqId: reqId, + payload: payload + }, transfer) + return reqId + } + + _query (id, method, payload, transfer) { + let reqId = this._call(id, method, payload, transfer) + return new Promise((resolve, reject) => { + this._requests[reqId] = [resolve, reject] + }) + } + + _addCall (proxy, name, id) { + let self = this + proxy[name] = function () { + self._call(id, name, Array.from(arguments)) + } + } + + connect (host, args) { + return this._query({}, '_connect', { host: host, args: args }) + .then(id => this._client(id)) + } + + _client (id) { + let client = this._clients[id] + if (!client) { + client = new WorkerBasedMumbleClient(this, id) + this._clients[id] = client + } + return client + } + + _onMessage (ev) { + let data = ev.data + if (data.reqId != null) { + console.debug(data) + let { reqId, result, error } = data + let [ resolve, reject ] = this._requests[reqId] + delete this._requests[reqId] + if (result) { + resolve(result) + } else { + reject(error) + } + } else if (data.clientId != null) { + console.debug(data) + let client = this._client(data.clientId) + + let target + if (data.userId != null) { + target = client._user(data.userId) + } else if (data.channelId != null) { + target = client._channel(data.channelId) + } else { + target = client + } + + if (data.event) { + target._dispatchEvent(data.event, data.value) + } else if (data.prop) { + target._setProp(data.prop, data.value) + } + } else if (data.voiceId != null) { + let stream = this._voiceStreams[data.voiceId] + let buffer = data.buffer + if (buffer) { + stream.write({ + target: data.target, + buffer: Buffer.from(buffer) + }) + } else { + delete this._voiceStreams[data.voiceId] + stream.end() + } + } + } +} + +class WorkerBasedMumbleClient extends EventEmitter { + constructor (connector, clientId) { + super() + this._connector = connector + this._id = clientId + this._users = {} + this._channels = {} + + let id = { client: clientId } + connector._addCall(this, 'setSelfDeaf', id) + connector._addCall(this, 'setSelfMute', id) + connector._addCall(this, 'setSelfTexture', id) + connector._addCall(this, 'setAudioQuality', id) + + connector._addCall(this, 'disconnect', id) + let _disconnect = this.disconnect + this.disconnect = () => { + _disconnect.apply(this) + delete connector._clients[id] + } + + connector._addCall(this, 'createVoiceStream', id) + let _createVoiceStream = this.createVoiceStream + this.createVoiceStream = function () { + let voiceId = connector._nextVoiceId++ + + let args = Array.from(arguments) + args.unshift(voiceId) + _createVoiceStream.apply(this, args) + + return new Writable({ + write (chunk, encoding, callback) { + chunk = toArrayBuffer(chunk) + connector._postMessage({ + voiceId: voiceId, + chunk: chunk + }) + callback() + }, + final (callback) { + connector._postMessage({ + voiceId: voiceId + }) + callback() + } + }) + } + + // Dummy client used for bandwidth calculations + this._dummyClient = new MumbleClient({ username: 'dummy' }) + let defineDummyMethod = (name) => { + this[name] = function () { + return this._dummyClient[name].apply(this._dummyClient, arguments) + } + } + defineDummyMethod('getMaxBitrate') + defineDummyMethod('getActualBitrate') + } + + _user (id) { + let user = this._users[id] + if (!user) { + user = new WorkerBasedMumbleUser(this._connector, this, id) + this._users[id] = user + } + return user + } + + _channel (id) { + let channel = this._channels[id] + if (!channel) { + channel = new WorkerBasedMumbleChannel(this._connector, this, id) + this._channels[id] = channel + } + return channel + } + + _dispatchEvent (name, args) { + if (name === 'newChannel') { + args[0] = this._channel(args[0]) + } else if (name === 'newUser') { + args[0] = this._user(args[0]) + } else if (name === 'message') { + args[0] = this._user(args[0]) + args[2] = args[2].map((id) => this._user(id)) + args[3] = args[3].map((id) => this._channel(id)) + args[4] = args[4].map((id) => this._channel(id)) + } + args.unshift(name) + this.emit.apply(this, args) + } + + _setProp (name, value) { + if (name === 'root') { + name = '_rootId' + } + if (name === 'self') { + name = '_selfId' + } + if (name === 'maxBandwidth') { + this._dummyClient.maxBandwidth = value + } + this[name] = value + } + + get root () { + return this._channel(this._rootId) + } + + get channels () { + return Object.values(this._channels) + } + + get users () { + return Object.values(this._users) + } + + get self () { + return this._user(this._selfId) + } +} + +class WorkerBasedMumbleChannel extends EventEmitter { + constructor (connector, client, channelId) { + super() + this._connector = connector + this._client = client + this._id = channelId + + let id = { client: client._id, channel: channelId } + connector._addCall(this, 'sendMessage', id) + } + + _dispatchEvent (name, args) { + if (name === 'update') { + let [actor, props] = args + Object.entries(props).forEach((entry) => { + this._setProp(entry[0], entry[1]) + }) + if (props.parent != null) { + props.parent = this.parent + } + if (props.links != null) { + props.links = this.links + } + args = [ + this._client._user(actor), + props + ] + } else if (name === 'remove') { + delete this._client._channels[this._id] + } + args.unshift(name) + this.emit.apply(this, args) + } + + _setProp (name, value) { + if (name === 'parent') { + name = '_parentId' + } + if (name === 'links') { + value = value.map((id) => this._client._channel(id)) + } + this[name] = value + } + + get parent () { + if (this._parentId != null) { + return this._client._channel(this._parentId) + } + } + + get children () { + return Object.values(this._client._channels).filter((it) => it.parent === this) + } +} + +class WorkerBasedMumbleUser extends EventEmitter { + constructor (connector, client, userId) { + super() + this._connector = connector + this._client = client + this._id = userId + + let id = { client: client._id, user: userId } + connector._addCall(this, 'requestTexture', id) + connector._addCall(this, 'clearTexture', id) + connector._addCall(this, 'setMute', id) + connector._addCall(this, 'setDeaf', id) + connector._addCall(this, 'sendMessage', id) + this.setChannel = (channel) => { + connector._call(id, 'setChannel', channel._id) + } + } + + _dispatchEvent (name, args) { + if (name === 'update') { + let [actor, props] = args + Object.entries(props).forEach((entry) => { + this._setProp(entry[0], entry[1]) + }) + if (props.channel != null) { + props.channel = this.channel + } + if (props.texture != null) { + props.texture = this.texture + } + args = [ + this._client._user(actor), + props + ] + } else if (name === 'voice') { + let [id] = args + let stream = new PassThrough({ + objectMode: true + }) + this._connector._voiceStreams[id] = stream + args = [stream] + } else if (name === 'remove') { + delete this._client._users[this._id] + } + args.unshift(name) + this.emit.apply(this, args) + } + + _setProp (name, value) { + if (name === 'channel') { + name = '_channelId' + } + if (name === 'texture') { + if (value) { + let buf = ByteBuffer.wrap(value.buffer) + buf.offset = value.offset + buf.limit = value.limit + value = buf + } + } + this[name] = value + } + + get channel () { + return this._client.channels[this._channelId] + } +} +export default WorkerBasedMumbleConnector diff --git a/app/worker.js b/app/worker.js new file mode 100644 index 0000000..31f8e0f --- /dev/null +++ b/app/worker.js @@ -0,0 +1,284 @@ +import { Transform } from 'stream' +import mumbleConnect from 'mumble-client-websocket' +import toArrayBuffer from 'to-arraybuffer' +import chunker from 'stream-chunker' +import Resampler from 'libsamplerate.js' + +// Monkey-patch to allow webworkify-webpack and codecs to work inside of web worker +/* global URL */ +window.URL = URL + +// Using require to ensure ordering relative to monkey-patch above +let CodecsBrowser = require('mumble-client-codecs-browser') + +export default function (self) { + let sampleRate + let nextClientId = 1 + let nextVoiceId = 1 + let voiceStreams = [] + let clients = [] + + function postMessage (msg, transfer) { + try { + self.postMessage(msg, transfer) + } catch (err) { + console.error('Failed to postMessage', msg) + throw err + } + } + + function resolve (reqId, value, transfer) { + postMessage({ + reqId: reqId, + result: value + }, transfer) + } + + function reject (reqId, value, transfer) { + console.error(value) + let jsonValue = JSON.parse(JSON.stringify(value)) + if (value.$type) { + jsonValue.$type = { name: value.$type.name } + } + postMessage({ + reqId: reqId, + error: jsonValue + }, transfer) + } + + function registerEventProxy (id, obj, event, transform) { + obj.on(event, function (_) { + postMessage({ + clientId: id.client, + channelId: id.channel, + userId: id.user, + event: event, + value: transform ? transform.apply(null, arguments) : Array.from(arguments) + }) + }) + } + + function pushProp (id, obj, prop, transform) { + let value = obj[prop] + postMessage({ + clientId: id.client, + channelId: id.channel, + userId: id.user, + prop: prop, + value: transform ? transform(value) : value + }) + } + + function setupOutboundVoice (voiceId, samplesPerPacket, stream) { + let resampler = new Resampler({ + unsafe: true, + type: Resampler.Type.SINC_FASTEST, + ratio: 48000 / sampleRate + }) + + let buffer2Float32Array = new Transform({ + transform (data, _, callback) { + callback(null, new Float32Array(data.buffer, data.byteOffset, data.byteLength / 4)) + }, + readableObjectMode: true + }) + + resampler + .pipe(chunker(4 * samplesPerPacket)) + .pipe(buffer2Float32Array) + .pipe(stream) + + voiceStreams[voiceId] = resampler + } + + function setupChannel (id, channel) { + id = Object.assign({}, id, { channel: channel.id }) + + registerEventProxy(id, channel, 'update', (actor, props) => { + if (actor) { + actor = actor.id + } + if (props.parent) { + props.parent = props.parent.id + } + if (props.links) { + props.links = props.links.map((it) => it.id) + } + return [actor, props] + }) + registerEventProxy(id, channel, 'remove') + + pushProp(id, channel, 'parent', (it) => it ? it.id : it) + pushProp(id, channel, 'links', (it) => it.map((it) => it.id)) + let props = [ + 'position', 'name', 'description' + ] + for (let prop of props) { + pushProp(id, channel, prop) + } + + for (let child of channel.children) { + setupChannel(id, child) + } + + return channel.id + } + + function setupUser (id, user) { + id = Object.assign({}, id, { user: user.id }) + + registerEventProxy(id, user, 'update', (actor, props) => { + if (actor) { + actor = actor.id + } + if (props.channel != null) { + props.channel = props.channel.id + } + return [actor, props] + }) + registerEventProxy(id, user, 'voice', (stream) => { + let voiceId = nextVoiceId++ + + let target + + // We want to do as little on the UI thread as possible, so do resampling here as well + var resampler = new Resampler({ + unsafe: true, + type: Resampler.Type.ZERO_ORDER_HOLD, + ratio: sampleRate / 48000 + }) + + // Pipe stream into resampler + stream.on('data', (data) => { + // store target so we can pass it on after resampling + target = data.target + resampler.write(Buffer.from(data.pcm.buffer)) + }).on('end', () => { + resampler.end() + }) + + // Pipe resampler into output stream on UI thread + resampler.on('data', (data) => { + data = toArrayBuffer(data) // postMessage can't transfer node's Buffer + postMessage({ + voiceId: voiceId, + target: target, + buffer: data + }, [data]) + }).on('end', () => { + postMessage({ + voiceId: voiceId + }) + }) + + return [voiceId] + }) + registerEventProxy(id, user, 'remove') + + pushProp(id, user, 'channel', (it) => it ? it.id : it) + let props = [ + 'uniqueId', 'username', 'mute', 'deaf', 'suppress', 'selfMute', 'selfDeaf', + 'texture', 'textureHash', 'comment' + ] + for (let prop of props) { + pushProp(id, user, prop) + } + + return user.id + } + + function setupClient (id, client) { + id = { client: id } + + registerEventProxy(id, client, 'error') + registerEventProxy(id, client, 'newChannel', (it) => [setupChannel(id, it)]) + registerEventProxy(id, client, 'newUser', (it) => [setupUser(id, it)]) + registerEventProxy(id, client, 'message', (sender, message, users, channels, trees) => { + return [ + sender.id, + message, + users.map((it) => it.id), + channels.map((it) => it.id), + trees.map((it) => it.id) + ] + }) + client.on('dataPing', () => { + pushProp(id, client, 'dataStats') + }) + + setupChannel(id, client.root) + for (let user of client.users) { + setupUser(id, user) + } + + pushProp(id, client, 'root', (it) => it.id) + pushProp(id, client, 'self', (it) => it.id) + pushProp(id, client, 'welcomeMessage') + pushProp(id, client, 'serverVersion') + pushProp(id, client, 'maxBandwidth') + } + + function onMessage (data) { + let { reqId, method, payload } = data + if (method === '_init') { + sampleRate = data.sampleRate + } else if (method === '_connect') { + payload.args.codecs = CodecsBrowser + mumbleConnect(payload.host, payload.args).then((client) => { + let id = nextClientId++ + clients[id] = client + setupClient(id, client) + return id + }).done((id) => { + resolve(reqId, id) + }, (err) => { + reject(reqId, err) + }) + } else if (data.clientId != null) { + let client = clients[data.clientId] + + let target + if (data.userId != null) { + target = client.getUserById(data.userId) + if (method === 'setChannel') { + payload = [client.getChannelById(payload)] + } + } else if (data.channelId != null) { + target = client.getChannelById(data.channelId) + } else { + target = client + if (method === 'createVoiceStream') { + let voiceId = payload.shift() + let samplesPerPacket = payload.shift() + + let stream = target.createVoiceStream.apply(target, payload) + + setupOutboundVoice(voiceId, samplesPerPacket, stream) + return + } + if (method === 'disconnect') { + delete clients[data.clientId] + } + } + + target[method].apply(target, payload) + } else if (data.voiceId != null) { + let stream = voiceStreams[data.voiceId] + let buffer = data.chunk + if (buffer) { + stream.write(Buffer.from(buffer)) + } else { + delete voiceStreams[data.voiceId] + stream.end() + } + } + } + + self.addEventListener('message', (ev) => { + try { + onMessage(ev.data) + } catch (ex) { + console.error('exception during message event', ev.data, ex) + } + }) +} diff --git a/package.json b/package.json index 11eb677..a216f63 100644 --- a/package.json +++ b/package.json @@ -42,10 +42,11 @@ "regexp-replace-loader": "0.0.1", "sass-loader": "^4.1.1", "stream-chunker": "^1.2.8", + "to-arraybuffer": "^1.0.1", "transform-loader": "^0.2.3", "voice-activity-detection": "johni0702/voice-activity-detection#9f8bd90", "webpack": "^1.13.3", - "webworkify-webpack-dropin": "^1.1.9", + "webworkify-webpack": "^1.1.8", "libsamplerate.js": "^1.0.0", "mumble-client-codecs-browser": "^1.2.0", "mumble-client-websocket": "^1.0.0", diff --git a/webpack.config.js b/webpack.config.js index 74d1154..7404156 100644 --- a/webpack.config.js +++ b/webpack.config.js @@ -89,7 +89,7 @@ module.exports = { }, resolve: { alias: { - webworkify: 'webworkify-webpack-dropin' + webworkify: 'webworkify-webpack' }, root: [ path.resolve('./themes/')