Merge pull request #144 from vibe-d/async_fixes

Fix API issues resulting in blocking the main thread
This commit is contained in:
Sönke Ludwig 2020-05-12 10:57:18 +02:00 committed by GitHub
commit bace6c6c80
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 295 additions and 189 deletions

View file

@ -16,12 +16,12 @@
cancellation function returns, or afterwards. cancellation function returns, or afterwards.
*/ */
module eventcore.driver; module eventcore.driver;
@safe: /*@nogc:*/ nothrow:
import core.time : Duration; import core.time : Duration;
import std.process : StdProcessConfig = Config; import std.process : StdProcessConfig = Config;
import std.socket : Address; import std.socket : Address;
import std.stdint : intptr_t; import std.stdint : intptr_t;
import std.typecons : Tuple;
import std.variant : Algebraic; import std.variant : Algebraic;
@ -71,14 +71,13 @@ interface EventDriver {
/** Provides generic event loop control. /** Provides generic event loop control.
*/ */
interface EventDriverCore { interface EventDriverCore {
@safe: /*@nogc:*/ nothrow:
/** The number of pending callbacks. /** The number of pending callbacks.
When this number drops to zero, the event loop can safely be quit. It is When this number drops to zero, the event loop can safely be quit. It is
guaranteed that no callbacks will be made anymore, unless new callbacks guaranteed that no callbacks will be made anymore, unless new callbacks
get registered. get registered.
*/ */
size_t waiterCount(); size_t waiterCount() @safe nothrow;
/** Runs the event loop to process a chunk of events. /** Runs the event loop to process a chunk of events.
@ -92,7 +91,7 @@ interface EventDriverCore {
duration of `Duration.max`, if necessary, will wait indefinitely duration of `Duration.max`, if necessary, will wait indefinitely
until an event arrives. until an event arrives.
*/ */
ExitReason processEvents(Duration timeout); ExitReason processEvents(Duration timeout) @safe nothrow;
/** Causes `processEvents` to return with `ExitReason.exited` as soon as /** Causes `processEvents` to return with `ExitReason.exited` as soon as
possible. possible.
@ -101,7 +100,7 @@ interface EventDriverCore {
so that it returns immediately. If no call is in progress, the next call so that it returns immediately. If no call is in progress, the next call
to `processEvents` will immediately return with `ExitReason.exited`. to `processEvents` will immediately return with `ExitReason.exited`.
*/ */
void exit(); void exit() @safe nothrow;
/** Resets the exit flag. /** Resets the exit flag.
@ -110,26 +109,28 @@ interface EventDriverCore {
`processEvents`, the next call to `processEvents` will return with `processEvents`, the next call to `processEvents` will return with
`ExitCode.exited` immediately. This function can be used to avoid this. `ExitCode.exited` immediately. This function can be used to avoid this.
*/ */
void clearExitFlag(); void clearExitFlag() @safe nothrow;
/** Executes a callback in the thread owning the driver. /** Executes a callback in the thread owning the driver.
*/ */
void runInOwnerThread(ThreadCallback2 fun, intptr_t param1, intptr_t param2) shared; void runInOwnerThread(ThreadCallbackGen fun, ref ThreadCallbackGenParams params) shared @safe nothrow;
/// ditto /// ditto
final void runInOwnerThread(ThreadCallback1 fun, intptr_t param1) final void runInOwnerThread(ARGS...)(void function(ARGS) @safe nothrow fun, ARGS args) shared
shared { if (ARGS.length != 1 || !is(ARGS[0] == ThreadCallbackGenParams))
runInOwnerThread((p1, p2) { {
auto f = () @trusted { return cast(ThreadCallback1)p2; } (); alias F = void function(ARGS) @safe nothrow;
f(p1); alias T = Tuple!ARGS;
}, param1, cast(intptr_t)fun); static assert(F.sizeof + T.sizeof <= ThreadCallbackGenParams.length,
} "Parameters take up too much space.");
/// ditto
final void runInOwnerThread(ThreadCallback0 fun) ThreadCallbackGenParams params;
shared { () @trusted { (cast(F[])params[0 .. F.sizeof])[0] = fun; } ();
runInOwnerThread((p1, p2) { (cast(T[])params[F.sizeof .. F.sizeof + T.sizeof])[0] = T(args);
auto f = () @trusted { return cast(ThreadCallback0)p2; } (); runInOwnerThread((ref ThreadCallbackGenParams p) {
f(); auto f = () @trusted { return cast(F[])p[0 .. F.sizeof]; } ()[0];
}, 0, cast(intptr_t)fun); auto pt = () @trusted { return cast(T[])p[F.sizeof .. F.sizeof + T.sizeof]; } ();
f(pt[0].expand);
}, params);
} }
/// Low-level user data access. Use `getUserData` instead. /// Low-level user data access. Use `getUserData` instead.
@ -141,7 +142,7 @@ interface EventDriverCore {
*/ */
deprecated("Use `EventDriverSockets.userData` instead.") deprecated("Use `EventDriverSockets.userData` instead.")
@property final ref T userData(T, FD)(FD descriptor) @property final ref T userData(T, FD)(FD descriptor)
@trusted { @trusted nothrow {
import std.conv : emplace; import std.conv : emplace;
static void init(void* ptr) { emplace(cast(T*)ptr); } static void init(void* ptr) { emplace(cast(T*)ptr); }
static void destr(void* ptr) { destroy(*cast(T*)ptr); } static void destr(void* ptr) { destroy(*cast(T*)ptr); }
@ -460,12 +461,11 @@ interface EventDriverFiles {
/** Disallows any reads/writes and removes any exclusive locks. /** Disallows any reads/writes and removes any exclusive locks.
Note that this function may not actually close the file handle. The Note that the file handle may become invalid at any point after the
handle is only guaranteed to be closed one the reference count drops call to `close`, regardless of its current reference count. Any
to zero. However, the remaining effects of calling this function will operations on the handle will not have an effect.
be similar to actually closing the file.
*/ */
void close(FileFD file); void close(FileFD file, FileCloseCallback on_closed);
ulong getSize(FileFD file); ulong getSize(FileFD file);
@ -882,8 +882,12 @@ interface EventDriverPipes {
void waitForData(PipeFD pipe, PipeIOCallback on_data_available); void waitForData(PipeFD pipe, PipeIOCallback on_data_available);
/** Immediately close the pipe. Future read or write operations may fail. /** Immediately close the pipe. Future read or write operations may fail.
Note that the file handle may become invalid at any point after the
call to `close`, regardless of its current reference count. Any
operations on the handle will not have an effect.
*/ */
void close(PipeFD pipe); void close(PipeFD file, PipeCloseCallback on_closed);
/** Determines whether the given pipe handle is valid. /** Determines whether the given pipe handle is valid.
@ -954,6 +958,7 @@ final class RefAddress : Address {
} }
} }
@safe: /*@nogc:*/ nothrow:
alias ConnectCallback = void delegate(StreamSocketFD, ConnectStatus); alias ConnectCallback = void delegate(StreamSocketFD, ConnectStatus);
alias AcceptCallback = void delegate(StreamListenSocketFD, StreamSocketFD, scope RefAddress remote_address); alias AcceptCallback = void delegate(StreamListenSocketFD, StreamSocketFD, scope RefAddress remote_address);
@ -961,7 +966,9 @@ alias IOCallback = void delegate(StreamSocketFD, IOStatus, size_t);
alias DatagramIOCallback = void delegate(DatagramSocketFD, IOStatus, size_t, scope RefAddress); alias DatagramIOCallback = void delegate(DatagramSocketFD, IOStatus, size_t, scope RefAddress);
alias DNSLookupCallback = void delegate(DNSLookupID, DNSStatus, scope RefAddress[]); alias DNSLookupCallback = void delegate(DNSLookupID, DNSStatus, scope RefAddress[]);
alias FileIOCallback = void delegate(FileFD, IOStatus, size_t); alias FileIOCallback = void delegate(FileFD, IOStatus, size_t);
alias FileCloseCallback = void delegate(FileFD, CloseStatus);
alias PipeIOCallback = void delegate(PipeFD, IOStatus, size_t); alias PipeIOCallback = void delegate(PipeFD, IOStatus, size_t);
alias PipeCloseCallback = void delegate(PipeFD, CloseStatus);
alias EventCallback = void delegate(EventID); alias EventCallback = void delegate(EventID);
alias SignalCallback = void delegate(SignalListenID, SignalStatus, int); alias SignalCallback = void delegate(SignalListenID, SignalStatus, int);
alias TimerCallback = void delegate(TimerID); alias TimerCallback = void delegate(TimerID);
@ -984,6 +991,12 @@ enum ExitReason {
exited exited
} }
enum CloseStatus {
ok,
ioError,
invalidHandle,
}
enum ConnectStatus { enum ConnectStatus {
connected, connected,
refused, refused,
@ -1093,13 +1106,6 @@ struct FileChange {
/// Name of the changed file /// Name of the changed file
const(char)[] name; const(char)[] name;
/** Determines if the changed entity is a file or a directory.
Note that depending on the platform this may not be accurate for
`FileChangeKind.removed`.
*/
bool isDirectory;
} }
/** Describes a spawned process /** Describes a spawned process
@ -1157,10 +1163,9 @@ mixin template Handle(string NAME, T, T invalid_value = T.init) {
alias value this; alias value this;
} }
alias ThreadCallback0 = void function() @safe nothrow; alias ThreadCallbackGenParams = ubyte[8 * intptr_t.sizeof];
alias ThreadCallback1 = void function(intptr_t param1) @safe nothrow; alias ThreadCallbackGen = void function(ref ThreadCallbackGenParams param3) @safe nothrow;
alias ThreadCallback2 = void function(intptr_t param1, intptr_t param2) @safe nothrow; deprecated alias ThreadCallback = void function(intptr_t param1) @safe nothrow;
alias ThreadCallback = ThreadCallback1;
struct FD { mixin Handle!("fd", size_t, size_t.max); } struct FD { mixin Handle!("fd", size_t, size_t.max); }
struct SocketFD { mixin Handle!("socket", FD); } struct SocketFD { mixin Handle!("socket", FD); }

View file

@ -160,7 +160,7 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
protected alias ExtraEventsCallback = bool delegate(long); protected alias ExtraEventsCallback = bool delegate(long);
private alias ThreadCallbackEntry = Tuple!(ThreadCallback2, intptr_t, intptr_t); private alias ThreadCallbackEntry = Tuple!(ThreadCallbackGen, ThreadCallbackGenParams);
private { private {
Loop m_loop; Loop m_loop;
@ -262,7 +262,8 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
m_exit = false; m_exit = false;
} }
final override void runInOwnerThread(ThreadCallback2 del, intptr_t param1, intptr_t param2) final override void runInOwnerThread(ThreadCallbackGen del,
ref ThreadCallbackGenParams params)
shared { shared {
auto m = atomicLoad(m_threadCallbackMutex); auto m = atomicLoad(m_threadCallbackMutex);
auto evt = atomicLoad(m_wakeupEvent); auto evt = atomicLoad(m_wakeupEvent);
@ -275,7 +276,7 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
try { try {
synchronized (m) synchronized (m)
() @trusted { return (cast()this).m_threadCallbacks; } () () @trusted { return (cast()this).m_threadCallbacks; } ()
.put(ThreadCallbackEntry(del, param1, param2)); .put(ThreadCallbackEntry(del, params));
} catch (Exception e) assert(false, e.msg); } catch (Exception e) assert(false, e.msg);
m_events.trigger(m_wakeupEvent, false); m_events.trigger(m_wakeupEvent, false);
@ -311,7 +312,7 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
del = m_threadCallbacks.consumeOne; del = m_threadCallbacks.consumeOne;
} }
} catch (Exception e) assert(false, e.msg); } catch (Exception e) assert(false, e.msg);
del[0](del[1], del[2]); del[0](del[1]);
} }
} }
} }

View file

@ -10,7 +10,7 @@ import std.algorithm : min, max;
final class PosixEventDriverPipes(Loop : PosixEventLoop) : EventDriverPipes { final class PosixEventDriverPipes(Loop : PosixEventLoop) : EventDriverPipes {
@safe: /*@nogc:*/ nothrow: @safe: /*@nogc:*/ nothrow:
import core.stdc.errno : errno, EAGAIN; import core.stdc.errno : errno, EAGAIN, EINTR;
import core.sys.posix.unistd : close, read, write; import core.sys.posix.unistd : close, read, write;
import core.sys.posix.fcntl; import core.sys.posix.fcntl;
import core.sys.posix.poll; import core.sys.posix.poll;
@ -304,12 +304,21 @@ final class PosixEventDriverPipes(Loop : PosixEventLoop) : EventDriverPipes {
return false; return false;
} }
final override void close(PipeFD pipe) final override void close(PipeFD pipe, PipeCloseCallback on_closed)
{ {
if (!isValid(pipe)) return; if (!isValid(pipe)) {
on_closed(pipe, CloseStatus.invalidHandle);
return;
}
// TODO: Maybe actually close here instead of waiting for releaseRef? int res;
close(cast(int)pipe); do res = close(cast(int)pipe);
while (res != 0 && errno == EINTR);
m_loop.unregisterFD(pipe, EventMask.read|EventMask.write|EventMask.status);
m_loop.clearFD!PipeSlot(pipe);
if (on_closed)
on_closed(pipe, res == 0 ? CloseStatus.ok : CloseStatus.ioError);
} }
override bool isValid(PipeFD handle) override bool isValid(PipeFD handle)
@ -337,10 +346,7 @@ final class PosixEventDriverPipes(Loop : PosixEventLoop) : EventDriverPipes {
nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced pipe FD."); nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced pipe FD.");
if (--slot.common.refCount == 0) { if (--slot.common.refCount == 0) {
m_loop.unregisterFD(pipe, EventMask.read|EventMask.write|EventMask.status); close(pipe, null);
m_loop.clearFD!PipeSlot(pipe);
close(cast(int)pipe);
return false; return false;
} }
return true; return true;
@ -350,7 +356,6 @@ final class PosixEventDriverPipes(Loop : PosixEventLoop) : EventDriverPipes {
@system { @system {
return m_loop.rawUserDataImpl(fd, size, initialize, destroy); return m_loop.rawUserDataImpl(fd, size, initialize, destroy);
} }
>>>>>>> 568465d... Make the API robust against using invalid handles. Fixes #105.
} }
final class DummyEventDriverPipes(Loop : PosixEventLoop) : EventDriverPipes { final class DummyEventDriverPipes(Loop : PosixEventLoop) : EventDriverPipes {
@ -387,8 +392,13 @@ final class DummyEventDriverPipes(Loop : PosixEventLoop) : EventDriverPipes {
assert(false, "TODO!"); assert(false, "TODO!");
} }
override void close(PipeFD pipe) override void close(PipeFD pipe, PipeCloseCallback on_closed)
{ {
if (!isValid(pipe)) {
on_closed(pipe, CloseStatus.invalidHandle);
return;
}
assert(false, "TODO!"); assert(false, "TODO!");
} }

View file

@ -206,7 +206,7 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces
m_driver.core.runInOwnerThread(&onLocalProcessExit, system_pid); m_driver.core.runInOwnerThread(&onLocalProcessExit, system_pid);
} }
private static void onLocalProcessExit(intptr_t system_pid) private static void onLocalProcessExit(int system_pid)
{ {
int exitCode; int exitCode;
ProcessWaitCallback[] callbacks; ProcessWaitCallback[] callbacks;
@ -214,12 +214,12 @@ final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProces
ProcessID pid; ProcessID pid;
PosixEventDriverProcesses driver; PosixEventDriverProcesses driver;
lockedProcessInfoPlain(cast(int)system_pid, (info) { lockedProcessInfoPlain(system_pid, (info) {
assert(info !is null); assert(info !is null);
exitCode = info.exitCode; exitCode = info.exitCode;
callbacks = info.callbacks; callbacks = info.callbacks;
pid = ProcessID(cast(int)system_pid, info.validationCounter); pid = ProcessID(system_pid, info.validationCounter);
info.callbacks = null; info.callbacks = null;
driver = info.driver; driver = info.driver;

View file

@ -150,7 +150,6 @@ final class InotifyEventDriverWatchers(Events : EventDriverEvents) : EventDriver
ch.baseDirectory = m_watches[id].basePath; ch.baseDirectory = m_watches[id].basePath;
ch.directory = subdir; ch.directory = subdir;
ch.isDirectory = (ev.mask & IN_ISDIR) != 0;
ch.name = name; ch.name = name;
addRef(id); // assure that the id doesn't get invalidated until after the callback addRef(id); // assure that the id doesn't get invalidated until after the callback
auto cb = m_loop.m_fds[id].watcher.callback; auto cb = m_loop.m_fds[id].watcher.callback;
@ -487,9 +486,9 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
@property size_t entryCount() const { return m_entryCount; } @property size_t entryCount() const { return m_entryCount; }
private void addChange(FileChangeKind kind, Key key, bool is_dir) private void addChange(FileChangeKind kind, Key key)
{ {
m_onChange(FileChange(kind, m_basePath, key.parent ? key.parent.path : "", key.name, is_dir)); m_onChange(FileChange(kind, m_basePath, key.parent ? key.parent.path : "", key.name));
} }
private void scan(bool generate_changes) private void scan(bool generate_changes)
@ -506,12 +505,12 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
foreach (e; m_entries.byKeyValue) { foreach (e; m_entries.byKeyValue) {
if (!e.key.parent || Key(e.key.parent.parent, e.key.parent.name) !in m_entries) { if (!e.key.parent || Key(e.key.parent.parent, e.key.parent.name) !in m_entries) {
if (generate_changes) if (generate_changes)
addChange(FileChangeKind.removed, e.key, e.value.isDir); addChange(FileChangeKind.removed, e.key);
} }
} }
foreach (e; added) foreach (e; added)
addChange(FileChangeKind.added, Key(e.parent, e.name), e.isDir); addChange(FileChangeKind.added, Key(e.parent, e.name));
swap(m_entries, new_entries); swap(m_entries, new_entries);
m_entryCount = ec; m_entryCount = ec;
@ -539,7 +538,7 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
} else { } else {
if ((*pe).size != de.size || (*pe).lastChange != modified_time) { if ((*pe).size != de.size || (*pe).lastChange != modified_time) {
if (generate_changes) if (generate_changes)
addChange(FileChangeKind.modified, key, (*pe).isDir); addChange(FileChangeKind.modified, key);
(*pe).size = de.size; (*pe).size = de.size;
(*pe).lastChange = modified_time; (*pe).lastChange = modified_time;
} }

View file

@ -1,6 +1,7 @@
module eventcore.drivers.threadedfile; module eventcore.drivers.threadedfile;
import eventcore.driver; import eventcore.driver;
import eventcore.internal.ioworker;
import eventcore.internal.utils; import eventcore.internal.utils;
import core.atomic; import core.atomic;
import core.stdc.errno; import core.stdc.errno;
@ -101,7 +102,6 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil
static struct FileInfo { static struct FileInfo {
IOInfo read; IOInfo read;
IOInfo write; IOInfo write;
bool open = true;
uint validationCounter; uint validationCounter;
@ -110,7 +110,7 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil
ubyte[16*size_t.sizeof] userData; ubyte[16*size_t.sizeof] userData;
} }
TaskPool m_fileThreadPool; IOWorkerPool m_fileThreadPool;
ChoppedVector!FileInfo m_files; // TODO: use the one from the posix loop ChoppedVector!FileInfo m_files; // TODO: use the one from the posix loop
SmallIntegerSet!size_t m_activeReads; SmallIntegerSet!size_t m_activeReads;
SmallIntegerSet!size_t m_activeWrites; SmallIntegerSet!size_t m_activeWrites;
@ -128,10 +128,7 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil
void dispose() void dispose()
{ {
if (m_fileThreadPool) { m_fileThreadPool = IOWorkerPool.init;
StaticTaskPool.releaseRef();
m_fileThreadPool = null;
}
if (m_readyEvent != EventID.invalid) { if (m_readyEvent != EventID.invalid) {
log("finishing file events"); log("finishing file events");
@ -179,15 +176,22 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil
return FileFD(system_file_handle, vc + 1); return FileFD(system_file_handle, vc + 1);
} }
void close(FileFD file) void close(FileFD file, FileCloseCallback on_closed)
{ {
if (!isValid(file)) return; if (!isValid(file)) {
on_closed(file, CloseStatus.invalidHandle);
return;
}
// NOTE: The file descriptor itself must stay open until the reference // TODO: close may block and should be executed in a worker thread
// count drops to zero, or this would result in dangling handles. int res;
// In case of an exclusive file lock, the lock should be lifted do res = .close(cast(int)file.value);
// here. while (res != 0 && errno == EINTR);
m_files[file].open = false;
m_files[file.value] = FileInfo.init;
if (on_closed)
on_closed(file, res == 0 ? CloseStatus.ok : CloseStatus.ioError);
} }
ulong getSize(FileFD file) ulong getSize(FileFD file)
@ -232,11 +236,6 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil
//assert(this.writable); //assert(this.writable);
auto f = () @trusted { return &m_files[file]; } (); auto f = () @trusted { return &m_files[file]; } ();
if (!f.open) {
on_write_finish(file, IOStatus.disconnected, 0);
return;
}
if (!safeCAS(f.write.status, ThreadedFileStatus.idle, ThreadedFileStatus.initiated)) if (!safeCAS(f.write.status, ThreadedFileStatus.idle, ThreadedFileStatus.initiated))
assert(false, "Concurrent file writes are not allowed."); assert(false, "Concurrent file writes are not allowed.");
assert(f.write.callback is null, "Concurrent file writes are not allowed."); assert(f.write.callback is null, "Concurrent file writes are not allowed.");
@ -278,11 +277,6 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil
auto f = () @trusted { return &m_files[file]; } (); auto f = () @trusted { return &m_files[file]; } ();
if (!f.open) {
on_read_finish(file, IOStatus.disconnected, 0);
return;
}
if (!safeCAS(f.read.status, ThreadedFileStatus.idle, ThreadedFileStatus.initiated)) if (!safeCAS(f.read.status, ThreadedFileStatus.idle, ThreadedFileStatus.initiated))
assert(false, "Concurrent file reads are not allowed."); assert(false, "Concurrent file reads are not allowed.");
assert(f.read.callback is null, "Concurrent file reads are not allowed."); assert(f.read.callback is null, "Concurrent file reads are not allowed.");
@ -334,8 +328,7 @@ final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFil
auto f = () @trusted { return &m_files[descriptor]; } (); auto f = () @trusted { return &m_files[descriptor]; } ();
if (!--f.refCount) { if (!--f.refCount) {
.close(cast(int)descriptor); close(descriptor, null);
*f = FileInfo.init;
assert(!m_activeReads.contains(descriptor.value)); assert(!m_activeReads.contains(descriptor.value));
assert(!m_activeWrites.contains(descriptor.value)); assert(!m_activeWrites.contains(descriptor.value));
return false; return false;
@ -435,9 +428,9 @@ log("ready event");
log("create file event"); log("create file event");
m_readyEvent = m_events.create(); m_readyEvent = m_events.create();
} }
if (m_fileThreadPool is null) { if (!m_fileThreadPool) {
log("aquire thread pool"); log("aquire thread pool");
m_fileThreadPool = StaticTaskPool.addRef(); m_fileThreadPool = acquireIOWorkerPool();
} }
} }
} }
@ -456,63 +449,3 @@ private void log(ARGS...)(string fmt, ARGS args)
writefln(fmt, args); writefln(fmt, args);
} }
} }
// Maintains a single thread pool shared by all driver instances (threads)
private struct StaticTaskPool {
import core.sync.mutex : Mutex;
import std.parallelism : TaskPool;
private {
static shared Mutex m_mutex;
static __gshared TaskPool m_pool;
static __gshared int m_refCount = 0;
}
shared static this()
{
m_mutex = new shared Mutex;
}
static TaskPool addRef()
@trusted nothrow {
m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow();
if (!m_refCount++) {
try {
m_pool = mallocT!TaskPool(4);
m_pool.isDaemon = true;
} catch (Exception e) {
assert(false, e.msg);
}
}
return m_pool;
}
static void releaseRef()
@trusted nothrow {
TaskPool fin_pool;
{
m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow();
if (!--m_refCount) {
fin_pool = m_pool;
m_pool = null;
}
}
if (fin_pool) {
log("finishing thread pool");
try {
fin_pool.finish(true);
freeT(fin_pool);
} catch (Exception e) {
//log("Failed to shut down file I/O thread pool.");
}
}
}
}

View file

@ -16,7 +16,7 @@ import std.typecons : Tuple, tuple;
final class WinAPIEventDriverCore : EventDriverCore { final class WinAPIEventDriverCore : EventDriverCore {
@safe: /*@nogc:*/ nothrow: @safe: /*@nogc:*/ nothrow:
private alias ThreadCallbackEntry = Tuple!(ThreadCallback2, intptr_t, intptr_t); private alias ThreadCallbackEntry = Tuple!(ThreadCallbackGen, ThreadCallbackGenParams);
private { private {
bool m_exit; bool m_exit;
@ -141,7 +141,8 @@ final class WinAPIEventDriverCore : EventDriverCore {
m_exit = false; m_exit = false;
} }
override void runInOwnerThread(ThreadCallback2 del, intptr_t param1, intptr_t param2) override void runInOwnerThread(ThreadCallbackGen del,
ref ThreadCallbackGenParams params)
shared { shared {
import core.atomic : atomicLoad; import core.atomic : atomicLoad;
@ -155,7 +156,7 @@ final class WinAPIEventDriverCore : EventDriverCore {
try { try {
synchronized (m) synchronized (m)
() @trusted { return (cast()this).m_threadCallbacks; } () () @trusted { return (cast()this).m_threadCallbacks; } ()
.put(ThreadCallbackEntry(del, param1, param2)); .put(ThreadCallbackEntry(del, params));
} catch (Exception e) assert(false, e.msg); } catch (Exception e) assert(false, e.msg);
() @trusted { PostThreadMessageW(m_tid, WM_APP, 0, 0); } (); () @trusted { PostThreadMessageW(m_tid, WM_APP, 0, 0); } ();
@ -279,8 +280,6 @@ final class WinAPIEventDriverCore : EventDriverCore {
private void executeThreadCallbacks() private void executeThreadCallbacks()
{ {
import std.stdint : intptr_t;
while (true) { while (true) {
ThreadCallbackEntry del; ThreadCallbackEntry del;
try { try {
@ -289,7 +288,7 @@ final class WinAPIEventDriverCore : EventDriverCore {
del = m_threadCallbacks.consumeOne; del = m_threadCallbacks.consumeOne;
} }
} catch (Exception e) assert(false, e.msg); } catch (Exception e) assert(false, e.msg);
del[0](del[1], del[2]); del[0](del[1]);
} }
} }
} }
@ -360,6 +359,7 @@ package struct FileSlot {
} }
Direction!false read; Direction!false read;
Direction!true write; Direction!true write;
FileCloseCallback closeCallback;
} }
package struct WatcherSlot { package struct WatcherSlot {

View file

@ -4,6 +4,7 @@ version (Windows):
import eventcore.driver; import eventcore.driver;
import eventcore.drivers.winapi.core; import eventcore.drivers.winapi.core;
import eventcore.internal.ioworker;
import eventcore.internal.win32; import eventcore.internal.win32;
private extern(Windows) @trusted nothrow @nogc { private extern(Windows) @trusted nothrow @nogc {
@ -14,6 +15,7 @@ final class WinAPIEventDriverFiles : EventDriverFiles {
@safe /*@nogc*/ nothrow: @safe /*@nogc*/ nothrow:
private { private {
WinAPIEventDriverCore m_core; WinAPIEventDriverCore m_core;
IOWorkerPool m_workerPool;
} }
this(WinAPIEventDriverCore core) this(WinAPIEventDriverCore core)
@ -71,15 +73,41 @@ final class WinAPIEventDriverFiles : EventDriverFiles {
return FileFD(cast(size_t)handle, m_core.m_handles[handle].validationCounter); return FileFD(cast(size_t)handle, m_core.m_handles[handle].validationCounter);
} }
override void close(FileFD file) override void close(FileFD file, FileCloseCallback on_closed)
{ {
if (!isValid(file)) return; static void doCloseCleanup(CloseParams p)
{
p.core.removeWaiter();
if (p.callback) p.callback(p.file, p.status);
}
static void doClose(FileFD file, FileCloseCallback on_closed,
shared(WinAPIEventDriverCore) core)
{
CloseParams p;
CloseStatus st;
p.file = file;
p.callback = on_closed;
p.core = () @trusted { return cast(WinAPIEventDriverCore)core; } ();
if (CloseHandle(idToHandle(file))) p.status = CloseStatus.ok;
else p.status = CloseStatus.ioError;
() @trusted { core.runInOwnerThread(&doCloseCleanup, p); } ();
}
if (!isValid(file)) {
on_closed(file, CloseStatus.invalidHandle);
return;
}
auto h = idToHandle(file); auto h = idToHandle(file);
auto slot = () @trusted { return &m_core.m_handles[h]; } (); auto slot = () @trusted { return &m_core.m_handles[h]; } ();
if (slot.validationCounter != file.validationCounter) return;
if (slot.file.read.overlapped.hEvent != INVALID_HANDLE_VALUE) if (slot.file.read.overlapped.hEvent != INVALID_HANDLE_VALUE)
slot.file.read.overlapped.hEvent = slot.file.write.overlapped.hEvent = INVALID_HANDLE_VALUE; slot.file.read.overlapped.hEvent = slot.file.write.overlapped.hEvent = INVALID_HANDLE_VALUE;
m_core.addWaiter();
m_core.discardEvents(&slot.file.read.overlapped, &slot.file.write.overlapped);
m_core.freeSlot(h);
workerPool.run!doClose(file, on_closed, () @trusted { return cast(shared)m_core; } ());
} }
override ulong getSize(FileFD file) override ulong getSize(FileFD file)
@ -213,11 +241,7 @@ final class WinAPIEventDriverFiles : EventDriverFiles {
auto h = idToHandle(descriptor); auto h = idToHandle(descriptor);
auto slot = &m_core.m_handles[h]; auto slot = &m_core.m_handles[h];
return slot.releaseRef({ return slot.releaseRef({ close(descriptor, null); });
CloseHandle(h);
m_core.discardEvents(&slot.file.read.overlapped, &slot.file.write.overlapped);
m_core.freeSlot(h);
});
} }
protected override void* rawUserData(FileFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) protected override void* rawUserData(FileFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
@ -291,8 +315,22 @@ final class WinAPIEventDriverFiles : EventDriverFiles {
} }
} }
private @property ref IOWorkerPool workerPool()
{
if (!m_workerPool)
m_workerPool = acquireIOWorkerPool();
return m_workerPool;
}
private static HANDLE idToHandle(FileFD id) private static HANDLE idToHandle(FileFD id)
@trusted @nogc { @trusted @nogc {
return cast(HANDLE)cast(size_t)id; return cast(HANDLE)cast(size_t)id;
} }
} }
private static struct CloseParams {
FileFD file;
FileCloseCallback callback;
CloseStatus status;
WinAPIEventDriverCore core;
}

View file

@ -53,9 +53,13 @@ final class WinAPIEventDriverPipes : EventDriverPipes {
assert(false, "TODO!"); assert(false, "TODO!");
} }
override void close(PipeFD pipe) override void close(PipeFD pipe, PipeCloseCallback on_closed)
{ {
if (!isValid(pipe)) return; if (!isValid(pipe)) {
if (on_closed)
on_closed(pipe, CloseStatus.invalidHandle);
return;
}
assert(false, "TODO!"); assert(false, "TODO!");
} }

View file

@ -193,10 +193,7 @@ final class WinAPIEventDriverWatchers : EventDriverWatchers {
ch.directory = dirName(path); ch.directory = dirName(path);
if (ch.directory == ".") ch.directory = ""; if (ch.directory == ".") ch.directory = "";
ch.name = baseName(path); ch.name = baseName(path);
try ch.isDirectory = isDir(fullpath); slot.callback(id, ch);
catch (Exception e) {} // FIXME: can happen if the base path is relative and the CWD has changed
if (ch.kind != FileChangeKind.modified || !ch.isDirectory)
slot.callback(id, ch);
if (fni.NextEntryOffset == 0 || !slot.callback) break; if (fni.NextEntryOffset == 0 || !slot.callback) break;
} }

View file

@ -0,0 +1,97 @@
/** Provides a shared task pool for distributing tasks to worker threads.
*/
module eventcore.internal.ioworker;
import eventcore.internal.utils;
import std.parallelism : TaskPool, Task, task;
IOWorkerPool acquireIOWorkerPool()
@safe nothrow {
return IOWorkerPool(true);
}
struct IOWorkerPool {
private {
TaskPool m_pool;
}
@safe nothrow:
private this(bool) { m_pool = StaticTaskPool.addRef(); }
~this() { if (m_pool) StaticTaskPool.releaseRef(); }
this(this) { if (m_pool) StaticTaskPool.addRef(); }
bool opCast(T)() const if (is(T == bool)) { return !!m_pool; }
@property TaskPool pool() { return m_pool; }
alias pool this;
auto run(alias fun, ARGS...)(ARGS args)
{
auto t = task!(fun, ARGS)(args);
try m_pool.put(t);
catch (Exception e) assert(false, e.msg);
return t;
}
}
// Maintains a single thread pool shared by all driver instances (threads)
private struct StaticTaskPool {
import core.sync.mutex : Mutex;
private {
static shared Mutex m_mutex;
static __gshared TaskPool m_pool;
static __gshared int m_refCount = 0;
}
shared static this()
{
m_mutex = new shared Mutex;
}
static TaskPool addRef()
@trusted nothrow {
m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow();
if (!m_refCount++) {
try {
m_pool = mallocT!TaskPool(4);
m_pool.isDaemon = true;
} catch (Exception e) {
assert(false, e.msg);
}
}
return m_pool;
}
static void releaseRef()
@trusted nothrow {
TaskPool fin_pool;
{
m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow();
if (!--m_refCount) {
fin_pool = m_pool;
m_pool = null;
}
}
if (fin_pool) {
//log("finishing thread pool");
try {
fin_pool.finish(true);
freeT(fin_pool);
} catch (Exception e) {
//log("Failed to shut down file I/O thread pool.");
}
}
}
}

View file

@ -8,7 +8,7 @@ import eventcore.core;
import eventcore.internal.utils : print; import eventcore.internal.utils : print;
import core.thread : Thread; import core.thread : Thread;
import core.time : Duration, MonoTime, msecs; import core.time : Duration, MonoTime, msecs;
import std.file : exists, remove, rename, rmdirRecurse, mkdir; import std.file : exists, remove, rename, rmdirRecurse, mkdir, isDir;
import std.format : format; import std.format : format;
import std.functional : toDelegate; import std.functional : toDelegate;
import std.path : baseName, buildPath, dirName; import std.path : baseName, buildPath, dirName;
@ -97,6 +97,20 @@ void expectChange(FileChange ch, bool expect_change)
assert(!expect_change, format("Got no change, expected %s.", ch)); assert(!expect_change, format("Got no change, expected %s.", ch));
return; return;
} }
// ignore different directory modification notifications on Windows as
// opposed to the other systems
while (pendingChanges.length) {
auto pch = pendingChanges[0];
if (pch.kind == FileChangeKind.modified) {
auto p = buildPath(pch.baseDirectory, pch.directory, pch.name);
if (!exists(p) || isDir(p)) {
pendingChanges = pendingChanges[1 .. $];
continue;
}
}
break;
}
} }
assert(expect_change, "Got change although none was expected."); assert(expect_change, "Got change although none was expected.");
@ -119,43 +133,43 @@ void testFile(string name, bool expect_change = true)
{ {
print("test %s CREATE %s", name, expect_change); print("test %s CREATE %s", name, expect_change);
auto fil = File(buildPath(testDir, name), "wt"); auto fil = File(buildPath(testDir, name), "wt");
expectChange(fchange(FileChangeKind.added, name, false), expect_change); expectChange(fchange(FileChangeKind.added, name), expect_change);
print("test %s MODIFY %s", name, expect_change); print("test %s MODIFY %s", name, expect_change);
fil.write("test"); fil.write("test");
fil.close(); fil.close();
expectChange(fchange(FileChangeKind.modified, name, false), expect_change); expectChange(fchange(FileChangeKind.modified, name), expect_change);
print("test %s DELETE %s", name, expect_change); print("test %s DELETE %s", name, expect_change);
remove(buildPath(testDir, name)); remove(buildPath(testDir, name));
expectChange(fchange(FileChangeKind.removed, name, false), expect_change); expectChange(fchange(FileChangeKind.removed, name), expect_change);
} }
void testCreateDir(string name, bool expect_change = true) void testCreateDir(string name, bool expect_change = true)
{ {
print("test %s CREATEDIR %s", name, expect_change); print("test %s CREATEDIR %s", name, expect_change);
mkdir(buildPath(testDir, name)); mkdir(buildPath(testDir, name));
expectChange(fchange(FileChangeKind.added, name, true), expect_change); expectChange(fchange(FileChangeKind.added, name), expect_change);
} }
void testRemoveDir(string name, bool expect_change = true) void testRemoveDir(string name, bool expect_change = true)
{ {
print("test %s DELETEDIR %s", name, expect_change); print("test %s DELETEDIR %s", name, expect_change);
rmdirRecurse(buildPath(testDir, name)); rmdirRecurse(buildPath(testDir, name));
expectChange(fchange(FileChangeKind.removed, name, true), expect_change); expectChange(fchange(FileChangeKind.removed, name), expect_change);
} }
void testRename(string from, string to, bool expect_change = true) void testRename(string from, string to, bool expect_change = true)
{ {
print("test %s RENAME %s %s", from, to, expect_change); print("test %s RENAME %s %s", from, to, expect_change);
rename(buildPath(testDir, from), buildPath(testDir, to)); rename(buildPath(testDir, from), buildPath(testDir, to));
expectChange(fchange(FileChangeKind.removed, from, true), expect_change); expectChange(fchange(FileChangeKind.removed, from), expect_change);
expectChange(fchange(FileChangeKind.added, to, true), expect_change); expectChange(fchange(FileChangeKind.added, to), expect_change);
} }
FileChange fchange(FileChangeKind kind, string name, bool is_dir) FileChange fchange(FileChangeKind kind, string name)
{ {
auto dn = dirName(name); auto dn = dirName(name);
if (dn == ".") dn = ""; if (dn == ".") dn = "";
return FileChange(kind, testDir, dn, baseName(name), is_dir); return FileChange(kind, testDir, dn, baseName(name));
} }

View file

@ -5,8 +5,9 @@
module test; module test;
import eventcore.core; import eventcore.core;
import std.file : exists, isDir, mkdir, remove, rmdirRecurse;
import std.path : buildPath;
import std.stdio : File, writefln; import std.stdio : File, writefln;
import std.file : exists, mkdir, remove, rmdirRecurse;
import core.time : Duration, msecs; import core.time : Duration, msecs;
bool s_done; bool s_done;
@ -23,6 +24,11 @@ void main()
scope (exit) rmdirRecurse(testDir); scope (exit) rmdirRecurse(testDir);
auto id = eventDriver.watchers.watchDirectory(testDir, false, (id, ref change) { auto id = eventDriver.watchers.watchDirectory(testDir, false, (id, ref change) {
try {
if (change.kind == FileChangeKind.modified && isDir(buildPath(change.baseDirectory, change.directory, change.name)))
return;
} catch (Exception e) assert(false, e.msg);
switch (s_cnt++) { switch (s_cnt++) {
default: default:
import std.conv : to; import std.conv : to;

View file

@ -36,13 +36,15 @@ void main()
assert(status == IOStatus.ok); assert(status == IOStatus.ok);
assert(nbytes == data.length - 5); assert(nbytes == data.length - 5);
assert(dst == data); assert(dst == data);
eventDriver.files.close(f); eventDriver.files.close(f, (f, s) {
() @trusted { assert(s == CloseStatus.ok);
scope (failure) assert(false); () @trusted {
remove("test.txt"); scope (failure) assert(false);
} (); remove("test.txt");
eventDriver.files.releaseRef(f); } ();
s_done = true; eventDriver.files.releaseRef(f);
s_done = true;
});
}); });
}); });
}); });

View file

@ -31,7 +31,7 @@ void main()
void threadFunc(shared(NativeEventDriver) drv) void threadFunc(shared(NativeEventDriver) drv)
{ {
drv.core.runInOwnerThread((id) { drv.core.runInOwnerThread((int id) {
s_id = id; s_id = id;
}, 42); }, 42);
} }