diff --git a/source/eventcore/drivers/posix.d b/source/eventcore/drivers/posix.d index bfc0aca..8847e2b 100644 --- a/source/eventcore/drivers/posix.d +++ b/source/eventcore/drivers/posix.d @@ -399,10 +399,10 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets final override void read(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish) { - if (buffer.length == 0) { + /*if (buffer.length == 0) { on_read_finish(socket, IOStatus.ok, 0); return; - } + }*/ sizediff_t ret; () @trusted { ret = .recv(socket, buffer.ptr, buffer.length, 0); } (); @@ -417,11 +417,13 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets } if (ret == 0) { + print("disconnect"); on_read_finish(socket, IOStatus.disconnected, 0); return; } if (ret < 0 && mode == IOMode.immediate) { + print("wouldblock"); on_read_finish(socket, IOStatus.wouldBlock, 0); return; } @@ -469,8 +471,9 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets slot.readCallback(socket, status, slot.bytesRead); } - sizediff_t ret; - () @trusted { ret = .recv(socket, slot.readBuffer.ptr, slot.readBuffer.length, 0); } (); + sizediff_t ret = 0; + if (!slot.readBuffer.length) + () @trusted { ret = .recv(socket, slot.readBuffer.ptr, slot.readBuffer.length, 0); } (); if (ret < 0) { auto err = getSocketError(); if (!err.among!(EAGAIN, EINPROGRESS)) { @@ -479,13 +482,13 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets } } - if (ret == 0) { + if (ret == 0 && slot.readBuffer.length) { slot.state = ConnectionState.passiveClose; finalize(IOStatus.disconnected); return; } - if (ret > 0) { + if (ret > 0 || !slot.readBuffer.length) { slot.bytesRead += ret; slot.readBuffer = slot.readBuffer[ret .. $]; if (slot.readMode != IOMode.all || slot.readBuffer.length == 0) { diff --git a/tests/0-tcp-readwait.d b/tests/0-tcp-readwait.d new file mode 100644 index 0000000..18a214e --- /dev/null +++ b/tests/0-tcp-readwait.d @@ -0,0 +1,66 @@ +/++ dub.sdl: + name "test" + dependency "eventcore" path=".." ++/ +module test; + +import eventcore.core; +import eventcore.socket; +import std.socket : InternetAddress; +import core.time : Duration, msecs; + +ubyte[256] s_rbuf; +bool s_done; + +void main() +{ + static ubyte[] pack1 = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]; + + auto baddr = new InternetAddress(0x7F000001, 40002); + auto server = listenStream(baddr); + StreamSocket client; + StreamSocket incoming; + + server.waitForConnections!((incoming_, addr) { + incoming = incoming_; // work around ref counting issue + incoming.read!((status, bts) { + assert(status == IOStatus.ok); + assert(bts == 0); + + incoming.read!((status, bts) { + assert(status == IOStatus.ok); + assert(bts == pack1.length); + assert(s_rbuf[0 .. bts] == pack1); + + destroy(incoming); + destroy(server); + destroy(client); + s_done = true; + + // FIXME: this shouldn't ne necessary: + eventDriver.core.exit(); + })(s_rbuf, IOMode.immediate); + })(s_rbuf[0 .. 0], IOMode.once); + }); + + connectStream!((sock, status) { + assert(status == ConnectStatus.connected); + client = sock; + + auto tm = eventDriver.timers.create(); + eventDriver.timers.set(tm, 100.msecs, 0.msecs); + eventDriver.timers.wait(tm, (tm) { + client.write!((wstatus, bytes) { + assert(wstatus == IOStatus.ok); + assert(bytes == 10); + })(pack1, IOMode.all); + }); + })(baddr); + + ExitReason er; + do er = eventDriver.core.processEvents(Duration.max); + while (er == ExitReason.idle); + //assert(er == ExitReason.outOfWaiters); // FIXME: see above + assert(s_done); + s_done = false; +}