Implement socket adoption and fix wait loops on Windows for the Posix driver.

This commit is contained in:
Sönke Ludwig 2017-01-22 10:43:18 +01:00
parent 65e9693265
commit ca81d25645
5 changed files with 65 additions and 14 deletions

View file

@ -32,9 +32,10 @@ Timers | yes | yes | yes | &m
Events | yes | yes | yes | — Events | yes | yes | yes | —
Unix Signals | yes² | yes² | — | — Unix Signals | yes² | yes² | — | —
Files | yes | yes | yes | — Files | yes | yes | yes | —
UI Integration | — | — | yes | — UI Integration | yes¹ | yes¹ | yes | —
File watcher | yes² | yes² | yes | — File watcher | yes² | yes² | yes | —
¹ Manually, by adopting the X11 display connection socket
² Currently only supported on Linux ² Currently only supported on Linux

View file

@ -82,6 +82,7 @@ interface EventDriverCore {
interface EventDriverSockets { interface EventDriverSockets {
@safe: /*@nogc:*/ nothrow: @safe: /*@nogc:*/ nothrow:
StreamSocketFD connectStream(scope Address peer_address, scope Address bind_address, ConnectCallback on_connect); StreamSocketFD connectStream(scope Address peer_address, scope Address bind_address, ConnectCallback on_connect);
StreamSocketFD adoptStream(int socket);
StreamListenSocketFD listenStream(scope Address bind_address, AcceptCallback on_accept); StreamListenSocketFD listenStream(scope Address bind_address, AcceptCallback on_accept);
void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept); void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept);
ConnectionState getConnectionState(StreamSocketFD sock); ConnectionState getConnectionState(StreamSocketFD sock);
@ -96,6 +97,7 @@ interface EventDriverSockets {
void shutdown(StreamSocketFD socket, bool shut_read, bool shut_write); void shutdown(StreamSocketFD socket, bool shut_read, bool shut_write);
DatagramSocketFD createDatagramSocket(scope Address bind_address, scope Address target_address); DatagramSocketFD createDatagramSocket(scope Address bind_address, scope Address target_address);
DatagramSocketFD adoptDatagramSocket(int socket);
bool setBroadcast(DatagramSocketFD socket, bool enable); bool setBroadcast(DatagramSocketFD socket, bool enable);
void receive(DatagramSocketFD socket, ubyte[] buffer, IOMode mode, DatagramIOCallback on_receive_finish); void receive(DatagramSocketFD socket, ubyte[] buffer, IOMode mode, DatagramIOCallback on_receive_finish);
void cancelReceive(DatagramSocketFD socket); void cancelReceive(DatagramSocketFD socket);

View file

@ -89,6 +89,11 @@ final class LibasyncEventDriverSockets : EventDriverSockets {
assert(false, "TODO!"); assert(false, "TODO!");
} }
override StreamSocketFD adoptStream(int socket)
{
assert(false, "TODO!");
}
override StreamListenSocketFD listenStream(scope Address bind_address, AcceptCallback on_accept) override StreamListenSocketFD listenStream(scope Address bind_address, AcceptCallback on_accept)
{ {
assert(false, "TODO!"); assert(false, "TODO!");
@ -154,6 +159,12 @@ final class LibasyncEventDriverSockets : EventDriverSockets {
assert(false, "TODO!"); assert(false, "TODO!");
} }
override DatagramSocketFD adoptDatagramSocket(int socket)
{
assert(false);
}
override bool setBroadcast(DatagramSocketFD socket, bool enable) override bool setBroadcast(DatagramSocketFD socket, bool enable)
{ {
assert(false, "TODO!"); assert(false, "TODO!");

View file

@ -12,6 +12,8 @@ import eventcore.drivers.threadedfile;
import eventcore.internal.consumablequeue : ConsumableQueue; import eventcore.internal.consumablequeue : ConsumableQueue;
import eventcore.internal.utils; import eventcore.internal.utils;
import std.algorithm.comparison : among;
import std.socket : Address, AddressFamily, InternetAddress, Internet6Address, UnknownAddress; import std.socket : Address, AddressFamily, InternetAddress, Internet6Address, UnknownAddress;
version (Posix) { version (Posix) {
import std.socket : UnixAddress; import std.socket : UnixAddress;
@ -255,7 +257,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
on_connect(sock, ConnectStatus.connected); on_connect(sock, ConnectStatus.connected);
} else { } else {
auto err = getSocketError(); auto err = getSocketError();
if (err == EINPROGRESS) { if (err.among!(EAGAIN, EINPROGRESS)) {
with (m_loop.m_fds[sock].streamSocket) { with (m_loop.m_fds[sock].streamSocket) {
connectCallback = on_connect; connectCallback = on_connect;
state = ConnectionState.connecting; state = ConnectionState.connecting;
@ -273,6 +275,18 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
return sock; return sock;
} }
final override StreamSocketFD adoptStream(int socket)
{
auto fd = StreamSocketFD(socket);
if (m_loop.m_fds[fd].common.refCount) // FD already in use?
return StreamSocketFD.invalid;
setSocketNonBlocking(fd);
m_loop.initFD(fd);
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
m_loop.m_fds[fd].specific = StreamSocketSlot.init;
return fd;
}
private void onConnect(FD sock) private void onConnect(FD sock)
{ {
m_loop.setNotifyCallback!(EventType.write)(sock, null); m_loop.setNotifyCallback!(EventType.write)(sock, null);
@ -348,6 +362,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
auto fd = cast(StreamSocketFD)sockfd; auto fd = cast(StreamSocketFD)sockfd;
m_loop.initFD(fd); m_loop.initFD(fd);
m_loop.m_fds[fd].specific = StreamSocketSlot.init; m_loop.m_fds[fd].specific = StreamSocketSlot.init;
m_loop.m_fds[fd].specific.state = ConnectionState.connected;
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status); m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
//print("accept %d", sockfd); //print("accept %d", sockfd);
scope RefAddress addrc = new RefAddress(() @trusted { return cast(sockaddr*)&addr; } (), addr_len); scope RefAddress addrc = new RefAddress(() @trusted { return cast(sockaddr*)&addr; } (), addr_len);
@ -394,7 +409,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
if (ret < 0) { if (ret < 0) {
auto err = getSocketError(); auto err = getSocketError();
if (err != EAGAIN) { if (!err.among!(EAGAIN, EINPROGRESS)) {
print("sock error %s!", err); print("sock error %s!", err);
on_read_finish(socket, IOStatus.error, 0); on_read_finish(socket, IOStatus.error, 0);
return; return;
@ -458,7 +473,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
() @trusted { ret = .recv(socket, slot.readBuffer.ptr, slot.readBuffer.length, 0); } (); () @trusted { ret = .recv(socket, slot.readBuffer.ptr, slot.readBuffer.length, 0); } ();
if (ret < 0) { if (ret < 0) {
auto err = getSocketError(); auto err = getSocketError();
if (err != EAGAIN) { if (!err.among!(EAGAIN, EINPROGRESS)) {
finalize(IOStatus.error); finalize(IOStatus.error);
return; return;
} }
@ -492,7 +507,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
if (ret < 0) { if (ret < 0) {
auto err = getSocketError(); auto err = getSocketError();
if (err != EAGAIN) { if (!err.among!(EAGAIN, EINPROGRESS)) {
on_write_finish(socket, IOStatus.error, 0); on_write_finish(socket, IOStatus.error, 0);
return; return;
} }
@ -550,7 +565,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
if (ret < 0) { if (ret < 0) {
auto err = getSocketError(); auto err = getSocketError();
if (err != EAGAIN) { if (!err.among!(EAGAIN, EINPROGRESS)) {
m_loop.setNotifyCallback!(EventType.write)(socket, null); m_loop.setNotifyCallback!(EventType.write)(socket, null);
slot.writeCallback(socket, IOStatus.error, slot.bytesRead); slot.writeCallback(socket, IOStatus.error, slot.bytesRead);
return; return;
@ -582,7 +597,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
if (ret < 0) { if (ret < 0) {
auto err = getSocketError(); auto err = getSocketError();
if (err != EAGAIN) { if (!err.among!(EAGAIN, EINPROGRESS)) {
on_data_available(socket, IOStatus.error, 0); on_data_available(socket, IOStatus.error, 0);
return; return;
} }
@ -627,7 +642,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
() @trusted { ret = recv(socket, &tmp, 1, MSG_PEEK); } (); () @trusted { ret = recv(socket, &tmp, 1, MSG_PEEK); } ();
if (ret < 0) { if (ret < 0) {
auto err = getSocketError(); auto err = getSocketError();
if (err != EAGAIN) finalize(IOStatus.error); if (!err.among!(EAGAIN, EINPROGRESS)) finalize(IOStatus.error);
} else finalize(ret ? IOStatus.ok : IOStatus.disconnected); } else finalize(ret ? IOStatus.ok : IOStatus.disconnected);
} }
@ -662,6 +677,18 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
return sock; return sock;
} }
final override DatagramSocketFD adoptDatagramSocket(int socket)
{
auto fd = DatagramSocketFD(socket);
if (m_loop.m_fds[fd].common.refCount) // FD already in use?
return DatagramSocketFD.init;
setSocketNonBlocking(fd);
m_loop.initFD(fd);
m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
m_loop.m_fds[fd].specific = DgramSocketSlot.init;
return fd;
}
final override bool setBroadcast(DatagramSocketFD socket, bool enable) final override bool setBroadcast(DatagramSocketFD socket, bool enable)
{ {
int tmp_broad = enable; int tmp_broad = enable;
@ -681,7 +708,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
if (ret < 0) { if (ret < 0) {
auto err = getSocketError(); auto err = getSocketError();
if (err != EAGAIN) { if (!err.among!(EAGAIN, EINPROGRESS)) {
print("sock error %s!", err); print("sock error %s!", err);
on_receive_finish(socket, IOStatus.error, 0, null); on_receive_finish(socket, IOStatus.error, 0, null);
return; return;
@ -725,7 +752,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
if (ret < 0) { if (ret < 0) {
auto err = getSocketError(); auto err = getSocketError();
if (err != EAGAIN) { if (!err.among!(EAGAIN, EINPROGRESS)) {
m_loop.setNotifyCallback!(EventType.read)(socket, null); m_loop.setNotifyCallback!(EventType.read)(socket, null);
slot.readCallback(socket, IOStatus.error, 0, null); slot.readCallback(socket, IOStatus.error, 0, null);
return; return;
@ -751,7 +778,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
if (ret < 0) { if (ret < 0) {
auto err = getSocketError(); auto err = getSocketError();
if (err != EAGAIN) { if (!err.among!(EAGAIN, EINPROGRESS)) {
print("sock error %s!", err); print("sock error %s!", err);
on_send_finish(socket, IOStatus.error, 0, null); on_send_finish(socket, IOStatus.error, 0, null);
return; return;
@ -796,7 +823,7 @@ final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets
if (ret < 0) { if (ret < 0) {
auto err = getSocketError(); auto err = getSocketError();
if (err != EAGAIN) { if (!err.among!(EAGAIN, EINPROGRESS)) {
m_loop.setNotifyCallback!(EventType.write)(socket, null); m_loop.setNotifyCallback!(EventType.write)(socket, null);
() @trusted { return cast(DatagramIOCallback)slot.writeCallback; } ()(socket, IOStatus.error, 0, null); () @trusted { return cast(DatagramIOCallback)slot.writeCallback; } ()(socket, IOStatus.error, 0, null);
return; return;
@ -1344,7 +1371,7 @@ final class SignalFDEventDriverSignals(Loop : PosixEventLoop) : EventDriverSigna
signalfd_siginfo nfo; signalfd_siginfo nfo;
do { do {
auto ret = () @trusted { return read(fd, &nfo, nfo.sizeof); } (); auto ret = () @trusted { return read(fd, &nfo, nfo.sizeof); } ();
if (ret == -1 && errno == EAGAIN) if (ret == -1 && errno.among!(EAGAIN, EINPROGRESS))
break; break;
auto cb = m_loop.m_fds[fd].signal.callback; auto cb = m_loop.m_fds[fd].signal.callback;
if (ret != nfo.sizeof) { if (ret != nfo.sizeof) {
@ -1456,7 +1483,7 @@ final class InotifyEventDriverWatchers(Loop : PosixEventLoop) : EventDriverWatch
while (true) { while (true) {
auto ret = () @trusted { return read(id, &buf[0], buf.length); } (); auto ret = () @trusted { return read(id, &buf[0], buf.length); } ();
if (ret == -1 && errno == EAGAIN) if (ret == -1 && errno.among!(EAGAIN, EINPROGRESS))
break; break;
assert(ret <= buf.length); assert(ret <= buf.length);

View file

@ -241,6 +241,11 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
assert(false, "TODO!"); assert(false, "TODO!");
} }
override StreamSocketFD adoptStream(int socket)
{
assert(false, "TODO!");
}
override StreamListenSocketFD listenStream(scope Address bind_address, AcceptCallback on_accept) override StreamListenSocketFD listenStream(scope Address bind_address, AcceptCallback on_accept)
{ {
assert(false, "TODO!"); assert(false, "TODO!");
@ -306,6 +311,11 @@ final class WinAPIEventDriverSockets : EventDriverSockets {
assert(false, "TODO!"); assert(false, "TODO!");
} }
override DatagramSocketFD adoptDatagramSocket(int socket)
{
assert(false);
}
override bool setBroadcast(DatagramSocketFD socket, bool enable) override bool setBroadcast(DatagramSocketFD socket, bool enable)
{ {
assert(false, "TODO!"); assert(false, "TODO!");