diff --git a/source/eventcore/internal/consumablequeue.d b/source/eventcore/internal/consumablequeue.d index 6bf8e3f..8018ae4 100644 --- a/source/eventcore/internal/consumablequeue.d +++ b/source/eventcore/internal/consumablequeue.d @@ -71,7 +71,7 @@ final class ConsumableQueue(T) */ ConsumedRange consume() @safe { - auto first = m_first + m_consumedCount; + auto first = (m_first + m_consumedCount) % m_storage.length; auto count = m_pendingCount; m_consumedCount += count; m_pendingCount = 0; @@ -103,14 +103,14 @@ final class ConsumableQueue(T) m_queue = queue; m_first = first; m_count = count; - m_queue.m_storage[first].rc++; + m_queue.m_storage[first & m_queue.m_capacityMask].rc++; } } this(this) { if (m_count) - m_queue.m_storage[m_first].rc++; + m_queue.m_storage[m_first & m_queue.m_capacityMask].rc++; } ~this() @@ -125,7 +125,7 @@ final class ConsumableQueue(T) @property size_t length() const { return m_count; } - @property ref inout(T) front() inout { return m_queue.m_storage[m_first].value; } + @property ref inout(T) front() inout { return m_queue.m_storage[m_first & m_queue.m_capacityMask].value; } void popFront() { @@ -149,14 +149,14 @@ final class ConsumableQueue(T) { if (shift_up) { m_storage[(first+1) & m_capacityMask].rc++; - if (!--m_storage[first].rc && first == m_first) { + if (!--m_storage[first & m_capacityMask].rc && first == m_first) { m_first++; m_consumedCount--; } } else { - m_storage[first].rc--; + m_storage[first & m_capacityMask].rc--; if (first == m_first) - while (m_consumedCount > 0 && !m_storage[m_first].rc) { + while (m_consumedCount > 0 && !m_storage[m_first & m_capacityMask].rc) { m_first++; m_consumedCount--; } @@ -222,3 +222,16 @@ unittest { assert(q.consume.equal([17])); assert(q.consume.empty); } + +unittest { + import std.range : iota; + import std.algorithm.comparison : equal; + + auto q = new ConsumableQueue!int; + foreach (i; 0 .. 14) + q.put(i); + assert(q.consume().equal(iota(14))); + foreach (i; 0 .. 4) + q.put(i); + assert(q.consume().equal(iota(4))); +}