Fix LocalTaskSemaphore.

Fixes a potential hang, potential over-use of lock slots and only needs a single LocalManualEvent now instead of one created for each wait.
This commit is contained in:
Sönke Ludwig 2017-01-30 10:07:06 +01:00
parent 76df9212ef
commit 8f6c4dd536
No known key found for this signature in database
GPG key ID: D95E8DB493EE314C

View file

@ -147,7 +147,6 @@ class LocalTaskSemaphore
private { private {
static struct ThreadWaiter { static struct ThreadWaiter {
LocalManualEvent signal;
ubyte priority; ubyte priority;
uint seq; uint seq;
} }
@ -156,11 +155,13 @@ class LocalTaskSemaphore
uint m_maxLocks; uint m_maxLocks;
uint m_locks; uint m_locks;
uint m_seq; uint m_seq;
LocalManualEvent m_signal;
} }
this(uint max_locks) this(uint max_locks)
{ {
m_maxLocks = max_locks; m_maxLocks = max_locks;
m_signal = createManualEvent();
} }
/// Maximum number of concurrent locks /// Maximum number of concurrent locks
@ -192,7 +193,7 @@ class LocalTaskSemaphore
/** Acquires a lock. /** Acquires a lock.
Once the limit of concurrent locks is reaced, this method will block Once the limit of concurrent locks is reached, this method will block
until the number of locks drops below the limit. until the number of locks drops below the limit.
*/ */
void lock(ubyte priority = 0) void lock(ubyte priority = 0)
@ -203,43 +204,38 @@ class LocalTaskSemaphore
return; return;
ThreadWaiter w; ThreadWaiter w;
w.signal = createManualEvent();
w.priority = priority; w.priority = priority;
w.seq = min(0, m_seq - w.priority); w.seq = min(0, m_seq - w.priority);
if (++m_seq == uint.max) if (++m_seq == uint.max)
rewindSeq(); rewindSeq();
() @trusted { m_waiters.insert(w); } (); () @trusted { m_waiters.insert(w); } ();
do w.signal.wait(); while (!tryLock());
// on resume: while (true) {
destroy(w.signal); m_signal.waitUninterruptible();
if (m_waiters.front.seq == w.seq && tryLock()) {
m_locks++;
return;
}
}
} }
/** Gives up an existing lock. /** Gives up an existing lock.
*/ */
void unlock() void unlock()
{ {
assert(m_locks >= 1);
m_locks--; m_locks--;
if (m_waiters.length > 0 && available > 0) { if (m_waiters.length > 0)
ThreadWaiter w = m_waiters.front(); m_signal.emit(); // resume one
w.signal.emit(); // resume one
() @trusted { m_waiters.removeFront(); } ();
}
} }
// if true, a goes after b. ie. b comes out front() // if true, a goes after b. ie. b comes out front()
/// private /// private
static bool asc(ref ThreadWaiter a, ref ThreadWaiter b) static bool asc(ref ThreadWaiter a, ref ThreadWaiter b)
{ {
if (a.seq == b.seq) { if (a.priority != b.priority)
if (a.priority == b.priority) {
// resolve using the pointer address
return (cast(size_t)&a.signal) > (cast(size_t) &b.signal);
}
// resolve using priority
return a.priority < b.priority; return a.priority < b.priority;
}
// resolve using seq number
return a.seq > b.seq; return a.seq > b.seq;
} }