Reimplement based on systemd's inhibit system
This commit is contained in:
parent
ceb06abce6
commit
6db58d07cb
10 changed files with 385 additions and 178 deletions
134
ober/source/msgpackrpc/client.d
Normal file
134
ober/source/msgpackrpc/client.d
Normal file
|
@ -0,0 +1,134 @@
|
|||
import core.atomic;
|
||||
|
||||
import std.array;
|
||||
import std.concurrency : receiveOnly;
|
||||
import std.exception;
|
||||
import std.experimental.logger;
|
||||
import std.traits;
|
||||
import std.typecons;
|
||||
|
||||
import msgpack;
|
||||
|
||||
import vibe.core.concurrency;
|
||||
import vibe.core.core;
|
||||
import vibe.core.net;
|
||||
import vibe.core.task;
|
||||
|
||||
import protocol;
|
||||
|
||||
/**
|
||||
* MessagePack RPC client
|
||||
*/
|
||||
class Client {
|
||||
private:
|
||||
TCPConnection m_connection;
|
||||
shared int m_nextRequestId = 0;
|
||||
|
||||
/// Map of sent request, mapping the requestId to a future.
|
||||
Tid[int] m_requests;
|
||||
public:
|
||||
this(TCPConnection connection) {
|
||||
this.m_connection = connection;
|
||||
runTask(&receiveLoop);
|
||||
}
|
||||
|
||||
/**
|
||||
* Notifies the server, not waiting for a response. Although it waits for the request to be sent
|
||||
*
|
||||
* params:
|
||||
* method = The name of the method to be notified
|
||||
* arguments = The arguments to pass to the server
|
||||
*/
|
||||
void notify(Args...)(string method, Args arguments) {
|
||||
tracef("Notify '%s'", method);
|
||||
auto pack = packer(Appender!(ubyte[])());
|
||||
pack.beginArray(3).pack(MessageType.notify, method, pack.packArray(arguments));
|
||||
sendData(pack.stream.data);
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls a method on the other side, awaiting a response.
|
||||
*
|
||||
* The difference between this and notify(string, Args) is that call expects a result from the
|
||||
* other endpoint and will actually wait for it.
|
||||
*/
|
||||
R call(R, Args...)(string method, Args arguments) {
|
||||
R res = sendRequest!R(method, arguments).getResult();
|
||||
return res;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls a method on the other endpoint, but will not wait for a result. Instead, a Future!R is
|
||||
* returned.
|
||||
*/
|
||||
Future!R callAsync(R, Args...)(string method, Args arguments) {
|
||||
return sendRequest!R(method, arguments);
|
||||
}
|
||||
|
||||
private:
|
||||
void receiveLoop() {
|
||||
StreamingUnpacker unpacker = StreamingUnpacker([]);
|
||||
while(m_connection.connected) {
|
||||
m_connection.waitForData();
|
||||
ubyte[] buf = new ubyte[m_connection.leastSize];
|
||||
m_connection.read(buf);
|
||||
unpacker.feed(buf);
|
||||
|
||||
foreach(ref message; unpacker) {
|
||||
enforce!ProtocolException(message.length == 3 || message.length == 4, "Protocol error: incoming message size mismatch");
|
||||
immutable uint messageType = message[0].as!uint;
|
||||
switch(messageType) {
|
||||
case MessageType.response:
|
||||
enforce!ProtocolException(message.length == 4, "Protocol error: Response must be of size 4");
|
||||
immutable error = message[2];
|
||||
immutable result = message[3];
|
||||
onResponse(message[1].as!int, error, result);
|
||||
break;
|
||||
default:
|
||||
tracef("Received unhandled messageType %s", cast(MessageType) messageType);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void onResponse(int id, immutable Value error, immutable Value result) {
|
||||
if (id in m_requests) {
|
||||
tracef("Reply for %d; Error: %s; Result: %s", id, error.type, result.type);
|
||||
send(m_requests[id], error, result);
|
||||
m_requests.remove(id);
|
||||
} else {
|
||||
warningf("No task for id %d", id);
|
||||
}
|
||||
}
|
||||
|
||||
void sendData(T)(T data) {
|
||||
m_connection.write(data);
|
||||
m_connection.flush();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a request and returns a future containing the result.
|
||||
*/
|
||||
Future!R sendRequest(R, Args...)(string method, Args arguments) {
|
||||
tracef("Call '%s'", method);
|
||||
auto pack = packer(Appender!(ubyte[])());
|
||||
immutable int requestId = atomicFetchAdd(m_nextRequestId, 1);
|
||||
pack.beginArray(4)
|
||||
.pack(MessageType.request, requestId, method)
|
||||
.packArray(arguments);
|
||||
|
||||
sendData(pack.stream.data);
|
||||
|
||||
return async(delegate R() {
|
||||
m_requests[requestId] = Task.getThis().tid;
|
||||
// error result
|
||||
Tuple!(immutable Value, immutable Value) reply = receiveOnly!(immutable Value, immutable Value);
|
||||
if (reply[0].type != Value.Type.nil) {
|
||||
throw new RPCException(reply[0]);
|
||||
}
|
||||
Value result = reply[1];
|
||||
return result.as!R;
|
||||
});
|
||||
}
|
||||
}
|
3
ober/source/msgpackrpc/package.d
Normal file
3
ober/source/msgpackrpc/package.d
Normal file
|
@ -0,0 +1,3 @@
|
|||
module msgpackrpc;
|
||||
public import client;
|
||||
public import server;
|
32
ober/source/msgpackrpc/protocol.d
Normal file
32
ober/source/msgpackrpc/protocol.d
Normal file
|
@ -0,0 +1,32 @@
|
|||
import std.exception;
|
||||
|
||||
import msgpack;
|
||||
|
||||
enum MessageType {
|
||||
request = 0,
|
||||
response = 1,
|
||||
notify = 2
|
||||
}
|
||||
|
||||
/**
|
||||
* Thrown when the wire protocol could not be parsed.
|
||||
*/
|
||||
class ProtocolException : Exception {
|
||||
mixin basicExceptionCtors;
|
||||
}
|
||||
|
||||
/**
|
||||
* Thrown when the other endpoint reports an error.
|
||||
*/
|
||||
class RPCException : Exception {
|
||||
private:
|
||||
Value m_value;
|
||||
public:
|
||||
this(Value value, string file = __FILE__, int line = __LINE__, Throwable next = null) {
|
||||
super(value.as!string, file, line, next);
|
||||
this.m_value = value;
|
||||
}
|
||||
|
||||
@property Value value() { return m_value; }
|
||||
|
||||
}
|
0
ober/source/msgpackrpc/server.d
Normal file
0
ober/source/msgpackrpc/server.d
Normal file
Loading…
Add table
Add a link
Reference in a new issue