Fix range errors in ConsumableQueue and add removePending.

This commit is contained in:
Sönke Ludwig 2016-06-15 18:18:26 +02:00
parent 5298e00c09
commit fe939bff18

View file

@ -52,6 +52,15 @@ class ConsumableQueue(T)
m_capacityMask = new_capacity_mask; m_capacityMask = new_capacity_mask;
} }
void removePending(T item)
{
foreach (i; 0 .. m_pendingCount)
if (getPendingAt(i) == item) {
getPendingAt(i) = getPendingAt(m_pendingCount-1);
m_pendingCount--;
}
}
/** Consumes all elements of the queue and returns a range containing the /** Consumes all elements of the queue and returns a range containing the
consumed elements. consumed elements.
@ -60,9 +69,9 @@ class ConsumableQueue(T)
*/ */
ConsumedRange consume() ConsumedRange consume()
@safe { @safe {
auto first = m_first; auto first = m_first + m_consumedCount;
auto count = m_pendingCount; auto count = m_pendingCount;
m_first += count; m_consumedCount += count;
m_pendingCount = 0; m_pendingCount = 0;
return ConsumedRange(this, first, count); return ConsumedRange(this, first, count);
} }
@ -73,6 +82,7 @@ class ConsumableQueue(T)
auto ret = m_storage[(m_first + m_consumedCount) & m_capacityMask].value; auto ret = m_storage[(m_first + m_consumedCount) & m_capacityMask].value;
if (m_consumedCount) m_consumedCount++; if (m_consumedCount) m_consumedCount++;
else m_first = (m_first + 1) & m_capacityMask; else m_first = (m_first + 1) & m_capacityMask;
m_pendingCount--;
return ret; return ret;
} }
@ -87,20 +97,24 @@ class ConsumableQueue(T)
this(ConsumableQueue queue, size_t first, size_t count) this(ConsumableQueue queue, size_t first, size_t count)
{ {
m_queue = queue; if (m_count) {
m_queue.m_storage[first].rc++; m_queue = queue;
m_first = first; m_first = first;
m_count = count; m_count = count;
m_queue.m_storage[first].rc++;
}
} }
this(this) this(this)
{ {
m_queue.m_storage[m_first].rc++; if (m_count)
m_queue.m_storage[m_first].rc++;
} }
~this() ~this()
{ {
m_queue.consumed(m_first, false); if (m_count)
m_queue.consumed(m_first, false);
} }
@property ConsumedRange save() { return this; } @property ConsumedRange save() { return this; }
@ -140,12 +154,18 @@ class ConsumableQueue(T)
} else { } else {
m_storage[first].rc--; m_storage[first].rc--;
if (first == m_first) if (first == m_first)
while (!m_storage[m_first].rc) { while (m_consumedCount > 0 && !m_storage[m_first].rc) {
m_first++; m_first++;
m_consumedCount--; m_consumedCount--;
} }
} }
} }
private ref T getPendingAt(size_t idx)
{
assert(idx < m_pendingCount, "Pending item index out of bounds.");
return m_storage[(m_first + m_consumedCount + idx) & m_capacityMask].value;
}
} }
/// ///
@ -156,26 +176,36 @@ unittest {
q.put(1); q.put(1);
q.put(2); q.put(2);
q.put(3); q.put(3);
assert(q.m_consumedCount == 0 && q.m_pendingCount == 3);
auto r1 = q.consume; auto r1 = q.consume;
assert(r1.length == 3); assert(r1.length == 3);
assert(q.m_consumedCount == 3 && q.m_pendingCount == 0);
q.put(4); q.put(4);
q.put(5); q.put(5);
assert(q.m_consumedCount == 3 && q.m_pendingCount == 2);
auto r2 = q.consume; auto r2 = q.consume;
assert(r2.length == 2); assert(r2.length == 2);
assert(q.m_consumedCount == 5 && q.m_pendingCount == 0);
q.put(6); q.put(6);
assert(q.m_consumedCount == 5 && q.m_pendingCount == 1);
auto r3 = r1.save; auto r3 = r1.save;
assert(r3.length == 3); assert(r3.length == 3);
assert(r2.equal([4, 5])); assert(q.m_consumedCount == 5 && q.m_pendingCount == 1);
assert(r1.equal([1, 2, 3])); assert((&r2).equal([4, 5]));
assert(r3.equal([1, 2, 3])); assert(q.m_consumedCount == 5 && q.m_pendingCount == 1);
assert(q.m_consumedCount == 0); assert((&r1).equal([1, 2, 3]));
assert(q.m_consumedCount == 5 && q.m_pendingCount == 1);
assert((&r3).equal([1, 2, 3]));
assert(q.m_consumedCount == 0 && q.m_pendingCount == 1);
assert(q.length == 1); assert(q.length == 1);
assert(q.consume.equal([6])); assert(q.consumeOne == 6);
assert(q.length == 0); assert(q.length == 0);
assert(q.m_consumedCount == 0);
} }