diff --git a/source/eventcore/driver.d b/source/eventcore/driver.d index fe0d6dd..dd90f71 100644 --- a/source/eventcore/driver.d +++ b/source/eventcore/driver.d @@ -16,12 +16,12 @@ cancellation function returns, or afterwards. */ module eventcore.driver; -@safe: /*@nogc:*/ nothrow: import core.time : Duration; import std.process : StdProcessConfig = Config; import std.socket : Address; import std.stdint : intptr_t; +import std.typecons : Tuple; import std.variant : Algebraic; @@ -71,14 +71,13 @@ interface EventDriver { /** Provides generic event loop control. */ interface EventDriverCore { -@safe: /*@nogc:*/ nothrow: /** The number of pending callbacks. When this number drops to zero, the event loop can safely be quit. It is guaranteed that no callbacks will be made anymore, unless new callbacks get registered. */ - size_t waiterCount(); + size_t waiterCount() @safe nothrow; /** Runs the event loop to process a chunk of events. @@ -92,7 +91,7 @@ interface EventDriverCore { duration of `Duration.max`, if necessary, will wait indefinitely until an event arrives. */ - ExitReason processEvents(Duration timeout); + ExitReason processEvents(Duration timeout) @safe nothrow; /** Causes `processEvents` to return with `ExitReason.exited` as soon as possible. @@ -101,7 +100,7 @@ interface EventDriverCore { so that it returns immediately. If no call is in progress, the next call to `processEvents` will immediately return with `ExitReason.exited`. */ - void exit(); + void exit() @safe nothrow; /** Resets the exit flag. @@ -110,26 +109,28 @@ interface EventDriverCore { `processEvents`, the next call to `processEvents` will return with `ExitCode.exited` immediately. This function can be used to avoid this. */ - void clearExitFlag(); + void clearExitFlag() @safe nothrow; /** Executes a callback in the thread owning the driver. */ - void runInOwnerThread(ThreadCallback2 fun, intptr_t param1, intptr_t param2) shared; + void runInOwnerThread(ThreadCallbackGen fun, ref ThreadCallbackGenParams params) shared @safe nothrow; /// ditto - final void runInOwnerThread(ThreadCallback1 fun, intptr_t param1) - shared { - runInOwnerThread((p1, p2) { - auto f = () @trusted { return cast(ThreadCallback1)p2; } (); - f(p1); - }, param1, cast(intptr_t)fun); - } - /// ditto - final void runInOwnerThread(ThreadCallback0 fun) - shared { - runInOwnerThread((p1, p2) { - auto f = () @trusted { return cast(ThreadCallback0)p2; } (); - f(); - }, 0, cast(intptr_t)fun); + final void runInOwnerThread(ARGS...)(void function(ARGS) @safe nothrow fun, ARGS args) shared + if (ARGS.length != 1 || !is(ARGS[0] == ThreadCallbackGenParams)) + { + alias F = void function(ARGS) @safe nothrow; + alias T = Tuple!ARGS; + static assert(F.sizeof + T.sizeof <= ThreadCallbackGenParams.length, + "Parameters take up too much space."); + + ThreadCallbackGenParams params; + () @trusted { (cast(F[])params[0 .. F.sizeof])[0] = fun; } (); + (cast(T[])params[F.sizeof .. F.sizeof + T.sizeof])[0] = T(args); + runInOwnerThread((ref ThreadCallbackGenParams p) { + auto f = () @trusted { return cast(F[])p[0 .. F.sizeof]; } ()[0]; + auto pt = () @trusted { return cast(T[])p[F.sizeof .. F.sizeof + T.sizeof]; } (); + f(pt[0].expand); + }, params); } /// Low-level user data access. Use `getUserData` instead. @@ -141,7 +142,7 @@ interface EventDriverCore { */ deprecated("Use `EventDriverSockets.userData` instead.") @property final ref T userData(T, FD)(FD descriptor) - @trusted { + @trusted nothrow { import std.conv : emplace; static void init(void* ptr) { emplace(cast(T*)ptr); } static void destr(void* ptr) { destroy(*cast(T*)ptr); } @@ -460,12 +461,11 @@ interface EventDriverFiles { /** Disallows any reads/writes and removes any exclusive locks. - Note that this function may not actually close the file handle. The - handle is only guaranteed to be closed one the reference count drops - to zero. However, the remaining effects of calling this function will - be similar to actually closing the file. + Note that the file handle may become invalid at any point after the + call to `close`, regardless of its current reference count. Any + operations on the handle will not have an effect. */ - void close(FileFD file); + void close(FileFD file, FileCloseCallback on_closed); ulong getSize(FileFD file); @@ -882,8 +882,12 @@ interface EventDriverPipes { void waitForData(PipeFD pipe, PipeIOCallback on_data_available); /** Immediately close the pipe. Future read or write operations may fail. + + Note that the file handle may become invalid at any point after the + call to `close`, regardless of its current reference count. Any + operations on the handle will not have an effect. */ - void close(PipeFD pipe); + void close(PipeFD file, PipeCloseCallback on_closed); /** Determines whether the given pipe handle is valid. @@ -954,6 +958,7 @@ final class RefAddress : Address { } } +@safe: /*@nogc:*/ nothrow: alias ConnectCallback = void delegate(StreamSocketFD, ConnectStatus); alias AcceptCallback = void delegate(StreamListenSocketFD, StreamSocketFD, scope RefAddress remote_address); @@ -961,7 +966,9 @@ alias IOCallback = void delegate(StreamSocketFD, IOStatus, size_t); alias DatagramIOCallback = void delegate(DatagramSocketFD, IOStatus, size_t, scope RefAddress); alias DNSLookupCallback = void delegate(DNSLookupID, DNSStatus, scope RefAddress[]); alias FileIOCallback = void delegate(FileFD, IOStatus, size_t); +alias FileCloseCallback = void delegate(FileFD, CloseStatus); alias PipeIOCallback = void delegate(PipeFD, IOStatus, size_t); +alias PipeCloseCallback = void delegate(PipeFD, CloseStatus); alias EventCallback = void delegate(EventID); alias SignalCallback = void delegate(SignalListenID, SignalStatus, int); alias TimerCallback = void delegate(TimerID); @@ -984,6 +991,12 @@ enum ExitReason { exited } +enum CloseStatus { + ok, + ioError, + invalidHandle, +} + enum ConnectStatus { connected, refused, @@ -1150,10 +1163,9 @@ mixin template Handle(string NAME, T, T invalid_value = T.init) { alias value this; } -alias ThreadCallback0 = void function() @safe nothrow; -alias ThreadCallback1 = void function(intptr_t param1) @safe nothrow; -alias ThreadCallback2 = void function(intptr_t param1, intptr_t param2) @safe nothrow; -alias ThreadCallback = ThreadCallback1; +alias ThreadCallbackGenParams = ubyte[8 * intptr_t.sizeof]; +alias ThreadCallbackGen = void function(ref ThreadCallbackGenParams param3) @safe nothrow; +deprecated alias ThreadCallback = void function(intptr_t param1) @safe nothrow; struct FD { mixin Handle!("fd", size_t, size_t.max); } struct SocketFD { mixin Handle!("socket", FD); } diff --git a/source/eventcore/drivers/posix/driver.d b/source/eventcore/drivers/posix/driver.d index c5e542e..20c04e1 100644 --- a/source/eventcore/drivers/posix/driver.d +++ b/source/eventcore/drivers/posix/driver.d @@ -160,7 +160,7 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime protected alias ExtraEventsCallback = bool delegate(long); - private alias ThreadCallbackEntry = Tuple!(ThreadCallback2, intptr_t, intptr_t); + private alias ThreadCallbackEntry = Tuple!(ThreadCallbackGen, ThreadCallbackGenParams); private { Loop m_loop; @@ -262,7 +262,8 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime m_exit = false; } - final override void runInOwnerThread(ThreadCallback2 del, intptr_t param1, intptr_t param2) + final override void runInOwnerThread(ThreadCallbackGen del, + ref ThreadCallbackGenParams params) shared { auto m = atomicLoad(m_threadCallbackMutex); auto evt = atomicLoad(m_wakeupEvent); @@ -275,7 +276,7 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime try { synchronized (m) () @trusted { return (cast()this).m_threadCallbacks; } () - .put(ThreadCallbackEntry(del, param1, param2)); + .put(ThreadCallbackEntry(del, params)); } catch (Exception e) assert(false, e.msg); m_events.trigger(m_wakeupEvent, false); @@ -311,7 +312,7 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime del = m_threadCallbacks.consumeOne; } } catch (Exception e) assert(false, e.msg); - del[0](del[1], del[2]); + del[0](del[1]); } } } diff --git a/source/eventcore/drivers/posix/pipes.d b/source/eventcore/drivers/posix/pipes.d index 1e4cef5..af6710c 100644 --- a/source/eventcore/drivers/posix/pipes.d +++ b/source/eventcore/drivers/posix/pipes.d @@ -10,7 +10,7 @@ import std.algorithm : min, max; final class PosixEventDriverPipes(Loop : PosixEventLoop) : EventDriverPipes { @safe: /*@nogc:*/ nothrow: - import core.stdc.errno : errno, EAGAIN; + import core.stdc.errno : errno, EAGAIN, EINTR; import core.sys.posix.unistd : close, read, write; import core.sys.posix.fcntl; import core.sys.posix.poll; @@ -304,12 +304,21 @@ final class PosixEventDriverPipes(Loop : PosixEventLoop) : EventDriverPipes { return false; } - final override void close(PipeFD pipe) + final override void close(PipeFD pipe, PipeCloseCallback on_closed) { - if (!isValid(pipe)) return; + if (!isValid(pipe)) { + on_closed(pipe, CloseStatus.invalidHandle); + return; + } - // TODO: Maybe actually close here instead of waiting for releaseRef? - close(cast(int)pipe); + int res; + do res = close(cast(int)pipe); + while (res != 0 && errno == EINTR); + m_loop.unregisterFD(pipe, EventMask.read|EventMask.write|EventMask.status); + m_loop.clearFD!PipeSlot(pipe); + + if (on_closed) + on_closed(pipe, res == 0 ? CloseStatus.ok : CloseStatus.ioError); } override bool isValid(PipeFD handle) @@ -337,10 +346,7 @@ final class PosixEventDriverPipes(Loop : PosixEventLoop) : EventDriverPipes { nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced pipe FD."); if (--slot.common.refCount == 0) { - m_loop.unregisterFD(pipe, EventMask.read|EventMask.write|EventMask.status); - m_loop.clearFD!PipeSlot(pipe); - - close(cast(int)pipe); + close(pipe, null); return false; } return true; @@ -386,8 +392,13 @@ final class DummyEventDriverPipes(Loop : PosixEventLoop) : EventDriverPipes { assert(false, "TODO!"); } - override void close(PipeFD pipe) + override void close(PipeFD pipe, PipeCloseCallback on_closed) { + if (!isValid(pipe)) { + on_closed(pipe, CloseStatus.invalidHandle); + return; + } + assert(false, "TODO!"); } diff --git a/source/eventcore/drivers/posix/processes.d b/source/eventcore/drivers/posix/processes.d index 93f3593..0456939 100644 --- a/source/eventcore/drivers/posix/processes.d +++ b/source/eventcore/drivers/posix/processes.d @@ -206,7 +206,7 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces m_driver.core.runInOwnerThread(&onLocalProcessExit, system_pid); } - private static void onLocalProcessExit(intptr_t system_pid) + private static void onLocalProcessExit(int system_pid) { int exitCode; ProcessWaitCallback[] callbacks; @@ -214,12 +214,12 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces ProcessID pid; PosixEventDriverProcesses driver; - lockedProcessInfoPlain(cast(int)system_pid, (info) { + lockedProcessInfoPlain(system_pid, (info) { assert(info !is null); exitCode = info.exitCode; callbacks = info.callbacks; - pid = ProcessID(cast(int)system_pid, info.validationCounter); + pid = ProcessID(system_pid, info.validationCounter); info.callbacks = null; driver = info.driver; diff --git a/source/eventcore/drivers/threadedfile.d b/source/eventcore/drivers/threadedfile.d index 1dfc57d..9ea9857 100644 --- a/source/eventcore/drivers/threadedfile.d +++ b/source/eventcore/drivers/threadedfile.d @@ -102,7 +102,6 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil static struct FileInfo { IOInfo read; IOInfo write; - bool open = true; uint validationCounter; @@ -177,15 +176,22 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil return FileFD(system_file_handle, vc + 1); } - void close(FileFD file) + void close(FileFD file, FileCloseCallback on_closed) { - if (!isValid(file)) return; + if (!isValid(file)) { + on_closed(file, CloseStatus.invalidHandle); + return; + } - // NOTE: The file descriptor itself must stay open until the reference - // count drops to zero, or this would result in dangling handles. - // In case of an exclusive file lock, the lock should be lifted - // here. - m_files[file].open = false; + // TODO: close may block and should be executed in a worker thread + int res; + do res = .close(cast(int)file.value); + while (res != 0 && errno == EINTR); + + m_files[file.value] = FileInfo.init; + + if (on_closed) + on_closed(file, res == 0 ? CloseStatus.ok : CloseStatus.ioError); } ulong getSize(FileFD file) @@ -230,11 +236,6 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil //assert(this.writable); auto f = () @trusted { return &m_files[file]; } (); - if (!f.open) { - on_write_finish(file, IOStatus.disconnected, 0); - return; - } - if (!safeCAS(f.write.status, ThreadedFileStatus.idle, ThreadedFileStatus.initiated)) assert(false, "Concurrent file writes are not allowed."); assert(f.write.callback is null, "Concurrent file writes are not allowed."); @@ -276,11 +277,6 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil auto f = () @trusted { return &m_files[file]; } (); - if (!f.open) { - on_read_finish(file, IOStatus.disconnected, 0); - return; - } - if (!safeCAS(f.read.status, ThreadedFileStatus.idle, ThreadedFileStatus.initiated)) assert(false, "Concurrent file reads are not allowed."); assert(f.read.callback is null, "Concurrent file reads are not allowed."); @@ -332,8 +328,7 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil auto f = () @trusted { return &m_files[descriptor]; } (); if (!--f.refCount) { - .close(cast(int)descriptor); - *f = FileInfo.init; + close(descriptor, null); assert(!m_activeReads.contains(descriptor.value)); assert(!m_activeWrites.contains(descriptor.value)); return false; diff --git a/source/eventcore/drivers/winapi/core.d b/source/eventcore/drivers/winapi/core.d index 4227082..26c3f45 100644 --- a/source/eventcore/drivers/winapi/core.d +++ b/source/eventcore/drivers/winapi/core.d @@ -16,7 +16,7 @@ import std.typecons : Tuple, tuple; final class WinAPIEventDriverCore : EventDriverCore { @safe: /*@nogc:*/ nothrow: - private alias ThreadCallbackEntry = Tuple!(ThreadCallback2, intptr_t, intptr_t); + private alias ThreadCallbackEntry = Tuple!(ThreadCallbackGen, ThreadCallbackGenParams); private { bool m_exit; @@ -141,7 +141,8 @@ final class WinAPIEventDriverCore : EventDriverCore { m_exit = false; } - override void runInOwnerThread(ThreadCallback2 del, intptr_t param1, intptr_t param2) + override void runInOwnerThread(ThreadCallbackGen del, + ref ThreadCallbackGenParams params) shared { import core.atomic : atomicLoad; @@ -155,7 +156,7 @@ final class WinAPIEventDriverCore : EventDriverCore { try { synchronized (m) () @trusted { return (cast()this).m_threadCallbacks; } () - .put(ThreadCallbackEntry(del, param1, param2)); + .put(ThreadCallbackEntry(del, params)); } catch (Exception e) assert(false, e.msg); () @trusted { PostThreadMessageW(m_tid, WM_APP, 0, 0); } (); @@ -279,8 +280,6 @@ final class WinAPIEventDriverCore : EventDriverCore { private void executeThreadCallbacks() { - import std.stdint : intptr_t; - while (true) { ThreadCallbackEntry del; try { @@ -289,7 +288,7 @@ final class WinAPIEventDriverCore : EventDriverCore { del = m_threadCallbacks.consumeOne; } } catch (Exception e) assert(false, e.msg); - del[0](del[1], del[2]); + del[0](del[1]); } } } @@ -360,6 +359,7 @@ package struct FileSlot { } Direction!false read; Direction!true write; + FileCloseCallback closeCallback; } package struct WatcherSlot { diff --git a/source/eventcore/drivers/winapi/files.d b/source/eventcore/drivers/winapi/files.d index ab8c21c..0f35380 100644 --- a/source/eventcore/drivers/winapi/files.d +++ b/source/eventcore/drivers/winapi/files.d @@ -4,6 +4,7 @@ version (Windows): import eventcore.driver; import eventcore.drivers.winapi.core; +import eventcore.internal.ioworker; import eventcore.internal.win32; private extern(Windows) @trusted nothrow @nogc { @@ -14,6 +15,7 @@ final class WinAPIEventDriverFiles : EventDriverFiles { @safe /*@nogc*/ nothrow: private { WinAPIEventDriverCore m_core; + IOWorkerPool m_workerPool; } this(WinAPIEventDriverCore core) @@ -71,15 +73,41 @@ final class WinAPIEventDriverFiles : EventDriverFiles { return FileFD(cast(size_t)handle, m_core.m_handles[handle].validationCounter); } - override void close(FileFD file) + override void close(FileFD file, FileCloseCallback on_closed) { - if (!isValid(file)) return; + static void doCloseCleanup(CloseParams p) + { + p.core.removeWaiter(); + if (p.callback) p.callback(p.file, p.status); + } + + static void doClose(FileFD file, FileCloseCallback on_closed, + shared(WinAPIEventDriverCore) core) + { + CloseParams p; + CloseStatus st; + p.file = file; + p.callback = on_closed; + p.core = () @trusted { return cast(WinAPIEventDriverCore)core; } (); + if (CloseHandle(idToHandle(file))) p.status = CloseStatus.ok; + else p.status = CloseStatus.ioError; + + () @trusted { core.runInOwnerThread(&doCloseCleanup, p); } (); + } + + if (!isValid(file)) { + on_closed(file, CloseStatus.invalidHandle); + return; + } auto h = idToHandle(file); auto slot = () @trusted { return &m_core.m_handles[h]; } (); - if (slot.validationCounter != file.validationCounter) return; if (slot.file.read.overlapped.hEvent != INVALID_HANDLE_VALUE) slot.file.read.overlapped.hEvent = slot.file.write.overlapped.hEvent = INVALID_HANDLE_VALUE; + m_core.addWaiter(); + m_core.discardEvents(&slot.file.read.overlapped, &slot.file.write.overlapped); + m_core.freeSlot(h); + workerPool.run!doClose(file, on_closed, () @trusted { return cast(shared)m_core; } ()); } override ulong getSize(FileFD file) @@ -213,11 +241,7 @@ final class WinAPIEventDriverFiles : EventDriverFiles { auto h = idToHandle(descriptor); auto slot = &m_core.m_handles[h]; - return slot.releaseRef({ - CloseHandle(h); - m_core.discardEvents(&slot.file.read.overlapped, &slot.file.write.overlapped); - m_core.freeSlot(h); - }); + return slot.releaseRef({ close(descriptor, null); }); } protected override void* rawUserData(FileFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @@ -291,8 +315,22 @@ final class WinAPIEventDriverFiles : EventDriverFiles { } } + private @property ref IOWorkerPool workerPool() + { + if (!m_workerPool) + m_workerPool = acquireIOWorkerPool(); + return m_workerPool; + } + private static HANDLE idToHandle(FileFD id) @trusted @nogc { return cast(HANDLE)cast(size_t)id; } } + +private static struct CloseParams { + FileFD file; + FileCloseCallback callback; + CloseStatus status; + WinAPIEventDriverCore core; +} diff --git a/source/eventcore/drivers/winapi/pipes.d b/source/eventcore/drivers/winapi/pipes.d index 07fc43b..74bb6c5 100644 --- a/source/eventcore/drivers/winapi/pipes.d +++ b/source/eventcore/drivers/winapi/pipes.d @@ -53,9 +53,13 @@ final class WinAPIEventDriverPipes : EventDriverPipes { assert(false, "TODO!"); } - override void close(PipeFD pipe) + override void close(PipeFD pipe, PipeCloseCallback on_closed) { - if (!isValid(pipe)) return; + if (!isValid(pipe)) { + if (on_closed) + on_closed(pipe, CloseStatus.invalidHandle); + return; + } assert(false, "TODO!"); } diff --git a/source/eventcore/internal/ioworker.d b/source/eventcore/internal/ioworker.d index 0dcfd1a..48a4946 100644 --- a/source/eventcore/internal/ioworker.d +++ b/source/eventcore/internal/ioworker.d @@ -4,7 +4,7 @@ module eventcore.internal.ioworker; import eventcore.internal.utils; -import std.parallelism : TaskPool; +import std.parallelism : TaskPool, Task, task; IOWorkerPool acquireIOWorkerPool() @@ -28,6 +28,14 @@ struct IOWorkerPool { @property TaskPool pool() { return m_pool; } alias pool this; + + auto run(alias fun, ARGS...)(ARGS args) + { + auto t = task!(fun, ARGS)(args); + try m_pool.put(t); + catch (Exception e) assert(false, e.msg); + return t; + } } // Maintains a single thread pool shared by all driver instances (threads) diff --git a/tests/0-file.d b/tests/0-file.d index 58b33f5..3a20391 100644 --- a/tests/0-file.d +++ b/tests/0-file.d @@ -36,13 +36,15 @@ void main() assert(status == IOStatus.ok); assert(nbytes == data.length - 5); assert(dst == data); - eventDriver.files.close(f); - () @trusted { - scope (failure) assert(false); - remove("test.txt"); - } (); - eventDriver.files.releaseRef(f); - s_done = true; + eventDriver.files.close(f, (f, s) { + assert(s == CloseStatus.ok); + () @trusted { + scope (failure) assert(false); + remove("test.txt"); + } (); + eventDriver.files.releaseRef(f); + s_done = true; + }); }); }); }); diff --git a/tests/0-runinownerthread.d b/tests/0-runinownerthread.d index bb9208e..157a732 100644 --- a/tests/0-runinownerthread.d +++ b/tests/0-runinownerthread.d @@ -31,7 +31,7 @@ void main() void threadFunc(shared(NativeEventDriver) drv) { - drv.core.runInOwnerThread((id) { + drv.core.runInOwnerThread((int id) { s_id = id; }, 42); }