From d06ad74ee32cf426f5b75bb73893c50701e07034 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 2 Nov 2016 20:23:57 +0100 Subject: [PATCH] Fix and safe-ify ConnectionPool. --- source/vibe/core/connectionpool.d | 55 ++++++++++++++++++++++++------- 1 file changed, 44 insertions(+), 11 deletions(-) diff --git a/source/vibe/core/connectionpool.d b/source/vibe/core/connectionpool.d index d0dec81..cf7adee 100644 --- a/source/vibe/core/connectionpool.d +++ b/source/vibe/core/connectionpool.d @@ -11,7 +11,7 @@ import vibe.core.log; import core.thread; import vibe.core.sync; -//import vibe.utils.memory; +import vibe.internal.memory; /** Generic connection pool class. @@ -30,18 +30,18 @@ import vibe.core.sync; class ConnectionPool(Connection) { private { - Connection delegate() m_connectionFactory; + Connection delegate() @safe m_connectionFactory; Connection[] m_connections; int[const(Connection)] m_lockCount; FreeListRef!LocalTaskSemaphore m_sem; debug Thread m_thread; } - this(Connection delegate() connection_factory, uint max_concurrent = uint.max) + this(Connection delegate() @safe connection_factory, uint max_concurrent = uint.max) { m_connectionFactory = connection_factory; - m_sem = FreeListRef!LocalTaskSemaphore(max_concurrent); - debug m_thread = Thread.getThis(); + () @trusted { m_sem = FreeListRef!LocalTaskSemaphore(max_concurrent); } (); + debug m_thread = () @trusted { return Thread.getThis(); } (); } /** Determines the maximum number of concurrently open connections. @@ -64,10 +64,10 @@ class ConnectionPool(Connection) to determine when to unlock the connection. */ LockedConnection!Connection lockConnection() - { - debug assert(m_thread is Thread.getThis(), "ConnectionPool was called from a foreign thread!"); + @safe { + debug assert(m_thread is () @trusted { return Thread.getThis(); } (), "ConnectionPool was called from a foreign thread!"); - m_sem.lock(); + () @trusted { m_sem.lock(); } (); size_t cidx = size_t.max; foreach( i, c; m_connections ){ auto plc = c in m_lockCount; @@ -84,7 +84,7 @@ class ConnectionPool(Connection) } else { logDebug("creating new %s connection, all %d are in use", Connection.stringof, m_connections.length); conn = m_connectionFactory(); // NOTE: may block - logDebug(" ... %s", cast(void*)conn); + logDebug(" ... %s", () @trusted { return cast(void*)conn; } ()); } m_lockCount[conn] = 1; if( cidx == size_t.max ){ @@ -96,7 +96,38 @@ class ConnectionPool(Connection) } } +/// +unittest { + class Connection { + void write() {} + } + + auto pool = ConnectionPool!Connection({ + return new Connection; // perform the connection here + }); + + // create and lock a first connection + auto c1 = pool.lockConnection(); + c1.write(); + + // create and lock a second connection + auto c2 = pool.lockConnection(); + c2.write(); + + // writing to c1 will still write to the first connection + c1.write(); + + // free up the reference to the first connection, so that it can be reused + destroy(c1); + + // locking a new connection will reuse the first connection now instead of creating a new one + auto c3 = pool.lockConnection(); + c3.write(); +} + struct LockedConnection(Connection) { + import vibe.core.task : Task; + private { ConnectionPool!Connection m_pool; Task m_task; @@ -104,6 +135,8 @@ struct LockedConnection(Connection) { debug uint m_magic = 0xB1345AC2; } + @safe: + private this(ConnectionPool!Connection pool, Connection conn) { assert(conn !is null); @@ -119,7 +152,7 @@ struct LockedConnection(Connection) { auto fthis = Task.getThis(); assert(fthis is m_task); m_pool.m_lockCount[m_conn]++; - logTrace("conn %s copy %d", cast(void*)m_conn, m_pool.m_lockCount[m_conn]); + logTrace("conn %s copy %d", () @trusted { return cast(void*)m_conn; } (), m_pool.m_lockCount[m_conn]); } } @@ -134,7 +167,7 @@ struct LockedConnection(Connection) { assert(*plc >= 1); //logTrace("conn %s destroy %d", cast(void*)m_conn, *plc-1); if( --*plc == 0 ){ - m_pool.m_sem.unlock(); + () @trusted { m_pool.m_sem.unlock(); } (); //logTrace("conn %s release", cast(void*)m_conn); } m_conn = null;