2016-03-01 19:30:42 +00:00
/ * *
TCP / UDP connection and server handling .
Copyright : © 2012 - 2016 RejectedSoftware e . K .
Authors : Sönke Ludwig
License : Subject to the terms of the MIT license , as written in the included LICENSE . txt file .
* /
module vibe.core.net ;
import eventcore.core ;
import std.exception : enforce ;
import std.format : format ;
import std.functional : toDelegate ;
import std.socket : AddressFamily , UnknownAddress ;
import vibe.core.log ;
2016-06-18 08:00:02 +00:00
import vibe.core.stream ;
2016-03-01 19:30:42 +00:00
import vibe.internal.async ;
2016-06-14 06:01:03 +00:00
import core.time : Duration ;
2016-11-02 19:58:00 +00:00
@safe :
2016-03-01 19:30:42 +00:00
/ * *
Resolves the given host name / IP address string .
Setting use_dns to false will only allow IP address strings but also guarantees
that the call will not block .
* /
NetworkAddress resolveHost ( string host , AddressFamily address_family = AddressFamily . UNSPEC , bool use_dns = true )
{
return resolveHost ( host , cast ( ushort ) address_family , use_dns ) ;
}
/// ditto
NetworkAddress resolveHost ( string host , ushort address_family , bool use_dns = true )
{
2016-06-14 06:01:03 +00:00
import std.socket : parseAddress ;
2017-07-02 22:39:41 +00:00
version ( Windows ) import core.sys.windows.winsock2 : sockaddr_in , sockaddr_in6 ;
2016-06-14 06:01:03 +00:00
else import core.sys.posix.netinet.in_ : sockaddr_in , sockaddr_in6 ;
enforce ( host . length > 0 , "Host name must not be empty." ) ;
if ( host [ 0 ] = = ':' | | host [ 0 ] > = '0' & & host [ 0 ] < = '9' ) {
auto addr = parseAddress ( host ) ;
enforce ( address_family = = AddressFamily . UNSPEC | | addr . addressFamily = = address_family ) ;
NetworkAddress ret ;
ret . family = addr . addressFamily ;
switch ( addr . addressFamily ) with ( AddressFamily ) {
default : throw new Exception ( "Unsupported address family" ) ;
2016-11-02 19:58:00 +00:00
case INET : * ret . sockAddrInet4 = ( ) @trusted { return * cast ( sockaddr_in * ) addr . name ; } ( ) ; break ;
case INET6 : * ret . sockAddrInet6 = ( ) @trusted { return * cast ( sockaddr_in6 * ) addr . name ; } ( ) ; break ;
2016-06-14 06:01:03 +00:00
}
return ret ;
} else {
enforce ( use_dns , "Malformed IP address string." ) ;
2017-01-15 23:20:35 +00:00
NetworkAddress res ;
bool success = false ;
2017-01-15 23:23:37 +00:00
Waitable ! ( DNSLookupCallback ,
2016-10-24 22:27:51 +00:00
cb = > eventDriver . dns . lookupHost ( host , cb ) ,
2017-01-15 23:20:35 +00:00
( cb , id ) = > eventDriver . dns . cancelLookup ( id ) ,
( DNSLookupID , DNSStatus status , scope RefAddress [ ] addrs ) {
if ( status = = DNSStatus . ok & & addrs . length > 0 ) {
try res = NetworkAddress ( addrs [ 0 ] ) ;
catch ( Exception e ) { logDiagnostic ( "Failed to store address from DNS lookup: %s" , e . msg ) ; }
success = true ;
}
}
) waitable ;
asyncAwaitAny ! true ( waitable ) ;
enforce ( success , "Failed to lookup host '" ~ host ~ "'." ) ;
return res ;
2016-06-14 06:01:03 +00:00
}
2016-03-01 19:30:42 +00:00
}
/ * *
Starts listening on the specified port .
' connection_callback ' will be called for each client that connects to the
server socket . Each new connection gets its own fiber . The stream parameter
then allows to perform blocking I / O on the client socket .
The address parameter can be used to specify the network
interface on which the server socket is supposed to listen for connections .
By default , all IPv4 and IPv6 interfaces will be used .
* /
TCPListener [ ] listenTCP ( ushort port , TCPConnectionDelegate connection_callback , TCPListenOptions options = TCPListenOptions . defaults )
{
TCPListener [ ] ret ;
try ret ~ = listenTCP ( port , connection_callback , "::" , options ) ;
catch ( Exception e ) logDiagnostic ( "Failed to listen on \"::\": %s" , e . msg ) ;
try ret ~ = listenTCP ( port , connection_callback , "0.0.0.0" , options ) ;
catch ( Exception e ) logDiagnostic ( "Failed to listen on \"0.0.0.0\": %s" , e . msg ) ;
enforce ( ret . length > 0 , format ( "Failed to listen on all interfaces on port %s" , port ) ) ;
return ret ;
}
/// ditto
TCPListener listenTCP ( ushort port , TCPConnectionDelegate connection_callback , string address , TCPListenOptions options = TCPListenOptions . defaults )
{
auto addr = resolveHost ( address ) ;
addr . port = port ;
2017-01-27 21:51:17 +00:00
StreamListenOptions sopts = StreamListenOptions . defaults ;
if ( options & TCPListenOptions . reusePort )
sopts | = StreamListenOptions . reusePort ;
2017-01-30 10:40:11 +00:00
scope addrc = new RefAddress ( addr . sockAddr , addr . sockAddrLen ) ;
auto sock = eventDriver . sockets . listenStream ( addrc , sopts ,
2017-01-15 19:59:36 +00:00
( StreamListenSocketFD ls , StreamSocketFD s , scope RefAddress addr ) @safe nothrow {
import vibe.core.core : runTask ;
auto conn = TCPConnection ( s , addr ) ;
runTask ( connection_callback , conn ) ;
} ) ;
2017-01-30 21:52:36 +00:00
enforce ( sock ! = StreamListenSocketFD . invalid , "Failed to listen on " ~ addr . toString ( ) ) ;
2016-03-01 19:30:42 +00:00
return TCPListener ( sock ) ;
}
2017-01-30 08:19:08 +00:00
/// Compatibility overload - use an `@safe nothrow` callback instead.
deprecated ( "Use a @safe nothrow callback instead." )
TCPListener [ ] listenTCP ( ushort port , void delegate ( TCPConnection ) connection_callback , TCPListenOptions options = TCPListenOptions . defaults )
{
TCPListener [ ] ret ;
try ret ~ = listenTCP ( port , connection_callback , "::" , options ) ;
catch ( Exception e ) logDiagnostic ( "Failed to listen on \"::\": %s" , e . msg ) ;
try ret ~ = listenTCP ( port , connection_callback , "0.0.0.0" , options ) ;
catch ( Exception e ) logDiagnostic ( "Failed to listen on \"0.0.0.0\": %s" , e . msg ) ;
enforce ( ret . length > 0 , format ( "Failed to listen on all interfaces on port %s" , port ) ) ;
return ret ;
}
/// ditto
deprecated ( "Use a @safe nothrow callback instead." )
TCPListener listenTCP ( ushort port , void delegate ( TCPConnection ) connection_callback , string address , TCPListenOptions options = TCPListenOptions . defaults )
{
return listenTCP ( port , ( conn ) @trusted nothrow {
try connection_callback ( conn ) ;
catch ( Exception e ) {
logError ( "Handling of connection failed: %s" , e . msg ) ;
conn . close ( ) ;
}
} , address , options ) ;
}
2016-03-01 19:30:42 +00:00
/ * *
Starts listening on the specified port .
This function is the same as listenTCP but takes a function callback instead of a delegate .
* /
TCPListener [ ] listenTCP_s ( ushort port , TCPConnectionFunction connection_callback , TCPListenOptions options = TCPListenOptions . defaults )
{
return listenTCP ( port , toDelegate ( connection_callback ) , options ) ;
}
/// ditto
TCPListener listenTCP_s ( ushort port , TCPConnectionFunction connection_callback , string address , TCPListenOptions options = TCPListenOptions . defaults )
{
return listenTCP ( port , toDelegate ( connection_callback ) , address , options ) ;
}
/ * *
Establishes a connection to the given host / port .
* /
2016-11-02 19:58:00 +00:00
TCPConnection connectTCP ( string host , ushort port , string bind_interface = null , ushort bind_port = 0 )
2016-03-01 19:30:42 +00:00
{
NetworkAddress addr = resolveHost ( host ) ;
addr . port = port ;
2016-11-02 19:58:00 +00:00
if ( addr . family ! = AddressFamily . UNIX )
addr . port = port ;
NetworkAddress bind_address ;
if ( bind_interface . length ) bind_address = resolveHost ( bind_interface , addr . family ) ;
else {
bind_address . family = addr . family ;
if ( bind_address . family = = AddressFamily . INET ) bind_address . sockAddrInet4 . sin_addr . s_addr = 0 ;
else if ( bind_address . family ! = AddressFamily . UNIX ) bind_address . sockAddrInet6 . sin6_addr . s6_addr [ ] = 0 ;
}
if ( addr . family ! = AddressFamily . UNIX )
bind_address . port = bind_port ;
return connectTCP ( addr , bind_address ) ;
2016-03-01 19:30:42 +00:00
}
/// ditto
2016-11-02 19:58:00 +00:00
TCPConnection connectTCP ( NetworkAddress addr , NetworkAddress bind_address = anyAddress )
2016-03-01 19:30:42 +00:00
{
import std.conv : to ;
2016-11-02 19:58:00 +00:00
if ( bind_address . family = = AddressFamily . UNSPEC ) {
bind_address . family = addr . family ;
if ( bind_address . family = = AddressFamily . INET ) bind_address . sockAddrInet4 . sin_addr . s_addr = 0 ;
else if ( bind_address . family ! = AddressFamily . UNIX ) bind_address . sockAddrInet6 . sin6_addr . s6_addr [ ] = 0 ;
if ( bind_address . family ! = AddressFamily . UNIX )
bind_address . port = 0 ;
}
enforce ( addr . family = = bind_address . family , "Destination address and bind address have different address families." ) ;
return ( ) @trusted { // scope
2017-01-15 19:59:36 +00:00
scope uaddr = new RefAddress ( addr . sockAddr , addr . sockAddrLen ) ;
2017-01-15 22:17:14 +00:00
scope baddr = new RefAddress ( bind_address . sockAddr , bind_address . sockAddrLen ) ;
2017-07-18 09:55:39 +00:00
2016-11-02 19:58:00 +00:00
// FIXME: make this interruptible
2017-07-18 09:55:39 +00:00
auto result = asyncAwaitUninterruptible ! ( ConnectCallback ,
2016-11-02 19:58:00 +00:00
cb = > eventDriver . sockets . connectStream ( uaddr , baddr , cb )
//cb => eventDriver.sockets.cancelConnect(cb)
) ;
enforce ( result [ 1 ] = = ConnectStatus . connected , "Failed to connect to " ~ addr . toString ( ) ~ ": " ~ result [ 1 ] . to ! string ) ;
2017-01-15 19:59:36 +00:00
return TCPConnection ( result [ 0 ] , uaddr ) ;
2016-11-02 19:58:00 +00:00
} ( ) ;
2016-03-01 19:30:42 +00:00
}
/ * *
Creates a bound UDP socket suitable for sending and receiving packets .
* /
2017-09-03 09:17:54 +00:00
UDPConnection listenUDP ( ref NetworkAddress addr )
{
return UDPConnection ( addr ) ;
}
/// ditto
2016-03-01 19:30:42 +00:00
UDPConnection listenUDP ( ushort port , string bind_address = "0.0.0.0" )
{
2016-10-24 22:27:51 +00:00
auto addr = resolveHost ( bind_address , AddressFamily . UNSPEC , false ) ;
addr . port = port ;
return UDPConnection ( addr ) ;
2016-03-01 19:30:42 +00:00
}
2016-11-02 19:58:00 +00:00
NetworkAddress anyAddress ( )
{
NetworkAddress ret ;
ret . family = AddressFamily . UNSPEC ;
return ret ;
}
2016-03-01 19:30:42 +00:00
/// Callback invoked for incoming TCP connections.
@safe nothrow alias TCPConnectionDelegate = void delegate ( TCPConnection stream ) ;
/// ditto
@safe nothrow alias TCPConnectionFunction = void delegate ( TCPConnection stream ) ;
/ * *
Represents a network / socket address .
* /
struct NetworkAddress {
2017-01-15 22:17:14 +00:00
import std.algorithm.comparison : max ;
2016-10-24 22:27:51 +00:00
import std.socket : Address ;
2016-12-19 15:16:50 +00:00
version ( Windows ) import core.sys.windows.winsock2 ;
2016-03-01 19:30:42 +00:00
else import core.sys.posix.netinet.in_ ;
2017-03-10 18:53:40 +00:00
version ( Posix ) import core.sys.posix.sys.un : sockaddr_un ;
2016-03-01 19:30:42 +00:00
@safe :
private union {
sockaddr addr ;
2017-03-10 18:53:40 +00:00
version ( Posix ) sockaddr_un addr_unix ;
2016-03-01 19:30:42 +00:00
sockaddr_in addr_ip4 ;
sockaddr_in6 addr_ip6 ;
}
2017-01-15 22:17:14 +00:00
enum socklen_t sockAddrMaxLen = max ( addr . sizeof , addr_ip6 . sizeof ) ;
2016-10-24 22:27:51 +00:00
this ( Address addr )
@trusted
{
assert ( addr ! is null ) ;
switch ( addr . addressFamily ) {
default : throw new Exception ( "Unsupported address family." ) ;
case AddressFamily . INET :
this . family = AddressFamily . INET ;
assert ( addr . nameLen > = sockaddr_in . sizeof ) ;
* this . sockAddrInet4 = * cast ( sockaddr_in * ) addr . name ;
break ;
case AddressFamily . INET6 :
this . family = AddressFamily . INET6 ;
assert ( addr . nameLen > = sockaddr_in6 . sizeof ) ;
* this . sockAddrInet6 = * cast ( sockaddr_in6 * ) addr . name ;
break ;
2017-03-10 18:53:40 +00:00
version ( Posix ) {
case AddressFamily . UNIX :
this . family = AddressFamily . UNIX ;
assert ( addr . nameLen > = sockaddr_un . sizeof ) ;
2017-03-10 19:07:04 +00:00
* this . sockAddrUnix = * cast ( sockaddr_un * ) addr . name ;
2017-03-10 18:53:40 +00:00
break ;
}
2016-10-24 22:27:51 +00:00
}
}
2016-03-01 19:30:42 +00:00
/ * * Family of the socket address .
* /
@property ushort family ( ) const pure nothrow { return addr . sa_family ; }
/// ditto
@property void family ( AddressFamily val ) pure nothrow { addr . sa_family = cast ( ubyte ) val ; }
/// ditto
@property void family ( ushort val ) pure nothrow { addr . sa_family = cast ( ubyte ) val ; }
/ * * The port in host byte order .
* /
@property ushort port ( )
const pure nothrow {
ushort nport ;
switch ( this . family ) {
default : assert ( false , "port() called for invalid address family." ) ;
case AF_INET : nport = addr_ip4 . sin_port ; break ;
case AF_INET6 : nport = addr_ip6 . sin6_port ; break ;
}
return ( ) @trusted { return ntoh ( nport ) ; } ( ) ;
}
/// ditto
@property void port ( ushort val )
pure nothrow {
auto nport = ( ) @trusted { return hton ( val ) ; } ( ) ;
switch ( this . family ) {
default : assert ( false , "port() called for invalid address family." ) ;
case AF_INET : addr_ip4 . sin_port = nport ; break ;
case AF_INET6 : addr_ip6 . sin6_port = nport ; break ;
}
}
/ * * A pointer to a sockaddr struct suitable for passing to socket functions .
* /
@property inout ( sockaddr ) * sockAddr ( ) inout pure nothrow { return & addr ; }
/ * * Size of the sockaddr struct that is returned by sockAddr ( ) .
* /
2017-01-15 22:17:14 +00:00
@property socklen_t sockAddrLen ( )
2016-03-01 19:30:42 +00:00
const pure nothrow {
switch ( this . family ) {
default : assert ( false , "sockAddrLen() called for invalid address family." ) ;
case AF_INET : return addr_ip4 . sizeof ;
case AF_INET6 : return addr_ip6 . sizeof ;
2017-03-10 18:53:40 +00:00
version ( Posix ) {
case AF_UNIX : return addr_unix . sizeof ;
}
2016-03-01 19:30:42 +00:00
}
}
@property inout ( sockaddr_in ) * sockAddrInet4 ( ) inout pure nothrow
in { assert ( family = = AF_INET ) ; }
body { return & addr_ip4 ; }
@property inout ( sockaddr_in6 ) * sockAddrInet6 ( ) inout pure nothrow
in { assert ( family = = AF_INET6 ) ; }
body { return & addr_ip6 ; }
2017-03-10 18:53:40 +00:00
version ( Posix ) {
@property inout ( sockaddr_un ) * sockAddrUnix ( ) inout pure nothrow
in { assert ( family = = AddressFamily . UNIX ) ; }
body { return & addr_unix ; }
}
2016-03-01 19:30:42 +00:00
/ * * Returns a string representation of the IP address
* /
string toAddressString ( )
2017-01-29 19:19:38 +00:00
const nothrow {
2016-03-01 19:30:42 +00:00
import std.array : appender ;
2016-03-11 07:21:51 +00:00
auto ret = appender ! string ( ) ;
ret . reserve ( 40 ) ;
toAddressString ( str = > ret . put ( str ) ) ;
return ret . data ;
}
/// ditto
void toAddressString ( scope void delegate ( const ( char ) [ ] ) @safe sink )
2017-01-29 19:19:38 +00:00
const nothrow {
2016-03-11 07:21:51 +00:00
import std.array : appender ;
2016-03-01 19:30:42 +00:00
import std.format : formattedWrite ;
ubyte [ 2 ] _dummy = void ; // Workaround for DMD regression in master
2017-01-29 19:19:38 +00:00
scope ( failure ) assert ( false ) ;
2016-03-01 19:30:42 +00:00
switch ( this . family ) {
default : assert ( false , "toAddressString() called for invalid address family." ) ;
2017-07-19 12:50:20 +00:00
case AF_UNSPEC :
sink ( "<UNSPEC>" ) ;
break ;
2017-03-10 22:00:43 +00:00
case AF_INET : {
2016-03-01 19:30:42 +00:00
ubyte [ 4 ] ip = ( ) @trusted { return ( cast ( ubyte * ) & addr_ip4 . sin_addr . s_addr ) [ 0 . . 4 ] ; } ( ) ;
2016-03-11 07:21:51 +00:00
sink . formattedWrite ( "%d.%d.%d.%d" , ip [ 0 ] , ip [ 1 ] , ip [ 2 ] , ip [ 3 ] ) ;
2017-03-10 22:00:43 +00:00
} break ;
case AF_INET6 : {
2016-03-01 19:30:42 +00:00
ubyte [ 16 ] ip = addr_ip6 . sin6_addr . s6_addr ;
foreach ( i ; 0 . . 8 ) {
2016-03-11 07:21:51 +00:00
if ( i > 0 ) sink ( ":" ) ;
2016-03-01 19:30:42 +00:00
_dummy [ ] = ip [ i * 2 . . i * 2 + 2 ] ;
2016-03-11 07:21:51 +00:00
sink . formattedWrite ( "%x" , bigEndianToNative ! ushort ( _dummy ) ) ;
2016-03-01 19:30:42 +00:00
}
2017-03-10 22:00:43 +00:00
} break ;
2017-03-10 18:53:40 +00:00
version ( Posix ) {
case AddressFamily . UNIX :
import std.traits : hasMember ;
2017-03-10 19:07:04 +00:00
import std.string : fromStringz ;
2017-03-10 18:53:40 +00:00
static if ( hasMember ! ( sockaddr_un , "sun_len" ) )
2017-03-10 19:07:04 +00:00
sink ( ( ) @trusted { return cast ( char [ ] ) addr_unix . sun_path [ 0. . addr_unix . sun_len ] ; } ( ) ) ;
2017-03-10 18:53:40 +00:00
else
2017-03-10 19:07:04 +00:00
sink ( ( ) @trusted { return ( cast ( char * ) addr_unix . sun_path . ptr ) . fromStringz ; } ( ) ) ;
2017-03-10 18:53:40 +00:00
break ;
}
2016-03-01 19:30:42 +00:00
}
}
/ * * Returns a full string representation of the address , including the port number .
* /
string toString ( )
2017-01-29 19:19:38 +00:00
const nothrow {
2016-03-11 07:21:51 +00:00
import std.array : appender ;
auto ret = appender ! string ( ) ;
toString ( str = > ret . put ( str ) ) ;
return ret . data ;
}
/// ditto
void toString ( scope void delegate ( const ( char ) [ ] ) @safe sink )
2017-01-29 19:19:38 +00:00
const nothrow {
2016-03-11 07:21:51 +00:00
import std.format : formattedWrite ;
2017-07-19 12:50:20 +00:00
try {
switch ( this . family ) {
default : assert ( false , "toString() called for invalid address family." ) ;
case AF_UNSPEC :
sink ( "<UNSPEC>" ) ;
break ;
case AF_INET :
toAddressString ( sink ) ;
sink . formattedWrite ( ":%s" , port ) ;
break ;
case AF_INET6 :
sink ( "[" ) ;
toAddressString ( sink ) ;
sink . formattedWrite ( "]:%s" , port ) ;
break ;
case AddressFamily . UNIX :
toAddressString ( sink ) ;
break ;
}
} catch ( Exception e ) {
assert ( false , "Unexpected exception: " ~ e . msg ) ;
2016-03-01 19:30:42 +00:00
}
}
version ( Have_libev ) { }
else {
unittest {
void test ( string ip ) {
auto res = ( ) @trusted { return resolveHost ( ip , AF_UNSPEC , false ) ; } ( ) . toAddressString ( ) ;
assert ( res = = ip ,
"IP " ~ ip ~ " yielded wrong string representation: " ~ res ) ;
}
test ( "1.2.3.4" ) ;
test ( "102:304:506:708:90a:b0c:d0e:f10" ) ;
}
}
}
/ * *
Represents a single TCP connection .
* /
struct TCPConnection {
@safe :
import core.time : seconds ;
2016-04-10 12:49:11 +00:00
import vibe.internal.array : BatchBuffer ;
2016-03-01 19:30:42 +00:00
//static assert(isConnectionStream!TCPConnection);
2017-01-15 21:19:41 +00:00
static struct Context {
2016-04-10 12:49:11 +00:00
BatchBuffer ! ubyte readBuffer ;
2017-01-15 19:59:36 +00:00
bool tcpNoDelay = false ;
bool keepAlive = false ;
Duration readTimeout = Duration . max ;
string remoteAddressString ;
2016-03-01 19:30:42 +00:00
}
private {
StreamSocketFD m_socket ;
Context * m_context ;
}
2017-01-15 19:59:36 +00:00
private this ( StreamSocketFD socket , scope RefAddress remote_address )
2016-03-01 19:30:42 +00:00
nothrow {
2017-01-15 19:59:36 +00:00
import std.exception : enforce ;
2016-03-01 19:30:42 +00:00
m_socket = socket ;
2017-06-26 23:11:50 +00:00
m_context = ( ) @trusted { return & eventDriver . sockets . userData ! Context ( socket ) ; } ( ) ;
2016-03-01 19:30:42 +00:00
m_context . readBuffer . capacity = 4096 ;
}
this ( this )
nothrow {
if ( m_socket ! = StreamSocketFD . invalid )
2016-10-05 12:40:29 +00:00
eventDriver . sockets . addRef ( m_socket ) ;
2016-03-01 19:30:42 +00:00
}
~ this ( )
nothrow {
if ( m_socket ! = StreamSocketFD . invalid )
2016-10-05 12:40:29 +00:00
eventDriver . sockets . releaseRef ( m_socket ) ;
2016-03-01 19:30:42 +00:00
}
2016-11-02 19:58:00 +00:00
bool opCast ( T ) ( ) const nothrow if ( is ( T = = bool ) ) { return m_socket ! = StreamSocketFD . invalid ; }
2017-01-29 19:19:38 +00:00
@property void tcpNoDelay ( bool enabled ) nothrow { eventDriver . sockets . setTCPNoDelay ( m_socket , enabled ) ; m_context . tcpNoDelay = enabled ; }
@property bool tcpNoDelay ( ) const nothrow { return m_context . tcpNoDelay ; }
@property void keepAlive ( bool enabled ) nothrow { eventDriver . sockets . setKeepAlive ( m_socket , enabled ) ; m_context . keepAlive = enabled ; }
@property bool keepAlive ( ) const nothrow { return m_context . keepAlive ; }
2017-01-15 19:59:36 +00:00
@property void readTimeout ( Duration duration ) { m_context . readTimeout = duration ; }
2017-01-29 19:19:38 +00:00
@property Duration readTimeout ( ) const nothrow { return m_context . readTimeout ; }
2017-03-10 20:06:18 +00:00
@property string peerAddress ( ) const nothrow { return this . remoteAddress . toString ( ) ; }
@property NetworkAddress localAddress ( ) const nothrow {
NetworkAddress naddr ;
scope addr = new RefAddress ( naddr . sockAddr , naddr . sockAddrMaxLen ) ;
2017-07-19 12:50:20 +00:00
if ( ! eventDriver . sockets . getLocalAddress ( m_socket , addr ) )
logWarn ( "Failed to get local address for TCP connection" ) ;
2017-03-10 20:06:18 +00:00
return naddr ;
}
@property NetworkAddress remoteAddress ( ) const nothrow {
NetworkAddress naddr ;
scope addr = new RefAddress ( naddr . sockAddr , naddr . sockAddrMaxLen ) ;
2017-07-19 12:50:20 +00:00
if ( ! eventDriver . sockets . getRemoteAddress ( m_socket , addr ) )
logWarn ( "Failed to get remote address for TCP connection" ) ;
2017-03-10 20:06:18 +00:00
return naddr ;
}
2016-03-01 19:30:42 +00:00
@property bool connected ( )
2017-01-29 19:19:38 +00:00
const nothrow {
2016-03-01 19:30:42 +00:00
if ( m_socket = = StreamSocketFD . invalid ) return false ;
2016-10-05 12:40:29 +00:00
auto s = eventDriver . sockets . getConnectionState ( m_socket ) ;
2016-03-01 19:30:42 +00:00
return s > = ConnectionState . connected & & s < ConnectionState . activeClose ;
}
@property bool empty ( ) { return leastSize = = 0 ; }
2017-01-30 11:04:21 +00:00
@property ulong leastSize ( ) { waitForData ( ) ; return m_context & & m_context . readBuffer . length ; }
2016-03-01 19:30:42 +00:00
@property bool dataAvailableForRead ( ) { return waitForData ( 0. seconds ) ; }
2017-07-18 09:55:39 +00:00
2016-03-01 19:30:42 +00:00
void close ( )
nothrow {
//logInfo("close %s", cast(int)m_fd);
if ( m_socket ! = StreamSocketFD . invalid ) {
2016-10-24 06:22:37 +00:00
eventDriver . sockets . shutdown ( m_socket , true , true ) ;
2016-10-05 12:40:29 +00:00
eventDriver . sockets . releaseRef ( m_socket ) ;
2016-03-01 19:30:42 +00:00
m_socket = StreamSocketFD . invalid ;
m_context = null ;
}
}
2017-07-18 09:55:39 +00:00
2016-03-01 19:30:42 +00:00
bool waitForData ( Duration timeout = Duration . max )
{
mixin ( tracer ) ;
2017-01-30 11:04:21 +00:00
if ( ! m_context ) return false ;
2016-03-01 19:30:42 +00:00
if ( m_context . readBuffer . length > 0 ) return true ;
auto mode = timeout < = 0. seconds ? IOMode . immediate : IOMode . once ;
2017-01-15 19:59:36 +00:00
2017-01-15 23:23:37 +00:00
Waitable ! ( IOCallback ,
2016-10-05 12:40:29 +00:00
cb = > eventDriver . sockets . read ( m_socket , m_context . readBuffer . peekDst ( ) , mode , cb ) ,
2017-01-15 23:23:37 +00:00
cb = > eventDriver . sockets . cancelRead ( m_socket )
2017-01-15 19:59:36 +00:00
) waiter ;
asyncAwaitAny ! true ( timeout , waiter ) ;
if ( waiter . cancelled ) return false ;
logTrace ( "Socket %s, read %s bytes: %s" , waiter . results [ 0 ] , waiter . results [ 2 ] , waiter . results [ 1 ] ) ;
2016-03-01 19:30:42 +00:00
assert ( m_context . readBuffer . length = = 0 ) ;
2017-01-15 19:59:36 +00:00
m_context . readBuffer . putN ( waiter . results [ 2 ] ) ;
switch ( waiter . results [ 1 ] ) {
2016-03-01 19:30:42 +00:00
default :
2017-01-30 10:29:01 +00:00
logDebug ( "Error status when waiting for data: %s" , waiter . results [ 1 ] ) ;
break ;
2016-03-01 19:30:42 +00:00
case IOStatus . ok : break ;
case IOStatus . wouldBlock : assert ( mode = = IOMode . immediate ) ; break ;
case IOStatus . disconnected : break ;
}
return m_context . readBuffer . length > 0 ;
}
2017-01-30 16:27:43 +00:00
const ( ubyte ) [ ] peek ( ) { return m_context ? m_context . readBuffer . peek ( ) : null ; }
2016-03-01 19:30:42 +00:00
void skip ( ulong count )
{
import std.algorithm.comparison : min ;
2017-01-15 19:59:36 +00:00
m_context . readTimeout . loopWithTimeout ! ( ( remaining ) {
waitForData ( remaining ) ;
2016-03-01 19:30:42 +00:00
auto n = min ( count , m_context . readBuffer . length ) ;
m_context . readBuffer . popFrontN ( n ) ;
count - = n ;
2017-01-15 19:59:36 +00:00
return count = = 0 ;
} ) ;
2016-03-01 19:30:42 +00:00
}
2017-01-18 23:36:32 +00:00
size_t read ( scope ubyte [ ] dst , IOMode mode )
2016-03-01 19:30:42 +00:00
{
mixin ( tracer ) ;
import std.algorithm.comparison : min ;
2017-01-18 23:36:32 +00:00
if ( ! dst . length ) return 0 ;
size_t nbytes = 0 ;
2017-01-15 19:59:36 +00:00
m_context . readTimeout . loopWithTimeout ! ( ( remaining ) {
2017-01-18 23:36:32 +00:00
if ( m_context . readBuffer . length = = 0 ) {
if ( mode = = IOMode . immediate | | mode = = IOMode . once & & nbytes > 0 )
return true ;
enforce ( waitForData ( remaining ) , "Reached end of stream while reading data." ) ;
}
2016-03-01 19:30:42 +00:00
assert ( m_context . readBuffer . length > 0 ) ;
auto l = min ( dst . length , m_context . readBuffer . length ) ;
m_context . readBuffer . read ( dst [ 0 . . l ] ) ;
dst = dst [ l . . $ ] ;
2017-01-18 23:36:32 +00:00
nbytes + = l ;
2017-01-15 19:59:36 +00:00
return dst . length = = 0 ;
} ) ;
2017-01-18 23:36:32 +00:00
return nbytes ;
2016-03-01 19:30:42 +00:00
}
2017-01-18 23:36:32 +00:00
void read ( scope ubyte [ ] dst ) { auto r = read ( dst , IOMode . all ) ; assert ( r = = dst . length ) ; }
size_t write ( in ubyte [ ] bytes , IOMode mode )
2016-03-01 19:30:42 +00:00
{
mixin ( tracer ) ;
2017-01-18 23:36:32 +00:00
if ( bytes . length = = 0 ) return 0 ;
2016-03-01 19:30:42 +00:00
2016-06-14 06:01:03 +00:00
auto res = asyncAwait ! ( IOCallback ,
2017-01-18 23:36:32 +00:00
cb = > eventDriver . sockets . write ( m_socket , bytes , mode , cb ) ,
2016-10-05 12:40:29 +00:00
cb = > eventDriver . sockets . cancelWrite ( m_socket ) ) ;
2017-07-18 09:55:39 +00:00
2016-03-01 19:30:42 +00:00
switch ( res [ 1 ] ) {
default :
throw new Exception ( "Error writing data to socket." ) ;
case IOStatus . ok : break ;
case IOStatus . disconnected : break ;
}
2017-01-18 23:36:32 +00:00
return res [ 2 ] ;
2016-03-01 19:30:42 +00:00
}
2017-01-18 23:36:32 +00:00
void write ( in ubyte [ ] bytes ) { auto r = write ( bytes , IOMode . all ) ; assert ( r = = bytes . length ) ; }
2016-11-02 19:58:00 +00:00
void write ( in char [ ] bytes ) { write ( cast ( const ( ubyte ) [ ] ) bytes ) ; }
void write ( InputStream stream ) { write ( stream , 0 ) ; }
2016-03-01 19:30:42 +00:00
void flush ( ) {
mixin ( tracer ) ;
}
void finalize ( ) { }
2016-06-18 08:00:02 +00:00
void write ( InputStream ) ( InputStream stream , ulong nbytes = 0 ) if ( isInputStream ! InputStream ) { writeDefault ( stream , nbytes ) ; }
2016-03-01 19:30:42 +00:00
private void writeDefault ( InputStream ) ( InputStream stream , ulong nbytes = 0 )
2016-06-18 08:00:02 +00:00
if ( isInputStream ! InputStream )
2016-03-01 19:30:42 +00:00
{
import std.algorithm.comparison : min ;
static struct Buffer { ubyte [ 64 * 1024 - 4 * size_t . sizeof ] bytes = void ; }
scope bufferobj = new Buffer ; // FIXME: use heap allocation
auto buffer = bufferobj . bytes [ ] ;
//logTrace("default write %d bytes, empty=%s", nbytes, stream.empty);
if ( nbytes = = 0 ) {
while ( ! stream . empty ) {
size_t chunk = min ( stream . leastSize , buffer . length ) ;
assert ( chunk > 0 , "leastSize returned zero for non-empty stream." ) ;
//logTrace("read pipe chunk %d", chunk);
stream . read ( buffer [ 0 . . chunk ] ) ;
write ( buffer [ 0 . . chunk ] ) ;
}
} else {
while ( nbytes > 0 ) {
size_t chunk = min ( nbytes , buffer . length ) ;
//logTrace("read pipe chunk %d", chunk);
stream . read ( buffer [ 0 . . chunk ] ) ;
write ( buffer [ 0 . . chunk ] ) ;
nbytes - = chunk ;
}
}
}
}
2016-10-26 11:11:28 +00:00
mixin validateConnectionStream ! TCPConnection ;
2017-01-15 19:59:36 +00:00
private void loopWithTimeout ( alias LoopBody , ExceptionType = Exception ) ( Duration timeout )
{
import core.time : seconds ;
import std.datetime : Clock , SysTime , UTC ;
SysTime now ;
if ( timeout ! = Duration . max )
now = Clock . currTime ( UTC ( ) ) ;
do {
if ( LoopBody ( timeout ) )
return ;
2017-07-18 09:55:39 +00:00
2017-01-15 19:59:36 +00:00
if ( timeout ! = Duration . max ) {
auto prev = now ;
now = Clock . currTime ( UTC ( ) ) ;
if ( now > prev ) timeout - = now - prev ;
}
} while ( timeout > 0. seconds ) ;
throw new ExceptionType ( "Operation timed out." ) ;
}
2016-03-01 19:30:42 +00:00
/ * *
Represents a listening TCP socket .
* /
struct TCPListener {
2017-07-18 09:55:39 +00:00
// FIXME: copying may lead to dangling FDs - this somehow needs to employ reference counting without breaking
// the previous behavior of keeping the socket alive when the listener isn't stored. At the same time,
// stopListening() needs to keep working.
2016-03-01 19:30:42 +00:00
private {
StreamListenSocketFD m_socket ;
2017-01-15 19:59:36 +00:00
NetworkAddress m_bindAddress ;
2016-03-01 19:30:42 +00:00
}
this ( StreamListenSocketFD socket )
{
m_socket = socket ;
}
2016-11-02 19:58:00 +00:00
bool opCast ( T ) ( ) const nothrow if ( is ( T = = bool ) ) { return m_socket ! = StreamListenSocketFD . invalid ; }
2016-03-01 19:30:42 +00:00
/// The local address at which TCP connections are accepted.
@property NetworkAddress bindAddress ( )
{
2017-01-15 19:59:36 +00:00
return m_bindAddress ;
2016-03-01 19:30:42 +00:00
}
/// Stops listening and closes the socket.
void stopListening ( )
{
2017-07-18 09:55:39 +00:00
if ( m_socket ! = StreamListenSocketFD . invalid ) {
eventDriver . sockets . releaseRef ( m_socket ) ;
m_socket = StreamListenSocketFD . invalid ;
}
2016-03-01 19:30:42 +00:00
}
}
/ * *
Represents a bound and possibly ' connected ' UDP socket .
* /
struct UDPConnection {
2017-01-15 21:19:41 +00:00
static struct Context {
bool canBroadcast ;
}
2016-10-24 22:27:51 +00:00
private {
DatagramSocketFD m_socket ;
2017-01-15 21:19:41 +00:00
Context * m_context ;
2016-10-24 22:27:51 +00:00
}
2017-07-18 09:55:39 +00:00
private this ( ref NetworkAddress bind_address )
2016-10-24 22:27:51 +00:00
{
2017-01-30 10:40:11 +00:00
scope baddr = new RefAddress ( bind_address . sockAddr , bind_address . sockAddrLen ) ;
m_socket = eventDriver . sockets . createDatagramSocket ( baddr , null ) ;
2017-01-30 10:19:51 +00:00
enforce ( m_socket ! = DatagramSocketFD . invalid , "Failed to create datagram socket." ) ;
2017-06-26 23:11:50 +00:00
m_context = ( ) @trusted { return & eventDriver . sockets . userData ! Context ( m_socket ) ; } ( ) ;
2016-10-24 22:27:51 +00:00
}
this ( this )
nothrow {
if ( m_socket ! = StreamSocketFD . invalid )
eventDriver . sockets . addRef ( m_socket ) ;
}
~ this ( )
nothrow {
if ( m_socket ! = StreamSocketFD . invalid )
eventDriver . sockets . releaseRef ( m_socket ) ;
}
2016-11-02 19:58:00 +00:00
bool opCast ( T ) ( ) const nothrow if ( is ( T = = bool ) ) { return m_socket ! = DatagramSocketFD . invalid ; }
2016-10-24 22:27:51 +00:00
2016-03-01 19:30:42 +00:00
/ * * Returns the address to which the UDP socket is bound .
* /
2017-01-15 21:19:41 +00:00
@property string bindAddress ( ) const { return localAddress . toString ( ) ; }
2016-03-01 19:30:42 +00:00
/ * * Determines if the socket is allowed to send to broadcast addresses .
* /
2017-01-15 21:19:41 +00:00
@property bool canBroadcast ( ) const { return m_context . canBroadcast ; }
2016-03-01 19:30:42 +00:00
/// ditto
2017-01-15 21:19:41 +00:00
@property void canBroadcast ( bool val ) { enforce ( eventDriver . sockets . setBroadcast ( m_socket , val ) , "Failed to set UDP broadcast flag." ) ; m_context . canBroadcast = val ; }
2016-03-01 19:30:42 +00:00
/// The local/bind address of the underlying socket.
2017-03-10 22:00:43 +00:00
@property NetworkAddress localAddress ( ) const nothrow {
NetworkAddress naddr ;
scope addr = new RefAddress ( naddr . sockAddr , naddr . sockAddrMaxLen ) ;
try {
enforce ( eventDriver . sockets . getLocalAddress ( m_socket , addr ) , "Failed to query socket address." ) ;
} catch ( Exception e ) { logWarn ( "Failed to get local address for TCP connection: %s" , e . msg ) ; }
return naddr ;
}
2016-03-01 19:30:42 +00:00
2017-07-08 08:44:42 +00:00
/ * * Set IP multicast loopback mode .
This is on by default . All packets send will also loopback if enabled .
Useful if more than one application is running on same host and both need each other ' s packets .
* /
@property void multicastLoopback ( bool loop )
{
2017-09-03 14:47:37 +00:00
enforce ( eventDriver . sockets . setOption ( m_socket , DatagramSocketOption . multicastLoopback , loop ) ,
"Failed to set multicast loopback mode." ) ;
2017-07-08 08:44:42 +00:00
}
/ * * Become a member of an IP multicast group .
The multiaddr parameter should be in the range 239.0 . 0 . 0 - 239.255 . 255 . 255 .
See https : //www.iana.org/assignments/multicast-addresses/multicast-addresses.xml#multicast-addresses-12
and https : //www.iana.org/assignments/ipv6-multicast-addresses/ipv6-multicast-addresses.xhtml
* /
2017-09-15 13:54:59 +00:00
void addMembership ( ref NetworkAddress multiaddr , uint interface_address = 0 )
2017-07-08 08:44:42 +00:00
{
2017-09-03 14:47:37 +00:00
scope addr = new RefAddress ( multiaddr . sockAddr , multiaddr . sockAddrMaxLen ) ;
2017-09-15 13:54:59 +00:00
enforce ( eventDriver . sockets . joinMulticastGroup ( m_socket , addr , interface_address ) ,
2017-09-03 14:47:37 +00:00
"Failed to add multicast membership." ) ;
2017-07-08 08:44:42 +00:00
}
2016-03-01 19:30:42 +00:00
/ * * Stops listening for datagrams and frees all resources .
* /
2017-01-15 21:19:41 +00:00
void close ( ) { eventDriver . sockets . releaseRef ( m_socket ) ; m_socket = DatagramSocketFD . init ; }
2016-03-01 19:30:42 +00:00
/ * * Locks the UDP connection to a certain peer .
Once connected , the UDPConnection can only communicate with the specified peer .
Otherwise communication with any reachable peer is possible .
* /
2017-01-15 21:19:41 +00:00
void connect ( string host , ushort port ) { connect ( resolveHost ( host , port ) ) ; }
2016-03-01 19:30:42 +00:00
/// ditto
2017-03-10 22:00:43 +00:00
void connect ( NetworkAddress address )
{
scope addr = new RefAddress ( address . sockAddr , address . sockAddrLen ) ;
eventDriver . sockets . setTargetAddress ( m_socket , addr ) ;
}
2016-03-01 19:30:42 +00:00
/ * * Sends a single packet .
If peer_address is given , the packet is send to that address . Otherwise the packet
will be sent to the address specified by a call to connect ( ) .
* /
2017-01-15 21:19:41 +00:00
void send ( in ubyte [ ] data , in NetworkAddress * peer_address = null )
{
2017-03-10 22:00:43 +00:00
scope addrc = new RefAddress ;
if ( peer_address )
addrc . set ( ( ) @trusted { return ( cast ( NetworkAddress * ) peer_address ) . sockAddr ; } ( ) , peer_address . sockAddrLen ) ;
2017-01-15 21:19:41 +00:00
2017-01-15 23:20:35 +00:00
IOStatus status ;
size_t nbytes ;
2017-01-15 23:23:37 +00:00
Waitable ! ( DatagramIOCallback ,
2017-03-10 22:00:43 +00:00
cb = > eventDriver . sockets . send ( m_socket , data , IOMode . once , peer_address ? addrc : null , cb ) ,
2017-01-15 23:20:35 +00:00
cb = > eventDriver . sockets . cancelSend ( m_socket ) ,
( DatagramSocketFD , IOStatus status_ , size_t nbytes_ , scope RefAddress addr )
{
status = status_ ;
nbytes = nbytes_ ;
}
) waitable ;
asyncAwaitAny ! true ( waitable ) ;
enforce ( ! waitable . cancelled & & status = = IOStatus . ok , "Failed to send packet." ) ;
enforce ( nbytes = = data . length , "Packet was only sent partially." ) ;
2016-10-24 22:27:51 +00:00
}
2016-03-01 19:30:42 +00:00
/ * * Receives a single packet .
If a buffer is given , it must be large enough to hold the full packet .
The timeout overload will throw an Exception if no data arrives before the
specified duration has elapsed .
* /
2016-10-24 22:27:51 +00:00
ubyte [ ] recv ( ubyte [ ] buf = null , NetworkAddress * peer_address = null )
{
return recv ( Duration . max , buf , peer_address ) ;
}
2016-03-01 19:30:42 +00:00
/// ditto
2016-10-24 22:27:51 +00:00
ubyte [ ] recv ( Duration timeout , ubyte [ ] buf = null , NetworkAddress * peer_address = null )
{
import std.socket : Address ;
if ( buf . length = = 0 ) buf = new ubyte [ 65536 ] ;
2017-01-15 23:20:35 +00:00
IOStatus status ;
size_t nbytes ;
2017-01-15 23:23:37 +00:00
Waitable ! ( DatagramIOCallback ,
2016-10-24 22:27:51 +00:00
cb = > eventDriver . sockets . receive ( m_socket , buf , IOMode . once , cb ) ,
2017-01-15 23:20:35 +00:00
cb = > eventDriver . sockets . cancelReceive ( m_socket ) ,
( DatagramSocketFD , IOStatus status_ , size_t nbytes_ , scope RefAddress addr )
{
status = status_ ;
nbytes = nbytes_ ;
if ( status_ = = IOStatus . ok & & peer_address ) {
try * peer_address = NetworkAddress ( addr ) ;
catch ( Exception e ) logWarn ( "Failed to store datagram source address: %s" , e . msg ) ;
}
}
) waitable ;
asyncAwaitAny ! true ( timeout , waitable ) ;
enforce ( ! waitable . cancelled , "Receive timeout." ) ;
enforce ( status = = IOStatus . ok , "Failed to receive packet." ) ;
return buf [ 0 . . nbytes ] ;
2016-10-24 22:27:51 +00:00
}
2016-03-01 19:30:42 +00:00
}
/ * *
Flags to control the behavior of listenTCP .
* /
enum TCPListenOptions {
/// Don't enable any particular option
defaults = 0 ,
2017-07-20 16:48:08 +00:00
/// Deprecated: causes incoming connections to be distributed across the thread pool
2016-03-01 19:30:42 +00:00
distribute = 1 < < 0 ,
/// Disables automatic closing of the connection when the connection callback exits
disableAutoClose = 1 < < 1 ,
2016-11-02 19:58:00 +00:00
/ * * Enable port reuse on linux kernel version > = 3.9 , do nothing on other OS
Does not affect libasync driver because it is always enabled by libasync .
* /
reusePort = 1 < < 2 ,
2016-03-01 19:30:42 +00:00
}
private pure nothrow {
import std.bitmanip ;
ushort ntoh ( ushort val )
{
version ( LittleEndian ) return swapEndian ( val ) ;
else version ( BigEndian ) return val ;
else static assert ( false , "Unknown endianness." ) ;
}
ushort hton ( ushort val )
{
version ( LittleEndian ) return swapEndian ( val ) ;
else version ( BigEndian ) return val ;
else static assert ( false , "Unknown endianness." ) ;
}
}
2016-03-11 07:21:51 +00:00
private enum tracer = "" ;