module msgpackrpc.client; version(WithRPC) { import core.atomic; import std.array; import std.concurrency : receiveOnly; import std.exception; import std.experimental.logger; import std.traits; import std.typecons; import msgpack; import vibe.core.concurrency; import vibe.core.core; import vibe.core.net; import vibe.core.task; import msgpackrpc.protocol; /** * MessagePack RPC client */ class Client { private: TCPConnection m_connection; shared int m_nextRequestId = 0; /// Map of sent request, mapping the requestId to a future. Tid[int] m_requests; public: this(TCPConnection connection) { this.m_connection = connection; runTask(&receiveLoop); } /** * Notifies the server, not waiting for a response. Although it waits for the request to be sent * * params: * method = The name of the method to be notified * arguments = The arguments to pass to the server */ void notify(Args...)(string method, Args arguments) { tracef("Notify '%s'", method); auto pack = packer(Appender!(ubyte[])()); pack.beginArray(3).pack(MessageType.notify, method, pack.packArray(arguments)); sendData(pack.stream.data); } /** * Calls a method on the other side, awaiting a response. * * The difference between this and notify(string, Args) is that call expects a result from the * other endpoint and will actually wait for it. */ R call(R, Args...)(string method, Args arguments) { R res = sendRequest!R(method, arguments).getResult(); return res; } /** * Calls a method on the other endpoint, but will not wait for a result. Instead, a Future!R is * returned. */ Future!R callAsync(R, Args...)(string method, Args arguments) { return sendRequest!R(method, arguments); } private: void receiveLoop() { StreamingUnpacker unpacker = StreamingUnpacker([]); while(m_connection.connected) { m_connection.waitForData(); ubyte[] buf = new ubyte[m_connection.leastSize]; m_connection.read(buf); unpacker.feed(buf); foreach(ref message; unpacker) { enforce!ProtocolException(message.length == 3 || message.length == 4, "Protocol error: incoming message size mismatch"); immutable uint messageType = message[0].as!uint; switch(messageType) { case MessageType.response: enforce!ProtocolException(message.length == 4, "Protocol error: Response must be of size 4"); immutable error = message[2]; immutable result = message[3]; onResponse(message[1].as!int, error, result); break; default: tracef("Received unhandled messageType %s", cast(MessageType) messageType); break; } } } } void onResponse(int id, immutable Value error, immutable Value result) { if (id in m_requests) { tracef("Reply for %d; Error: %s; Result: %s", id, error.type, result.type); send(m_requests[id], error, result); m_requests.remove(id); } else { warningf("No task for id %d", id); } } void sendData(T)(T data) { m_connection.write(data); m_connection.flush(); } /** * Sends a request and returns a future containing the result. */ Future!R sendRequest(R, Args...)(string method, Args arguments) { tracef("Call '%s'", method); auto pack = packer(Appender!(ubyte[])()); immutable int requestId = atomicFetchAdd(m_nextRequestId, 1); pack.beginArray(4) .pack(MessageType.request, requestId, method) .packArray(arguments); sendData(pack.stream.data); return async(delegate R() { m_requests[requestId] = Task.getThis().tid; // error result Tuple!(immutable Value, immutable Value) reply = receiveOnly!(immutable Value, immutable Value); if (reply[0].type != Value.Type.nil) { throw new RPCException(reply[0]); } Value result = reply[1]; return result.as!R; }); } } } // version(WithRPC)