Fix and safe-ify ConnectionPool.

This commit is contained in:
Sönke Ludwig 2016-11-02 20:23:57 +01:00
parent 0ee42c4243
commit d06ad74ee3

View file

@ -11,7 +11,7 @@ import vibe.core.log;
import core.thread; import core.thread;
import vibe.core.sync; import vibe.core.sync;
//import vibe.utils.memory; import vibe.internal.memory;
/** /**
Generic connection pool class. Generic connection pool class.
@ -30,18 +30,18 @@ import vibe.core.sync;
class ConnectionPool(Connection) class ConnectionPool(Connection)
{ {
private { private {
Connection delegate() m_connectionFactory; Connection delegate() @safe m_connectionFactory;
Connection[] m_connections; Connection[] m_connections;
int[const(Connection)] m_lockCount; int[const(Connection)] m_lockCount;
FreeListRef!LocalTaskSemaphore m_sem; FreeListRef!LocalTaskSemaphore m_sem;
debug Thread m_thread; debug Thread m_thread;
} }
this(Connection delegate() connection_factory, uint max_concurrent = uint.max) this(Connection delegate() @safe connection_factory, uint max_concurrent = uint.max)
{ {
m_connectionFactory = connection_factory; m_connectionFactory = connection_factory;
m_sem = FreeListRef!LocalTaskSemaphore(max_concurrent); () @trusted { m_sem = FreeListRef!LocalTaskSemaphore(max_concurrent); } ();
debug m_thread = Thread.getThis(); debug m_thread = () @trusted { return Thread.getThis(); } ();
} }
/** Determines the maximum number of concurrently open connections. /** Determines the maximum number of concurrently open connections.
@ -64,10 +64,10 @@ class ConnectionPool(Connection)
to determine when to unlock the connection. to determine when to unlock the connection.
*/ */
LockedConnection!Connection lockConnection() LockedConnection!Connection lockConnection()
{ @safe {
debug assert(m_thread is Thread.getThis(), "ConnectionPool was called from a foreign thread!"); debug assert(m_thread is () @trusted { return Thread.getThis(); } (), "ConnectionPool was called from a foreign thread!");
m_sem.lock(); () @trusted { m_sem.lock(); } ();
size_t cidx = size_t.max; size_t cidx = size_t.max;
foreach( i, c; m_connections ){ foreach( i, c; m_connections ){
auto plc = c in m_lockCount; auto plc = c in m_lockCount;
@ -84,7 +84,7 @@ class ConnectionPool(Connection)
} else { } else {
logDebug("creating new %s connection, all %d are in use", Connection.stringof, m_connections.length); logDebug("creating new %s connection, all %d are in use", Connection.stringof, m_connections.length);
conn = m_connectionFactory(); // NOTE: may block conn = m_connectionFactory(); // NOTE: may block
logDebug(" ... %s", cast(void*)conn); logDebug(" ... %s", () @trusted { return cast(void*)conn; } ());
} }
m_lockCount[conn] = 1; m_lockCount[conn] = 1;
if( cidx == size_t.max ){ if( cidx == size_t.max ){
@ -96,7 +96,38 @@ class ConnectionPool(Connection)
} }
} }
///
unittest {
class Connection {
void write() {}
}
auto pool = ConnectionPool!Connection({
return new Connection; // perform the connection here
});
// create and lock a first connection
auto c1 = pool.lockConnection();
c1.write();
// create and lock a second connection
auto c2 = pool.lockConnection();
c2.write();
// writing to c1 will still write to the first connection
c1.write();
// free up the reference to the first connection, so that it can be reused
destroy(c1);
// locking a new connection will reuse the first connection now instead of creating a new one
auto c3 = pool.lockConnection();
c3.write();
}
struct LockedConnection(Connection) { struct LockedConnection(Connection) {
import vibe.core.task : Task;
private { private {
ConnectionPool!Connection m_pool; ConnectionPool!Connection m_pool;
Task m_task; Task m_task;
@ -104,6 +135,8 @@ struct LockedConnection(Connection) {
debug uint m_magic = 0xB1345AC2; debug uint m_magic = 0xB1345AC2;
} }
@safe:
private this(ConnectionPool!Connection pool, Connection conn) private this(ConnectionPool!Connection pool, Connection conn)
{ {
assert(conn !is null); assert(conn !is null);
@ -119,7 +152,7 @@ struct LockedConnection(Connection) {
auto fthis = Task.getThis(); auto fthis = Task.getThis();
assert(fthis is m_task); assert(fthis is m_task);
m_pool.m_lockCount[m_conn]++; m_pool.m_lockCount[m_conn]++;
logTrace("conn %s copy %d", cast(void*)m_conn, m_pool.m_lockCount[m_conn]); logTrace("conn %s copy %d", () @trusted { return cast(void*)m_conn; } (), m_pool.m_lockCount[m_conn]);
} }
} }
@ -134,7 +167,7 @@ struct LockedConnection(Connection) {
assert(*plc >= 1); assert(*plc >= 1);
//logTrace("conn %s destroy %d", cast(void*)m_conn, *plc-1); //logTrace("conn %s destroy %d", cast(void*)m_conn, *plc-1);
if( --*plc == 0 ){ if( --*plc == 0 ){
m_pool.m_sem.unlock(); () @trusted { m_pool.m_sem.unlock(); } ();
//logTrace("conn %s release", cast(void*)m_conn); //logTrace("conn %s release", cast(void*)m_conn);
} }
m_conn = null; m_conn = null;