140 lines
3.7 KiB
D
140 lines
3.7 KiB
D
module msgpackrpc.client;
|
|
|
|
version(WithRPC) {
|
|
import core.atomic;
|
|
|
|
import std.array;
|
|
import std.concurrency : receiveOnly;
|
|
import std.exception;
|
|
import std.experimental.logger;
|
|
import std.traits;
|
|
import std.typecons;
|
|
|
|
import msgpack;
|
|
|
|
import vibe.core.concurrency;
|
|
import vibe.core.core;
|
|
import vibe.core.net;
|
|
import vibe.core.task;
|
|
|
|
import msgpackrpc.protocol;
|
|
|
|
/**
 * MessagePack RPC client.
 *
 * Wraps an already established TCPConnection. Constructing a client starts a
 * task running receiveLoop, which reads incoming messages from the connection
 * and forwards every response to the task registered for its request id.
 */
class Client {
private:
    TCPConnection m_connection;

    /// Source of request ids; incremented atomically for every request sent.
    shared int m_nextRequestId = 0;

    /// Pending requests: maps a request id to the Tid of the task awaiting its reply.
    Tid[int] m_requests;

public:
    /**
     * Creates a client on top of an existing connection and starts the receive loop.
     *
     * Params:
     *     connection = The connection used to send requests and read responses
     */
    this(TCPConnection connection) {
        this.m_connection = connection;
        runTask(&receiveLoop);
    }

    /**
     * Notifies the server without waiting for a response. The call returns once
     * the message has been written to and flushed on the connection.
     *
     * Params:
     *     method = The name of the method to be notified
     *     arguments = The arguments to pass to the server
     */
    void notify(Args...)(string method, Args arguments) {
        tracef("Notify '%s'", method);
        auto encoder = packer(Appender!(ubyte[])());
        // Wire layout: [type, method, [arguments...]]. The argument array has to be
        // chained after the header fields; passing `packArray(...)` as an argument
        // to `pack` would emit the arguments before the header.
        encoder.beginArray(3)
            .pack(MessageType.notify, method)
            .packArray(arguments);
        sendData(encoder.stream.data);
    }

    /**
     * Calls a method on the other side, awaiting a response.
     *
     * The difference between this and notify(string, Args) is that call expects a
     * result from the other endpoint and will actually wait for it.
     *
     * Params:
     *     method = The name of the remote method
     *     arguments = The arguments to pass to the server
     *
     * Returns: The reply's result value converted to R.
     *
     * Throws: RPCException if the reply carries a non-nil error value.
     */
    R call(R, Args...)(string method, Args arguments) {
        return sendRequest!R(method, arguments).getResult();
    }

    /**
     * Calls a method on the other endpoint, but will not wait for a result.
     * Instead, a Future!R is returned.
     *
     * Params:
     *     method = The name of the remote method
     *     arguments = The arguments to pass to the server
     *
     * Returns: A future for the reply's result value.
     */
    Future!R callAsync(R, Args...)(string method, Args arguments) {
        return sendRequest!R(method, arguments);
    }

private:
    /**
     * Reads from the connection while it is connected, feeds the bytes to a
     * streaming unpacker and dispatches every complete message.
     *
     * Only response messages are handled; any other message type is traced and
     * skipped.
     *
     * Throws: ProtocolException if a message does not have 3 or 4 elements, or
     *         if a response does not have exactly 4 elements.
     */
    void receiveLoop() {
        StreamingUnpacker unpacker = StreamingUnpacker([]);
        while (m_connection.connected) {
            // waitForData reports false when no data will arrive; stop instead of
            // sizing and reading a buffer in that case.
            if (!m_connection.waitForData()) {
                break;
            }

            ubyte[] buf = new ubyte[m_connection.leastSize];
            m_connection.read(buf);
            unpacker.feed(buf);

            foreach (ref message; unpacker) {
                enforce!ProtocolException(message.length == 3 || message.length == 4, "Protocol error: incoming message size mismatch");
                immutable uint messageType = message[0].as!uint;
                switch (messageType) {
                    case MessageType.response:
                        // Response layout: [type, requestId, error, result]
                        enforce!ProtocolException(message.length == 4, "Protocol error: Response must be of size 4");
                        immutable error = message[2];
                        immutable result = message[3];
                        onResponse(message[1].as!int, error, result);
                        break;
                    default:
                        tracef("Received unhandled messageType %s", cast(MessageType) messageType);
                        break;
                }
            }
        }
    }

    /**
     * Hands a reply to the task registered for the request id and drops the
     * registration. A reply without a registered task is logged as a warning.
     *
     * Params:
     *     id = The request id carried by the response
     *     error = The error element of the response
     *     result = The result element of the response
     */
    void onResponse(int id, immutable Value error, immutable Value result) {
        // Single lookup: `in` yields a pointer to the stored Tid, or null.
        if (auto waiter = id in m_requests) {
            tracef("Reply for %d; Error: %s; Result: %s", id, error.type, result.type);
            send(*waiter, error, result);
            m_requests.remove(id);
        } else {
            warningf("No task for id %d", id);
        }
    }

    /**
     * Writes the data to the connection and flushes it.
     *
     * Params:
     *     data = The data to write
     */
    void sendData(T)(T data) {
        m_connection.write(data);
        m_connection.flush();
    }

    /**
     * Sends a request and returns a future containing the result.
     *
     * Params:
     *     method = The name of the remote method
     *     arguments = The arguments to pass to the server
     *
     * Returns: A future that yields the reply's result converted to R, or throws
     *          RPCException when the reply's error value is not nil.
     */
    Future!R sendRequest(R, Args...)(string method, Args arguments) {
        tracef("Call '%s'", method);
        auto encoder = packer(Appender!(ubyte[])());
        immutable int requestId = atomicFetchAdd(m_nextRequestId, 1);
        // Wire layout: [type, requestId, method, [arguments...]]
        encoder.beginArray(4)
            .pack(MessageType.request, requestId, method)
            .packArray(arguments);

        // NOTE(review): the waiting task is registered only after the request has
        // been written. If a reply were processed before that registration,
        // onResponse would log "No task for id" and the reply would be lost.
        // Confirm that sendData cannot yield to receiveLoop here.
        sendData(encoder.stream.data);

        return async(delegate R() {
            m_requests[requestId] = Task.getThis().tid;
            // onResponse removes the entry on delivery; this also covers the case
            // where waiting for the reply ends with an exception.
            scope(exit) m_requests.remove(requestId);

            // The reply arrives as (error, result).
            Tuple!(immutable Value, immutable Value) reply = receiveOnly!(immutable Value, immutable Value);
            if (reply[0].type != Value.Type.nil) {
                throw new RPCException(reply[0]);
            }
            Value result = reply[1];
            return result.as!R;
        });
    }
}
|
|
|
|
} // version(WithRPC)
|