Merge pull request #35 from vibe-d/avoid_dot_in_watcher_path
Fix more directory watcher issues, especially on macOS merged-on-behalf-of: l-kramer <l-kramer@users.noreply.github.com>
This commit is contained in:
commit
2e3f145f0f
|
@ -30,6 +30,11 @@ static if (!is(NativeEventDriver == EventDriver)) {
|
||||||
if (!s_driver) s_driver = new NativeEventDriver;
|
if (!s_driver) s_driver = new NativeEventDriver;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static ~this()
|
||||||
|
{
|
||||||
|
s_driver.dispose();
|
||||||
|
}
|
||||||
|
|
||||||
shared static this()
|
shared static this()
|
||||||
{
|
{
|
||||||
s_driver = new NativeEventDriver;
|
s_driver = new NativeEventDriver;
|
||||||
|
|
|
@ -89,9 +89,11 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
|
||||||
|
|
||||||
final override void dispose()
|
final override void dispose()
|
||||||
{
|
{
|
||||||
|
if (!m_loop) return;
|
||||||
m_files.dispose();
|
m_files.dispose();
|
||||||
m_dns.dispose();
|
m_dns.dispose();
|
||||||
m_loop.dispose();
|
m_loop.dispose();
|
||||||
|
m_loop = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,13 +5,16 @@ import eventcore.driver;
|
||||||
import eventcore.drivers.posix.driver;
|
import eventcore.drivers.posix.driver;
|
||||||
import eventcore.internal.consumablequeue : ConsumableQueue;
|
import eventcore.internal.consumablequeue : ConsumableQueue;
|
||||||
|
|
||||||
import std.socket : InternetAddress;
|
|
||||||
|
|
||||||
version (linux) {
|
version (linux) {
|
||||||
nothrow @nogc extern (C) int eventfd(uint initval, int flags);
|
nothrow @nogc extern (C) int eventfd(uint initval, int flags);
|
||||||
import core.sys.posix.unistd : close, read, write;
|
|
||||||
enum EFD_NONBLOCK = 0x800;
|
enum EFD_NONBLOCK = 0x800;
|
||||||
}
|
}
|
||||||
|
version (Posix) {
|
||||||
|
import core.sys.posix.unistd : close, read, write;
|
||||||
|
} else {
|
||||||
|
import core.sys.windows.winsock2 : closesocket, AF_INET, SOCKET, SOCK_DGRAM,
|
||||||
|
bind, connect, getsockname, send, socket;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverSockets) : EventDriverEvents {
|
final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverSockets) : EventDriverEvents {
|
||||||
|
@ -19,10 +22,13 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
|
||||||
private {
|
private {
|
||||||
Loop m_loop;
|
Loop m_loop;
|
||||||
Sockets m_sockets;
|
Sockets m_sockets;
|
||||||
|
ubyte[ulong.sizeof] m_buf;
|
||||||
version (linux) {}
|
version (linux) {}
|
||||||
else {
|
else {
|
||||||
EventSlot[DatagramSocketFD] m_events;
|
// TODO: avoid the overhead of a mutex backed map here
|
||||||
ubyte[long.sizeof] m_buf;
|
import core.sync.mutex : Mutex;
|
||||||
|
Mutex m_eventsMutex;
|
||||||
|
EventID[DatagramSocketFD] m_events;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -30,6 +36,8 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
|
||||||
{
|
{
|
||||||
m_loop = loop;
|
m_loop = loop;
|
||||||
m_sockets = sockets;
|
m_sockets = sockets;
|
||||||
|
version (linux) {}
|
||||||
|
else m_eventsMutex = new Mutex;
|
||||||
}
|
}
|
||||||
|
|
||||||
package @property Loop loop() { return m_loop; }
|
package @property Loop loop() { return m_loop; }
|
||||||
|
@ -49,14 +57,60 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
|
||||||
m_loop.m_fds[id].specific = EventSlot(new ConsumableQueue!EventCallback, false, is_internal); // FIXME: avoid dynamic memory allocation
|
m_loop.m_fds[id].specific = EventSlot(new ConsumableQueue!EventCallback, false, is_internal); // FIXME: avoid dynamic memory allocation
|
||||||
m_loop.registerFD(id, EventMask.read);
|
m_loop.registerFD(id, EventMask.read);
|
||||||
m_loop.setNotifyCallback!(EventType.read)(id, &onEvent);
|
m_loop.setNotifyCallback!(EventType.read)(id, &onEvent);
|
||||||
|
releaseRef(id); // setNotifyCallback increments the reference count, but we need a value of 1 upon return
|
||||||
|
assert(getRC(id) == 1);
|
||||||
return id;
|
return id;
|
||||||
} else {
|
} else {
|
||||||
auto addr = new InternetAddress(0x7F000001, 0);
|
sock_t[2] fd;
|
||||||
auto s = m_sockets.createDatagramSocketInternal(addr, addr, true);
|
version (Posix) {
|
||||||
if (s == DatagramSocketFD.invalid) return EventID.invalid;
|
// create a pair of sockets to communicate between threads
|
||||||
|
import core.sys.posix.sys.socket : SOCK_DGRAM, AF_UNIX, socketpair;
|
||||||
|
if (() @trusted { return socketpair(AF_UNIX, SOCK_DGRAM, 0, fd); } () != 0)
|
||||||
|
return EventID.invalid;
|
||||||
|
|
||||||
|
assert(fd[0] != fd[1]);
|
||||||
|
|
||||||
|
// use the first socket as the async receiver
|
||||||
|
auto s = m_sockets.adoptDatagramSocketInternal(fd[0]);
|
||||||
|
} else {
|
||||||
|
// fake missing socketpair support on Windows
|
||||||
|
import std.socket : InternetAddress;
|
||||||
|
auto addr = new InternetAddress(0x7F000001, 0);
|
||||||
|
auto s = m_sockets.createDatagramSocketInternal(addr, null, true);
|
||||||
|
if (s == DatagramSocketFD.invalid) return EventID.invalid;
|
||||||
|
fd[0] = cast(sock_t)s;
|
||||||
|
if (!() @trusted {
|
||||||
|
fd[1] = socket(AF_INET, SOCK_DGRAM, 0);
|
||||||
|
int nl = addr.nameLen;
|
||||||
|
import eventcore.internal.utils : print;
|
||||||
|
if (bind(fd[1], addr.name, addr.nameLen) != 0)
|
||||||
|
return false;
|
||||||
|
assert(nl == addr.nameLen);
|
||||||
|
if (getsockname(fd[0], addr.name, &nl) != 0)
|
||||||
|
return false;
|
||||||
|
if (connect(fd[1], addr.name, addr.nameLen) != 0)
|
||||||
|
return false;
|
||||||
|
return true;
|
||||||
|
} ())
|
||||||
|
{
|
||||||
|
m_sockets.releaseRef(s);
|
||||||
|
return EventID.invalid;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
m_sockets.receive(s, m_buf, IOMode.once, &onSocketData);
|
m_sockets.receive(s, m_buf, IOMode.once, &onSocketData);
|
||||||
m_events[s] = EventSlot(new ConsumableQueue!EventCallback, false, is_internal); // FIXME: avoid dynamic memory allocation
|
|
||||||
return cast(EventID)s;
|
// use the second socket as the event ID and as the sending end for
|
||||||
|
// other threads
|
||||||
|
auto id = cast(EventID)fd[1];
|
||||||
|
try {
|
||||||
|
synchronized (m_eventsMutex)
|
||||||
|
m_events[s] = id;
|
||||||
|
} catch (Exception e) assert(false, e.msg);
|
||||||
|
m_loop.initFD(id, FDFlags.internal);
|
||||||
|
m_loop.m_fds[id].specific = EventSlot(new ConsumableQueue!EventCallback, false, is_internal, s); // FIXME: avoid dynamic memory allocation
|
||||||
|
assert(getRC(id) == 1);
|
||||||
|
return id;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -86,8 +140,8 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
|
||||||
long one = 1;
|
long one = 1;
|
||||||
//log("emitting for all threads");
|
//log("emitting for all threads");
|
||||||
if (notify_all) atomicStore(thisus.getSlot(event).triggerAll, true);
|
if (notify_all) atomicStore(thisus.getSlot(event).triggerAll, true);
|
||||||
version (linux) () @trusted { .write(cast(int)event, &one, one.sizeof); } ();
|
version (Posix) .write(cast(int)event, &one, one.sizeof);
|
||||||
else thisus.m_sockets.send(cast(DatagramSocketFD)event, thisus.m_buf, IOMode.once, null, &thisus.onSocketDataSent);
|
else assert(send(cast(int)event, cast(const(ubyte*))&one, one.sizeof, 0) == one.sizeof);
|
||||||
}
|
}
|
||||||
|
|
||||||
final override void wait(EventID event, EventCallback on_event)
|
final override void wait(EventID event, EventCallback on_event)
|
||||||
|
@ -119,13 +173,15 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
|
||||||
|
|
||||||
version (linux) {}
|
version (linux) {}
|
||||||
else {
|
else {
|
||||||
private void onSocketDataSent(DatagramSocketFD s, IOStatus status, size_t, scope RefAddress)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
private void onSocketData(DatagramSocketFD s, IOStatus, size_t, scope RefAddress)
|
private void onSocketData(DatagramSocketFD s, IOStatus, size_t, scope RefAddress)
|
||||||
{
|
{
|
||||||
onEvent(cast(EventID)s);
|
|
||||||
m_sockets.receive(s, m_buf, IOMode.once, &onSocketData);
|
m_sockets.receive(s, m_buf, IOMode.once, &onSocketData);
|
||||||
|
EventID evt;
|
||||||
|
try {
|
||||||
|
synchronized (m_eventsMutex)
|
||||||
|
evt = m_events[s];
|
||||||
|
onEvent(evt);
|
||||||
|
} catch (Exception e) assert(false, e.msg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -138,40 +194,36 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
|
||||||
final override bool releaseRef(EventID descriptor)
|
final override bool releaseRef(EventID descriptor)
|
||||||
{
|
{
|
||||||
assert(getRC(descriptor) > 0, "Releasing reference to unreferenced event FD.");
|
assert(getRC(descriptor) > 0, "Releasing reference to unreferenced event FD.");
|
||||||
void destroy() {
|
if (--getRC(descriptor) == 0) {
|
||||||
|
if (!isInternal(descriptor))
|
||||||
|
m_loop.m_waiterCount -= getSlot(descriptor).waiters.length;
|
||||||
() @trusted nothrow {
|
() @trusted nothrow {
|
||||||
scope (failure) assert(false);
|
try .destroy(getSlot(descriptor).waiters);
|
||||||
.destroy(getSlot(descriptor).waiters);
|
catch (Exception e) assert(false, e.msg);
|
||||||
assert(getSlot(descriptor).waiters is null);
|
|
||||||
} ();
|
} ();
|
||||||
}
|
version (linux) {
|
||||||
version (linux) {
|
|
||||||
if (--getRC(descriptor) == 0) {
|
|
||||||
destroy();
|
|
||||||
m_loop.unregisterFD(descriptor, EventMask.read);
|
m_loop.unregisterFD(descriptor, EventMask.read);
|
||||||
m_loop.clearFD(descriptor);
|
} else {
|
||||||
close(cast(int)descriptor);
|
auto rs = getSlot(descriptor).recvSocket;
|
||||||
return false;
|
m_sockets.cancelReceive(rs);
|
||||||
}
|
m_sockets.releaseRef(rs);
|
||||||
} else {
|
try {
|
||||||
if (!m_sockets.releaseRef(cast(DatagramSocketFD)descriptor)) {
|
synchronized (m_eventsMutex)
|
||||||
destroy();
|
m_events.remove(rs);
|
||||||
m_events.remove(cast(DatagramSocketFD)descriptor);
|
} catch (Exception e) assert(false, e.msg);
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
m_loop.clearFD(descriptor);
|
||||||
|
version (Posix) close(cast(int)descriptor);
|
||||||
|
else () @trusted { closesocket(cast(SOCKET)descriptor); } ();
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private EventSlot* getSlot(EventID id)
|
private EventSlot* getSlot(EventID id)
|
||||||
{
|
{
|
||||||
version (linux) {
|
assert(id < m_loop.m_fds.length, "Invalid event ID.");
|
||||||
assert(id < m_loop.m_fds.length, "Invalid event ID.");
|
return () @trusted { return &m_loop.m_fds[id].event(); } ();
|
||||||
return () @trusted { return &m_loop.m_fds[id].event(); } ();
|
|
||||||
} else {
|
|
||||||
assert(cast(DatagramSocketFD)id in m_events, "Invalid event ID.");
|
|
||||||
return &m_events[cast(DatagramSocketFD)id];
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private ref uint getRC(EventID id)
|
private ref uint getRC(EventID id)
|
||||||
|
@ -190,4 +242,8 @@ package struct EventSlot {
|
||||||
ConsumableQueue!EventCallback waiters;
|
ConsumableQueue!EventCallback waiters;
|
||||||
shared bool triggerAll;
|
shared bool triggerAll;
|
||||||
bool isInternal;
|
bool isInternal;
|
||||||
|
version (linux) {}
|
||||||
|
else {
|
||||||
|
DatagramSocketFD recvSocket;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -148,14 +148,16 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
||||||
return fd;
|
return fd;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void onConnect(FD sock)
|
private void onConnect(FD fd)
|
||||||
{
|
{
|
||||||
|
auto sock = cast(StreamSocketFD)fd;
|
||||||
|
auto l = lockHandle(sock);
|
||||||
m_loop.setNotifyCallback!(EventType.write)(sock, null);
|
m_loop.setNotifyCallback!(EventType.write)(sock, null);
|
||||||
with (m_loop.m_fds[sock].streamSocket) {
|
with (m_loop.m_fds[sock].streamSocket) {
|
||||||
state = ConnectionState.connected;
|
state = ConnectionState.connected;
|
||||||
auto cb = connectCallback;
|
auto cb = connectCallback;
|
||||||
connectCallback = null;
|
connectCallback = null;
|
||||||
if (cb) cb(cast(StreamSocketFD)sock, ConnectStatus.connected);
|
if (cb) cb(sock, ConnectStatus.connected);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -349,6 +351,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
||||||
|
|
||||||
void finalize()(IOStatus status)
|
void finalize()(IOStatus status)
|
||||||
{
|
{
|
||||||
|
auto l = lockHandle(socket);
|
||||||
m_loop.setNotifyCallback!(EventType.read)(socket, null);
|
m_loop.setNotifyCallback!(EventType.read)(socket, null);
|
||||||
//m_fds[fd].readBuffer = null;
|
//m_fds[fd].readBuffer = null;
|
||||||
slot.readCallback(socket, status, slot.bytesRead);
|
slot.readCallback(socket, status, slot.bytesRead);
|
||||||
|
@ -446,6 +449,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
auto err = getSocketError();
|
auto err = getSocketError();
|
||||||
if (!err.among!(EAGAIN, EINPROGRESS)) {
|
if (!err.among!(EAGAIN, EINPROGRESS)) {
|
||||||
|
auto l = lockHandle(socket);
|
||||||
m_loop.setNotifyCallback!(EventType.write)(socket, null);
|
m_loop.setNotifyCallback!(EventType.write)(socket, null);
|
||||||
slot.writeCallback(socket, IOStatus.error, slot.bytesRead);
|
slot.writeCallback(socket, IOStatus.error, slot.bytesRead);
|
||||||
return;
|
return;
|
||||||
|
@ -456,6 +460,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
||||||
slot.bytesWritten += ret;
|
slot.bytesWritten += ret;
|
||||||
slot.writeBuffer = slot.writeBuffer[ret .. $];
|
slot.writeBuffer = slot.writeBuffer[ret .. $];
|
||||||
if (slot.writeMode != IOMode.all || slot.writeBuffer.length == 0) {
|
if (slot.writeMode != IOMode.all || slot.writeBuffer.length == 0) {
|
||||||
|
auto l = lockHandle(socket);
|
||||||
m_loop.setNotifyCallback!(EventType.write)(socket, null);
|
m_loop.setNotifyCallback!(EventType.write)(socket, null);
|
||||||
slot.writeCallback(cast(StreamSocketFD)socket, IOStatus.ok, slot.bytesWritten);
|
slot.writeCallback(cast(StreamSocketFD)socket, IOStatus.ok, slot.bytesWritten);
|
||||||
return;
|
return;
|
||||||
|
@ -506,6 +511,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
||||||
|
|
||||||
void finalize()(IOStatus status)
|
void finalize()(IOStatus status)
|
||||||
{
|
{
|
||||||
|
auto l = lockHandle(socket);
|
||||||
m_loop.setNotifyCallback!(EventType.read)(socket, null);
|
m_loop.setNotifyCallback!(EventType.read)(socket, null);
|
||||||
//m_fds[fd].readBuffer = null;
|
//m_fds[fd].readBuffer = null;
|
||||||
slot.readCallback(socket, status, 0);
|
slot.readCallback(socket, status, 0);
|
||||||
|
@ -574,12 +580,17 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
||||||
}
|
}
|
||||||
|
|
||||||
final override DatagramSocketFD adoptDatagramSocket(int socket)
|
final override DatagramSocketFD adoptDatagramSocket(int socket)
|
||||||
|
{
|
||||||
|
return adoptDatagramSocketInternal(socket, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
package DatagramSocketFD adoptDatagramSocketInternal(int socket, bool is_internal = true)
|
||||||
{
|
{
|
||||||
auto fd = DatagramSocketFD(socket);
|
auto fd = DatagramSocketFD(socket);
|
||||||
if (m_loop.m_fds[fd].common.refCount) // FD already in use?
|
if (m_loop.m_fds[fd].common.refCount) // FD already in use?
|
||||||
return DatagramSocketFD.init;
|
return DatagramSocketFD.init;
|
||||||
setSocketNonBlocking(fd);
|
setSocketNonBlocking(fd);
|
||||||
m_loop.initFD(fd, FDFlags.none);
|
m_loop.initFD(fd, is_internal ? FDFlags.internal : FDFlags.none);
|
||||||
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
|
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
|
||||||
m_loop.m_fds[fd].specific = DgramSocketSlot.init;
|
m_loop.m_fds[fd].specific = DgramSocketSlot.init;
|
||||||
return fd;
|
return fd;
|
||||||
|
@ -683,12 +694,14 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
auto err = getSocketError();
|
auto err = getSocketError();
|
||||||
if (!err.among!(EAGAIN, EINPROGRESS)) {
|
if (!err.among!(EAGAIN, EINPROGRESS)) {
|
||||||
|
auto l = lockHandle(socket);
|
||||||
m_loop.setNotifyCallback!(EventType.read)(socket, null);
|
m_loop.setNotifyCallback!(EventType.read)(socket, null);
|
||||||
slot.readCallback(socket, IOStatus.error, 0, null);
|
slot.readCallback(socket, IOStatus.error, 0, null);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto l = lockHandle(socket);
|
||||||
m_loop.setNotifyCallback!(EventType.read)(socket, null);
|
m_loop.setNotifyCallback!(EventType.read)(socket, null);
|
||||||
scope src_addrc = new RefAddress(() @trusted { return cast(sockaddr*)&src_addr; } (), src_addr.sizeof);
|
scope src_addrc = new RefAddress(() @trusted { return cast(sockaddr*)&src_addr; } (), src_addr.sizeof);
|
||||||
() @trusted { return cast(DatagramIOCallback)slot.readCallback; } ()(socket, IOStatus.ok, ret, src_addrc);
|
() @trusted { return cast(DatagramIOCallback)slot.readCallback; } ()(socket, IOStatus.ok, ret, src_addrc);
|
||||||
|
@ -754,12 +767,14 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
auto err = getSocketError();
|
auto err = getSocketError();
|
||||||
if (!err.among!(EAGAIN, EINPROGRESS)) {
|
if (!err.among!(EAGAIN, EINPROGRESS)) {
|
||||||
|
auto l = lockHandle(socket);
|
||||||
m_loop.setNotifyCallback!(EventType.write)(socket, null);
|
m_loop.setNotifyCallback!(EventType.write)(socket, null);
|
||||||
() @trusted { return cast(DatagramIOCallback)slot.writeCallback; } ()(socket, IOStatus.error, 0, null);
|
() @trusted { return cast(DatagramIOCallback)slot.writeCallback; } ()(socket, IOStatus.error, 0, null);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto l = lockHandle(socket);
|
||||||
m_loop.setNotifyCallback!(EventType.write)(socket, null);
|
m_loop.setNotifyCallback!(EventType.write)(socket, null);
|
||||||
() @trusted { return cast(DatagramIOCallback)slot.writeCallback; } ()(socket, IOStatus.ok, ret, null);
|
() @trusted { return cast(DatagramIOCallback)slot.writeCallback; } ()(socket, IOStatus.ok, ret, null);
|
||||||
}
|
}
|
||||||
|
@ -844,6 +859,19 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
|
||||||
}
|
}
|
||||||
return sock;
|
return sock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// keeps a scoped reference to a handle to avoid it getting destroyed
|
||||||
|
private auto lockHandle(H)(H handle)
|
||||||
|
{
|
||||||
|
addRef(handle);
|
||||||
|
static struct R {
|
||||||
|
PosixEventDriverSockets drv;
|
||||||
|
H handle;
|
||||||
|
@disable this(this);
|
||||||
|
~this() { drv.releaseRef(handle); }
|
||||||
|
}
|
||||||
|
return R(this, handle);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
package struct StreamSocketSlot {
|
package struct StreamSocketSlot {
|
||||||
|
|
|
@ -216,7 +216,22 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
|
||||||
PollingThread[EventID] m_pollers;
|
PollingThread[EventID] m_pollers;
|
||||||
}
|
}
|
||||||
|
|
||||||
this(Events events) { m_events = events; }
|
this(Events events)
|
||||||
|
{
|
||||||
|
m_events = events;
|
||||||
|
}
|
||||||
|
|
||||||
|
void dispose()
|
||||||
|
@trusted {
|
||||||
|
foreach (pt; m_pollers.byValue) {
|
||||||
|
pt.dispose();
|
||||||
|
try pt.join();
|
||||||
|
catch (Exception e) {
|
||||||
|
// not bringing down the application here, because not being
|
||||||
|
// able to join the thread here likely isn't a problem
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
final override WatcherID watchDirectory(string path, bool recursive, FileChangesCallback on_change)
|
final override WatcherID watchDirectory(string path, bool recursive, FileChangesCallback on_change)
|
||||||
{
|
{
|
||||||
|
@ -257,6 +272,7 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
|
||||||
assert(pt !is null);
|
assert(pt !is null);
|
||||||
if (!m_events.releaseRef(evt)) {
|
if (!m_events.releaseRef(evt)) {
|
||||||
pt.dispose();
|
pt.dispose();
|
||||||
|
m_pollers.remove(evt);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
|
@ -280,6 +296,7 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
|
||||||
pt.m_callback(cast(WatcherID)evt, ch);
|
pt.m_callback(cast(WatcherID)evt, ch);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private final class PollingThread : Thread {
|
private final class PollingThread : Thread {
|
||||||
int refCount = 1;
|
int refCount = 1;
|
||||||
EventID changesEvent;
|
EventID changesEvent;
|
||||||
|
@ -291,7 +308,6 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
|
||||||
immutable string m_basePath;
|
immutable string m_basePath;
|
||||||
immutable bool m_recursive;
|
immutable bool m_recursive;
|
||||||
immutable FileChangesCallback m_callback;
|
immutable FileChangesCallback m_callback;
|
||||||
shared bool m_shutdown = false;
|
|
||||||
size_t m_entryCount;
|
size_t m_entryCount;
|
||||||
|
|
||||||
struct Entry {
|
struct Entry {
|
||||||
|
@ -337,8 +353,6 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
|
||||||
|
|
||||||
void dispose()
|
void dispose()
|
||||||
nothrow {
|
nothrow {
|
||||||
import core.atomic : atomicStore;
|
|
||||||
|
|
||||||
try synchronized (m_changesMutex) {
|
try synchronized (m_changesMutex) {
|
||||||
changesEvent = EventID.invalid;
|
changesEvent = EventID.invalid;
|
||||||
} catch (Exception e) assert(false, e.msg);
|
} catch (Exception e) assert(false, e.msg);
|
||||||
|
@ -346,21 +360,25 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
|
||||||
|
|
||||||
private void run()
|
private void run()
|
||||||
nothrow @trusted {
|
nothrow @trusted {
|
||||||
import core.atomic : atomicLoad;
|
|
||||||
import core.time : msecs;
|
import core.time : msecs;
|
||||||
import std.algorithm.comparison : min;
|
import std.algorithm.comparison : min;
|
||||||
|
import std.datetime : Clock, UTC;
|
||||||
|
|
||||||
try while (true) {
|
try while (true) {
|
||||||
() @trusted { Thread.sleep(min(m_entryCount, 60000).msecs + 1000.msecs); } ();
|
auto timeout = Clock.currTime(UTC()) + min(m_entryCount, 60000).msecs + 1000.msecs;
|
||||||
|
while (true) {
|
||||||
try synchronized (m_changesMutex) {
|
try synchronized (m_changesMutex) {
|
||||||
if (changesEvent == EventID.invalid) break;
|
if (changesEvent == EventID.invalid) return;
|
||||||
} catch (Exception e) assert(false, "Mutex lock failed: "~e.msg);
|
} catch (Exception e) assert(false, "Mutex lock failed: "~e.msg);
|
||||||
|
auto remaining = timeout - Clock.currTime(UTC());
|
||||||
|
if (remaining <= 0.msecs) break;
|
||||||
|
sleep(min(1000.msecs, remaining));
|
||||||
|
}
|
||||||
|
|
||||||
scan(true);
|
scan(true);
|
||||||
|
|
||||||
try synchronized (m_changesMutex) {
|
try synchronized (m_changesMutex) {
|
||||||
if (changesEvent == EventID.invalid) break;
|
if (changesEvent == EventID.invalid) return;
|
||||||
if (m_changes.length)
|
if (m_changes.length)
|
||||||
m_eventsDriver.trigger(changesEvent, false);
|
m_eventsDriver.trigger(changesEvent, false);
|
||||||
} catch (Exception e) assert(false, "Mutex lock failed: "~e.msg);
|
} catch (Exception e) assert(false, "Mutex lock failed: "~e.msg);
|
||||||
|
@ -376,7 +394,7 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
|
||||||
private void addChange(FileChangeKind kind, Key key, bool is_dir)
|
private void addChange(FileChangeKind kind, Key key, bool is_dir)
|
||||||
{
|
{
|
||||||
try synchronized (m_changesMutex) {
|
try synchronized (m_changesMutex) {
|
||||||
m_changes ~= FileChange(kind, m_basePath, key.parent ? key.parent.path : ".", key.name, is_dir);
|
m_changes ~= FileChange(kind, m_basePath, key.parent ? key.parent.path : "", key.name, is_dir);
|
||||||
} catch (Exception e) assert(false, "Mutex lock failed: "~e.msg);
|
} catch (Exception e) assert(false, "Mutex lock failed: "~e.msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -385,9 +403,10 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
|
||||||
import std.algorithm.mutation : swap;
|
import std.algorithm.mutation : swap;
|
||||||
|
|
||||||
Entry*[Key] new_entries;
|
Entry*[Key] new_entries;
|
||||||
|
Entry*[] added;
|
||||||
size_t ec = 0;
|
size_t ec = 0;
|
||||||
|
|
||||||
scan(null, generate_changes, new_entries, ec);
|
scan(null, generate_changes, new_entries, added, ec);
|
||||||
|
|
||||||
foreach (e; m_entries.byKeyValue) {
|
foreach (e; m_entries.byKeyValue) {
|
||||||
if (!e.key.parent || Key(e.key.parent.parent, e.key.parent.name) !in m_entries) {
|
if (!e.key.parent || Key(e.key.parent.parent, e.key.parent.name) !in m_entries) {
|
||||||
|
@ -397,11 +416,14 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
|
||||||
delete e.value;
|
delete e.value;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
foreach (e; added)
|
||||||
|
addChange(FileChangeKind.added, Key(e.parent, e.name), e.isDir);
|
||||||
|
|
||||||
swap(m_entries, new_entries);
|
swap(m_entries, new_entries);
|
||||||
m_entryCount = ec;
|
m_entryCount = ec;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void scan(Entry* parent, bool generate_changes, ref Entry*[Key] new_entries, ref size_t ec)
|
private void scan(Entry* parent, bool generate_changes, ref Entry*[Key] new_entries, ref Entry*[] added, ref size_t ec)
|
||||||
@trusted nothrow {
|
@trusted nothrow {
|
||||||
import std.file : SpanMode, dirEntries;
|
import std.file : SpanMode, dirEntries;
|
||||||
import std.path : buildPath, baseName;
|
import std.path : buildPath, baseName;
|
||||||
|
@ -413,7 +435,7 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
|
||||||
if (auto pe = key in m_entries) {
|
if (auto pe = key in m_entries) {
|
||||||
if ((*pe).isDir) {
|
if ((*pe).isDir) {
|
||||||
if (m_recursive)
|
if (m_recursive)
|
||||||
scan(*pe, generate_changes, new_entries, ec);
|
scan(*pe, generate_changes, new_entries, added, ec);
|
||||||
} else {
|
} else {
|
||||||
if ((*pe).size != de.size || (*pe).lastChange != modified_time) {
|
if ((*pe).size != de.size || (*pe).lastChange != modified_time) {
|
||||||
if (generate_changes)
|
if (generate_changes)
|
||||||
|
@ -430,10 +452,9 @@ final class PollEventDriverWatchers(Events : EventDriverEvents) : EventDriverWat
|
||||||
auto e = new Entry(parent, key.name, de.isDir ? ulong.max : de.size, modified_time);
|
auto e = new Entry(parent, key.name, de.isDir ? ulong.max : de.size, modified_time);
|
||||||
new_entries[key] = e;
|
new_entries[key] = e;
|
||||||
ec++;
|
ec++;
|
||||||
if (generate_changes)
|
if (generate_changes) added ~= e;
|
||||||
addChange(FileChangeKind.added, key, e.isDir);
|
|
||||||
|
|
||||||
if (de.isDir && m_recursive) scan(e, false, new_entries, ec);
|
if (de.isDir && m_recursive) scan(e, false, new_entries, added, ec);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {} // will result in all children being flagged as removed
|
} catch (Exception e) {} // will result in all children being flagged as removed
|
||||||
}
|
}
|
||||||
|
|
|
@ -72,7 +72,9 @@ final class WinAPIEventDriver : EventDriver {
|
||||||
|
|
||||||
override void dispose()
|
override void dispose()
|
||||||
{
|
{
|
||||||
|
if (!m_events) return;
|
||||||
m_events.dispose();
|
m_events.dispose();
|
||||||
|
m_events = null;
|
||||||
assert(threadInstance !is null);
|
assert(threadInstance !is null);
|
||||||
threadInstance = null;
|
threadInstance = null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,9 +26,6 @@ FileChange[] pendingChanges;
|
||||||
|
|
||||||
void main()
|
void main()
|
||||||
{
|
{
|
||||||
version (OSX) writefln("Directory watchers are not yet supported on macOS. Skipping test.");
|
|
||||||
else {
|
|
||||||
|
|
||||||
if (exists(testDir))
|
if (exists(testDir))
|
||||||
rmdirRecurse(testDir);
|
rmdirRecurse(testDir);
|
||||||
|
|
||||||
|
@ -38,7 +35,7 @@ void main()
|
||||||
// test non-recursive watcher
|
// test non-recursive watcher
|
||||||
watcher = eventDriver.watchers.watchDirectory(testDir, false, toDelegate(&testCallback));
|
watcher = eventDriver.watchers.watchDirectory(testDir, false, toDelegate(&testCallback));
|
||||||
assert(watcher != WatcherID.invalid);
|
assert(watcher != WatcherID.invalid);
|
||||||
Thread.sleep(1000.msecs); // some watcher implementations need time to initialize
|
Thread.sleep(400.msecs); // some watcher implementations need time to initialize
|
||||||
testFile( "file1.dat");
|
testFile( "file1.dat");
|
||||||
testFile( "file2.dat");
|
testFile( "file2.dat");
|
||||||
testFile( "dira/file1.dat", false);
|
testFile( "dira/file1.dat", false);
|
||||||
|
@ -53,7 +50,7 @@ void main()
|
||||||
// test recursive watcher
|
// test recursive watcher
|
||||||
watcher = eventDriver.watchers.watchDirectory(testDir, true, toDelegate(&testCallback));
|
watcher = eventDriver.watchers.watchDirectory(testDir, true, toDelegate(&testCallback));
|
||||||
assert(watcher != WatcherID.invalid);
|
assert(watcher != WatcherID.invalid);
|
||||||
Thread.sleep(100.msecs); // some watcher implementations need time to initialize
|
Thread.sleep(400.msecs); // some watcher implementations need time to initialize
|
||||||
testFile( "file1.dat");
|
testFile( "file1.dat");
|
||||||
testFile( "file2.dat");
|
testFile( "file2.dat");
|
||||||
testFile( "dira/file1.dat");
|
testFile( "dira/file1.dat");
|
||||||
|
@ -73,8 +70,6 @@ void main()
|
||||||
// make sure that no watchers are registered anymore
|
// make sure that no watchers are registered anymore
|
||||||
auto er = eventDriver.core.processEvents(10.msecs);
|
auto er = eventDriver.core.processEvents(10.msecs);
|
||||||
assert(er == ExitReason.outOfWaiters);
|
assert(er == ExitReason.outOfWaiters);
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void testCallback(WatcherID w, in ref FileChange ch)
|
void testCallback(WatcherID w, in ref FileChange ch)
|
||||||
|
@ -105,6 +100,8 @@ void expectChange(FileChange ch, bool expect_change)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
assert(expect_change, "Got change although none was expected.");
|
||||||
|
|
||||||
auto pch = pendingChanges[0];
|
auto pch = pendingChanges[0];
|
||||||
|
|
||||||
// adjust for Windows behavior
|
// adjust for Windows behavior
|
||||||
|
|
|
@ -6,40 +6,42 @@ module test;
|
||||||
|
|
||||||
import eventcore.core;
|
import eventcore.core;
|
||||||
import std.stdio : File, writefln;
|
import std.stdio : File, writefln;
|
||||||
import std.file : exists, remove;
|
import std.file : exists, mkdir, remove, rmdirRecurse;
|
||||||
import core.time : Duration, msecs;
|
import core.time : Duration, msecs;
|
||||||
|
|
||||||
bool s_done;
|
bool s_done;
|
||||||
int s_cnt = 0;
|
int s_cnt = 0;
|
||||||
|
|
||||||
|
enum testDir = "watcher_test";
|
||||||
enum testFilename = "test.dat";
|
enum testFilename = "test.dat";
|
||||||
|
|
||||||
void main()
|
void main()
|
||||||
{
|
{
|
||||||
version (OSX) writefln("Directory watchers are not yet supported on macOS. Skipping test.");
|
if (exists(testDir))
|
||||||
else {
|
rmdirRecurse(testDir);
|
||||||
|
mkdir(testDir);
|
||||||
|
scope (exit) rmdirRecurse(testDir);
|
||||||
|
|
||||||
if (exists(testFilename))
|
auto id = eventDriver.watchers.watchDirectory(testDir, false, (id, ref change) {
|
||||||
remove(testFilename);
|
|
||||||
|
|
||||||
auto id = eventDriver.watchers.watchDirectory(".", false, (id, ref change) {
|
|
||||||
switch (s_cnt++) {
|
switch (s_cnt++) {
|
||||||
default: assert(false);
|
default:
|
||||||
|
import std.conv : to;
|
||||||
|
assert(false, "Unexpected change: "~change.to!string);
|
||||||
case 0:
|
case 0:
|
||||||
assert(change.kind == FileChangeKind.added);
|
assert(change.kind == FileChangeKind.added);
|
||||||
assert(change.baseDirectory == ".");
|
assert(change.baseDirectory == testDir);
|
||||||
assert(change.directory == "");
|
assert(change.directory == "");
|
||||||
assert(change.name == testFilename);
|
assert(change.name == testFilename);
|
||||||
break;
|
break;
|
||||||
case 1:
|
case 1:
|
||||||
assert(change.kind == FileChangeKind.modified);
|
assert(change.kind == FileChangeKind.modified);
|
||||||
assert(change.baseDirectory == ".");
|
assert(change.baseDirectory == testDir);
|
||||||
assert(change.directory == "");
|
assert(change.directory == "");
|
||||||
assert(change.name == testFilename);
|
assert(change.name == testFilename);
|
||||||
break;
|
break;
|
||||||
case 2:
|
case 2:
|
||||||
assert(change.kind == FileChangeKind.removed);
|
assert(change.kind == FileChangeKind.removed);
|
||||||
assert(change.baseDirectory == ".");
|
assert(change.baseDirectory == testDir);
|
||||||
assert(change.directory == "");
|
assert(change.directory == "");
|
||||||
assert(change.name == testFilename);
|
assert(change.name == testFilename);
|
||||||
eventDriver.watchers.releaseRef(id);
|
eventDriver.watchers.releaseRef(id);
|
||||||
|
@ -48,18 +50,18 @@ void main()
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
auto fil = File(testFilename, "wt");
|
auto fil = File(testDir~"/"~testFilename, "wt");
|
||||||
|
|
||||||
auto tm = eventDriver.timers.create();
|
auto tm = eventDriver.timers.create();
|
||||||
eventDriver.timers.set(tm, 100.msecs, 0.msecs);
|
eventDriver.timers.set(tm, 1500.msecs, 0.msecs);
|
||||||
eventDriver.timers.wait(tm, (tm) {
|
eventDriver.timers.wait(tm, (tm) {
|
||||||
scope (failure) assert(false);
|
scope (failure) assert(false);
|
||||||
fil.write("test");
|
fil.write("test");
|
||||||
fil.close();
|
fil.close();
|
||||||
eventDriver.timers.set(tm, 100.msecs, 0.msecs);
|
eventDriver.timers.set(tm, 1500.msecs, 0.msecs);
|
||||||
eventDriver.timers.wait(tm, (tm) {
|
eventDriver.timers.wait(tm, (tm) {
|
||||||
scope (failure) assert(false);
|
scope (failure) assert(false);
|
||||||
remove(testFilename);
|
remove(testDir~"/"~testFilename);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -69,6 +71,4 @@ void main()
|
||||||
assert(er == ExitReason.outOfWaiters);
|
assert(er == ExitReason.outOfWaiters);
|
||||||
assert(s_done);
|
assert(s_done);
|
||||||
s_done = false;
|
s_done = false;
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue