AIUO/ober/source/msgpackrpc/client.d

136 lines
3.7 KiB
D

module msgpackrpc.client;
import core.atomic;
import std.array;
import std.concurrency : receiveOnly;
import std.exception;
import std.experimental.logger;
import std.traits;
import std.typecons;
import msgpack;
import vibe.core.concurrency;
import vibe.core.core;
import vibe.core.net;
import vibe.core.task;
import msgpackrpc.protocol;
/**
* MessagePack RPC client
*/
class Client {
private:
    /// Connection every outgoing message is written to and incoming messages are read from.
    TCPConnection m_connection;
    /// Id given to the next outgoing request; advanced with atomicFetchAdd.
    shared int m_nextRequestId = 0;
    /// Pending requests: maps a request id to the Tid of the task waiting for its reply.
    Tid[int] m_requests;

public:
    /**
     * Stores the connection and starts receiveLoop via runTask.
     *
     * Params:
     *   connection = The connection to the RPC server
     */
    this(TCPConnection connection) {
        this.m_connection = connection;
        runTask(&receiveLoop);
    }

    /**
     * Notifies the server without waiting for a response. It still waits for the
     * message to be written and flushed.
     *
     * The message is packed as a 3-element array: message type, method name and an
     * array holding the arguments.
     *
     * Params:
     *   method = The name of the method to be notified
     *   arguments = The arguments to pass to the server
     */
    void notify(Args...)(string method, Args arguments) {
        tracef("Notify '%s'", method);
        auto pack = packer(Appender!(ubyte[])());
        // Chain the calls so the fields are written in order (type, method, params),
        // the same way sendRequest builds its message.
        pack.beginArray(3)
            .pack(MessageType.notify, method)
            .packArray(arguments);
        sendData(pack.stream.data);
    }

    /**
     * Calls a method on the other side, awaiting a response.
     *
     * The difference between this and notify(string, Args) is that call expects a
     * result from the other endpoint and will actually wait for it.
     *
     * Params:
     *   method = The name of the remote method
     *   arguments = The arguments to pass to the server
     *
     * Returns: The result of the future created by sendRequest.
     */
    R call(R, Args...)(string method, Args arguments) {
        return sendRequest!R(method, arguments).getResult();
    }

    /**
     * Calls a method on the other endpoint, but will not wait for a result. Instead,
     * a Future!R is returned.
     *
     * Params:
     *   method = The name of the remote method
     *   arguments = The arguments to pass to the server
     */
    Future!R callAsync(R, Args...)(string method, Args arguments) {
        return sendRequest!R(method, arguments);
    }

private:
    /**
     * Reads from the connection while it is connected, feeds the bytes to a streaming
     * unpacker and hands every response message to onResponse.
     *
     * Throws: ProtocolException if a message does not have 3 or 4 elements, or if a
     *         response does not have exactly 4 elements.
     */
    void receiveLoop() {
        StreamingUnpacker unpacker = StreamingUnpacker([]);
        while (m_connection.connected) {
            // A false result means there is nothing to read; stop instead of
            // looping over empty reads.
            if (!m_connection.waitForData()) {
                break;
            }
            ubyte[] buf = new ubyte[m_connection.leastSize];
            m_connection.read(buf);
            unpacker.feed(buf);
            foreach (ref message; unpacker) {
                enforce!ProtocolException(message.length == 3 || message.length == 4, "Protocol error: incoming message size mismatch");
                immutable uint messageType = message[0].as!uint;
                switch (messageType) {
                    case MessageType.response:
                        enforce!ProtocolException(message.length == 4, "Protocol error: Response must be of size 4");
                        // Response fields by index: 1 = request id, 2 = error, 3 = result.
                        immutable error = message[2];
                        immutable result = message[3];
                        onResponse(message[1].as!int, error, result);
                        break;
                    default:
                        tracef("Received unhandled messageType %s", cast(MessageType) messageType);
                        break;
                }
            }
        }
    }

    /**
     * Forwards a reply to the task registered for the request id and drops the
     * registration. Logs a warning when no task is registered for the id.
     *
     * Params:
     *   id = The request id taken from the response
     *   error = The error field of the response
     *   result = The result field of the response
     */
    void onResponse(int id, immutable Value error, immutable Value result) {
        if (auto waiting = id in m_requests) {
            tracef("Reply for %d; Error: %s; Result: %s", id, error.type, result.type);
            send(*waiting, error, result);
            m_requests.remove(id);
        } else {
            warningf("No task for id %d", id);
        }
    }

    /**
     * Writes data to the connection and flushes it.
     */
    void sendData(T)(T data) {
        m_connection.write(data);
        m_connection.flush();
    }

    /**
     * Sends a request and returns a future containing the result.
     *
     * The message is packed as a 4-element array: message type, request id, method
     * name and an array holding the arguments. The future's task registers its own
     * Tid under the request id and then waits for the (error, result) pair that
     * onResponse sends.
     *
     * Params:
     *   method = The name of the remote method
     *   arguments = The arguments to pass to the server
     *
     * Throws: Inside the future's task, RPCException when the error field of the
     *         reply is not nil.
     */
    Future!R sendRequest(R, Args...)(string method, Args arguments) {
        tracef("Call '%s'", method);
        auto pack = packer(Appender!(ubyte[])());
        immutable int requestId = atomicFetchAdd(m_nextRequestId, 1);
        pack.beginArray(4)
            .pack(MessageType.request, requestId, method)
            .packArray(arguments);
        // NOTE(review): the request is written before its id is registered in
        // m_requests (that happens inside the task below). If the write yields and
        // the reply is processed first, onResponse would find no task for the id -
        // confirm whether this can happen with the scheduler in use.
        sendData(pack.stream.data);
        return async(delegate R() {
            m_requests[requestId] = Task.getThis().tid;
            // The reply is the pair (error, result).
            Tuple!(immutable Value, immutable Value) reply = receiveOnly!(immutable Value, immutable Value);
            if (reply[0].type != Value.Type.nil) {
                throw new RPCException(reply[0]);
            }
            Value result = reply[1];
            return result.as!R;
        });
    }
}