diff --git a/source/eventcore/internal/consumablequeue.d b/source/eventcore/internal/consumablequeue.d index ac381d7..6cf06fc 100644 --- a/source/eventcore/internal/consumablequeue.d +++ b/source/eventcore/internal/consumablequeue.d @@ -52,6 +52,15 @@ class ConsumableQueue(T) m_capacityMask = new_capacity_mask; } + void removePending(T item) + { + foreach (i; 0 .. m_pendingCount) + if (getPendingAt(i) == item) { + getPendingAt(i) = getPendingAt(m_pendingCount-1); + m_pendingCount--; + } + } + /** Consumes all elements of the queue and returns a range containing the consumed elements. @@ -60,9 +69,9 @@ class ConsumableQueue(T) */ ConsumedRange consume() @safe { - auto first = m_first; + auto first = m_first + m_consumedCount; auto count = m_pendingCount; - m_first += count; + m_consumedCount += count; m_pendingCount = 0; return ConsumedRange(this, first, count); } @@ -73,6 +82,7 @@ class ConsumableQueue(T) auto ret = m_storage[(m_first + m_consumedCount) & m_capacityMask].value; if (m_consumedCount) m_consumedCount++; else m_first = (m_first + 1) & m_capacityMask; + m_pendingCount--; return ret; } @@ -87,20 +97,24 @@ class ConsumableQueue(T) this(ConsumableQueue queue, size_t first, size_t count) { - m_queue = queue; - m_queue.m_storage[first].rc++; - m_first = first; - m_count = count; + if (m_count) { + m_queue = queue; + m_first = first; + m_count = count; + m_queue.m_storage[first].rc++; + } } this(this) { - m_queue.m_storage[m_first].rc++; + if (m_count) + m_queue.m_storage[m_first].rc++; } ~this() { - m_queue.consumed(m_first, false); + if (m_count) + m_queue.consumed(m_first, false); } @property ConsumedRange save() { return this; } @@ -140,12 +154,18 @@ class ConsumableQueue(T) } else { m_storage[first].rc--; if (first == m_first) - while (!m_storage[m_first].rc) { + while (m_consumedCount > 0 && !m_storage[m_first].rc) { m_first++; m_consumedCount--; } } } + + private ref T getPendingAt(size_t idx) + { + assert(idx < m_pendingCount, "Pending item index out of bounds."); + return m_storage[(m_first + m_consumedCount + idx) & m_capacityMask].value; + } } /// @@ -156,26 +176,36 @@ unittest { q.put(1); q.put(2); q.put(3); + assert(q.m_consumedCount == 0 && q.m_pendingCount == 3); auto r1 = q.consume; assert(r1.length == 3); + assert(q.m_consumedCount == 3 && q.m_pendingCount == 0); q.put(4); q.put(5); + assert(q.m_consumedCount == 3 && q.m_pendingCount == 2); auto r2 = q.consume; assert(r2.length == 2); + assert(q.m_consumedCount == 5 && q.m_pendingCount == 0); q.put(6); + assert(q.m_consumedCount == 5 && q.m_pendingCount == 1); auto r3 = r1.save; assert(r3.length == 3); - assert(r2.equal([4, 5])); - assert(r1.equal([1, 2, 3])); - assert(r3.equal([1, 2, 3])); - assert(q.m_consumedCount == 0); + assert(q.m_consumedCount == 5 && q.m_pendingCount == 1); + assert((&r2).equal([4, 5])); + assert(q.m_consumedCount == 5 && q.m_pendingCount == 1); + assert((&r1).equal([1, 2, 3])); + assert(q.m_consumedCount == 5 && q.m_pendingCount == 1); + assert((&r3).equal([1, 2, 3])); + assert(q.m_consumedCount == 0 && q.m_pendingCount == 1); + assert(q.length == 1); - assert(q.consume.equal([6])); + assert(q.consumeOne == 6); assert(q.length == 0); + assert(q.m_consumedCount == 0); }