Add more interface functions.

- Cancelling of socket read/write events
- Cancelling of timer waits
- Retrieving the TCP connection state
- Storing custom data together with the event structures
This commit is contained in:
Sönke Ludwig 2016-06-14 07:57:10 +02:00
parent 87487f9e71
commit 5ec6b9a5e5
3 changed files with 115 additions and 2 deletions

View file

@ -54,12 +54,20 @@ interface EventDriver {
StreamSocketFD connectStream(scope Address peer_address, ConnectCallback on_connect); StreamSocketFD connectStream(scope Address peer_address, ConnectCallback on_connect);
StreamListenSocketFD listenStream(scope Address bind_address, AcceptCallback on_accept); StreamListenSocketFD listenStream(scope Address bind_address, AcceptCallback on_accept);
void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept); void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept);
ConnectionState getConnectionState(StreamSocketFD sock);
void setTCPNoDelay(StreamSocketFD socket, bool enable); void setTCPNoDelay(StreamSocketFD socket, bool enable);
void readSocket(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish); void readSocket(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish);
void writeSocket(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish); void writeSocket(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish);
void waitSocketData(StreamSocketFD socket, IOCallback on_data_available); void waitSocketData(StreamSocketFD socket, IOCallback on_data_available);
void shutdownSocket(StreamSocketFD socket, bool shut_read = true, bool shut_write = true); void shutdownSocket(StreamSocketFD socket, bool shut_read = true, bool shut_write = true);
void cancelRead(StreamSocketFD socket);
void cancelWrite(StreamSocketFD socket);
//
// Files
//
//FileFD openFile(string path, FileOpenMode mode);
//FileFD createTempFile();
// //
// Manual events // Manual events
@ -78,6 +86,7 @@ interface EventDriver {
bool isTimerPending(TimerID timer); bool isTimerPending(TimerID timer);
bool isTimerPeriodic(TimerID timer); bool isTimerPeriodic(TimerID timer);
void waitTimer(TimerID timer, TimerCallback callback); void waitTimer(TimerID timer, TimerCallback callback);
void cancelTimerWait(TimerID timer, TimerCallback callback);
// //
// Resource ownership // Resource ownership
@ -107,6 +116,20 @@ interface EventDriver {
void releaseRef(TimerID descriptor); void releaseRef(TimerID descriptor);
/// ditto /// ditto
void releaseRef(EventID descriptor); void releaseRef(EventID descriptor);
/// Low-level user data access. Use `getUserData` instead.
protected void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system;
/** Retrieves a reference to a user-defined value associated with a descriptor.
*/
@property final ref T userData(T, FD)(FD descriptor)
@trusted {
import std.conv : emplace;
static void init(void* ptr) { emplace(cast(T*)ptr); }
static void destr(void* ptr) { destroy(*cast(T*)ptr); }
return *cast(T*)rawUserData(descriptor, T.sizeof, &init, &destr);
}
} }
@ -115,6 +138,7 @@ alias AcceptCallback = void delegate(StreamListenSocketFD, StreamSocketFD);
alias IOCallback = void delegate(StreamSocketFD, IOStatus, size_t); alias IOCallback = void delegate(StreamSocketFD, IOStatus, size_t);
alias EventCallback = void delegate(EventID); alias EventCallback = void delegate(EventID);
alias TimerCallback = void delegate(TimerID); alias TimerCallback = void delegate(TimerID);
@system alias DataInitializer = void function(void*);
enum ExitReason { enum ExitReason {
timeout, timeout,
@ -131,6 +155,15 @@ enum ConnectStatus {
unknownError unknownError
} }
enum ConnectionState {
initialized,
connecting,
connected,
passiveClose,
activeClose,
closed
}
enum IOMode { enum IOMode {
immediate, /// Process only as much as possible without waiting immediate, /// Process only as much as possible without waiting
once, /// Process as much as possible with a single call once, /// Process as much as possible with a single call
@ -141,6 +174,7 @@ enum IOMode {
enum IOStatus { enum IOStatus {
ok, /// The data has been transferred normally ok, /// The data has been transferred normally
disconnected, /// The connection was closed before all data could be transterred disconnected, /// The connection was closed before all data could be transterred
cancelled, /// The operation was cancelled manually
error, /// An error occured while transferring the data error, /// An error occured while transferring the data
wouldBlock /// Returned for `IOMode.immediate` when no data is readily readable/writable wouldBlock /// Returned for `IOMode.immediate` when no data is readily readable/writable
} }

View file

@ -161,6 +161,7 @@ abstract class PosixEventDriver : EventDriver {
final override StreamListenSocketFD listenStream(scope Address address, AcceptCallback on_accept) final override StreamListenSocketFD listenStream(scope Address address, AcceptCallback on_accept)
{ {
log("Listen stream");
auto sock = cast(StreamListenSocketFD)createSocket(address.addressFamily); auto sock = cast(StreamListenSocketFD)createSocket(address.addressFamily);
void invalidateSocket() @nogc @trusted nothrow { closeSocket(sock); sock = StreamSocketFD.invalid; } void invalidateSocket() @nogc @trusted nothrow { closeSocket(sock); sock = StreamSocketFD.invalid; }
@ -169,12 +170,15 @@ abstract class PosixEventDriver : EventDriver {
int tmp_reuse = 1; int tmp_reuse = 1;
// FIXME: error handling! // FIXME: error handling!
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &tmp_reuse, tmp_reuse.sizeof) != 0) { if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &tmp_reuse, tmp_reuse.sizeof) != 0) {
log("setsockopt failed.");
invalidateSocket(); invalidateSocket();
} else if (bind(sock, address.name, address.nameLen) != 0) { } else if (bind(sock, address.name, address.nameLen) != 0) {
log("bind failed.");
invalidateSocket(); invalidateSocket();
} else if (listen(sock, 128) != 0) { } else if (listen(sock, 128) != 0) {
log("listen failed.");
invalidateSocket(); invalidateSocket();
} else { scope (failure) assert(false); import std.stdio; writeln("Success!"); } } else log("Success!");
} (); } ();
if (on_accept && sock != StreamListenSocketFD.invalid) if (on_accept && sock != StreamListenSocketFD.invalid)
@ -185,6 +189,7 @@ abstract class PosixEventDriver : EventDriver {
final override void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept) final override void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept)
{ {
log("wait for conn");
registerFD(sock, EventMask.read); registerFD(sock, EventMask.read);
initFD(sock); initFD(sock);
m_fds[sock].acceptCallback = on_accept; m_fds[sock].acceptCallback = on_accept;
@ -210,6 +215,11 @@ abstract class PosixEventDriver : EventDriver {
} }
} }
ConnectionState getConnectionState(StreamSocketFD sock)
{
assert(false);
}
final override void setTCPNoDelay(StreamSocketFD socket, bool enable) final override void setTCPNoDelay(StreamSocketFD socket, bool enable)
{ {
int opt = enable; int opt = enable;
@ -218,6 +228,11 @@ abstract class PosixEventDriver : EventDriver {
final override void readSocket(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish) final override void readSocket(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish)
{ {
if (buffer.length == 0) {
on_read_finish(socket, IOStatus.ok, 0);
return;
}
sizediff_t ret; sizediff_t ret;
() @trusted { ret = recv(socket, buffer.ptr, buffer.length, 0); } (); () @trusted { ret = recv(socket, buffer.ptr, buffer.length, 0); } ();
@ -261,6 +276,16 @@ abstract class PosixEventDriver : EventDriver {
setNotifyCallback!(EventType.read)(socket, &onSocketRead); setNotifyCallback!(EventType.read)(socket, &onSocketRead);
} }
override void cancelRead(StreamSocketFD socket)
{
assert(m_fds[socket].readCallback !is null, "Cancelling read when there is no read in progress.");
setNotifyCallback!(EventType.read)(socket, null);
with (m_fds[socket]) {
readBuffer = null;
readCallback(socket, IOStatus.cancelled, bytesRead);
}
}
private void onSocketRead(FD fd) private void onSocketRead(FD fd)
{ {
auto slot = &m_fds[fd]; auto slot = &m_fds[fd];
@ -300,6 +325,11 @@ abstract class PosixEventDriver : EventDriver {
final override void writeSocket(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish) final override void writeSocket(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish)
{ {
if (buffer.length == 0) {
on_write_finish(socket, IOStatus.ok, 0);
return;
}
sizediff_t ret; sizediff_t ret;
() @trusted { ret = send(socket, buffer.ptr, buffer.length, 0); } (); () @trusted { ret = send(socket, buffer.ptr, buffer.length, 0); } ();
@ -342,6 +372,16 @@ abstract class PosixEventDriver : EventDriver {
setNotifyCallback!(EventType.write)(socket, &onSocketWrite); setNotifyCallback!(EventType.write)(socket, &onSocketWrite);
} }
override void cancelWrite(StreamSocketFD socket)
{
assert(m_fds[socket].readCallback !is null, "Cancelling write when there is no read in progress.");
setNotifyCallback!(EventType.write)(socket, null);
with (m_fds[socket]) {
writeBuffer = null;
writeCallback(socket, IOStatus.cancelled, bytesWritten);
}
}
private void onSocketWrite(FD fd) private void onSocketWrite(FD fd)
{ {
auto slot = &m_fds[fd]; auto slot = &m_fds[fd];
@ -530,6 +570,20 @@ abstract class PosixEventDriver : EventDriver {
} }
} }
final override void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
@system {
FDSlot* fds = &m_fds[descriptor];
assert(fds.userDataDestructor is null || fds.userDataDestructor is destroy,
"Requesting user data with differing type (destructor).");
assert(size <= FDSlot.userData.length, "Requested user data is too large.");
if (size > FDSlot.userData.length) assert(false);
if (!fds.userDataDestructor) {
initialize(fds.userData.ptr);
fds.userDataDestructor = destroy;
}
return m_fds[descriptor].userData.ptr;
}
/// Registers the FD for general notification reception. /// Registers the FD for general notification reception.
protected abstract void registerFD(FD fd, EventMask mask); protected abstract void registerFD(FD fd, EventMask mask);
@ -555,6 +609,7 @@ abstract class PosixEventDriver : EventDriver {
private void startNotify(EventType evt)(FD fd, FDSlotCallback callback) private void startNotify(EventType evt)(FD fd, FDSlotCallback callback)
{ {
import std.stdio : writefln; try writefln("start notify %s %s", evt, fd); catch(Exception) {}
//assert(m_fds[fd].callback[evt] is null, "Waiting for event which is already being waited for."); //assert(m_fds[fd].callback[evt] is null, "Waiting for event which is already being waited for.");
m_fds[fd].callback[evt] = callback; m_fds[fd].callback[evt] = callback;
m_waiterCount++; m_waiterCount++;
@ -563,6 +618,7 @@ abstract class PosixEventDriver : EventDriver {
private void stopNotify(EventType evt)(FD fd) private void stopNotify(EventType evt)(FD fd)
{ {
import std.stdio : writefln; try writefln("stop notify %s %s", evt, fd); catch(Exception) {}
//ssert(m_fds[fd].callback[evt] !is null, "Stopping waiting for event which is not being waited for."); //ssert(m_fds[fd].callback[evt] !is null, "Stopping waiting for event which is not being waited for.");
m_fds[fd].callback[evt] = null; m_fds[fd].callback[evt] = null;
m_waiterCount--; m_waiterCount--;
@ -591,6 +647,8 @@ abstract class PosixEventDriver : EventDriver {
private void clearFD(FD fd) private void clearFD(FD fd)
{ {
if (m_fds[fd].userDataDestructor)
() @trusted { m_fds[fd].userDataDestructor(m_fds[fd].userData.ptr); } ();
m_fds[fd] = FDSlot.init; m_fds[fd] = FDSlot.init;
} }
} }
@ -619,6 +677,10 @@ private struct FDSlot {
ConnectCallback connectCallback; ConnectCallback connectCallback;
AcceptCallback acceptCallback; AcceptCallback acceptCallback;
ConsumableQueue!EventCallback waiters; ConsumableQueue!EventCallback waiters;
DataInitializer userDataDestructor;
ubyte[16*size_t.sizeof] userData;
shared bool triggerAll; shared bool triggerAll;
@property EventMask eventMask() const nothrow { @property EventMask eventMask() const nothrow {
@ -664,6 +726,13 @@ private int getSocketError()
else return errno; else return errno;
} }
void log(ARGS...)(string fmt, ARGS args)
{
import std.stdio;
try writefln(fmt, args);
catch (Exception) {}
}
/*version (Windows) { /*version (Windows) {
import std.c.windows.windows; import std.c.windows.windows;

View file

@ -130,6 +130,16 @@ mixin template DefaultTimerImpl() {
m_timers[timer].callbacks ~= callback; m_timers[timer].callbacks ~= callback;
} }
final override void cancelTimerWait(TimerID timer, TimerCallback callback)
{
import std.algorithm.mutation : remove;
import std.algorithm.searching : countUntil;
auto pt = m_timers[timer];
auto idx = pt.callbacks.countUntil(callback);
if (idx >= 0) pt.callbacks = pt.callbacks.remove(idx);
}
final override void addRef(TimerID descriptor) final override void addRef(TimerID descriptor)
{ {
m_timers[descriptor].refCount++; m_timers[descriptor].refCount++;