Rework circular queue locking

Should now be (hopefully) race-free, also switch to a spinlock to avoid any locking overhead.
This commit is contained in:
Billy Laws 2023-01-08 19:18:28 +00:00
parent 5d527cb965
commit 2f6d27e8d7
2 changed files with 39 additions and 43 deletions

View File

@ -15,6 +15,7 @@
#include <string>
#include <memory>
#include <compare>
#include <condition_variable>
#include <boost/container/small_vector.hpp>
#include <common/exception.h>
#include <common/span.h>

View File

@ -4,6 +4,7 @@
#pragma once
#include <common/trace.h>
#include <common/spin_lock.h>
#include <common.h>
namespace skyline {
@ -14,12 +15,12 @@ namespace skyline {
class CircularQueue {
private:
std::vector<u8> vector; //!< The internal vector holding the circular queue's data, we use a byte vector due to the default item construction/destruction semantics not being appropriate for a circular buffer
Type *start{reinterpret_cast<Type *>(vector.begin().base())}; //!< The start/oldest element of the queue
Type *end{reinterpret_cast<Type *>(vector.begin().base())}; //!< The end/newest element of the queue
std::mutex consumptionMutex;
std::condition_variable consumeCondition;
std::mutex productionMutex;
std::condition_variable produceCondition;
std::atomic<Type *> start{reinterpret_cast<Type *>(vector.begin().base())}; //!< The start/oldest element of the queue
std::atomic<Type *> end{reinterpret_cast<Type *>(vector.begin().base())}; //!< The end/newest element of the queue
SpinLock consumptionMutex;
std::condition_variable_any consumeCondition;
SpinLock productionMutex;
std::condition_variable_any produceCondition;
public:
/**
@ -57,14 +58,14 @@ namespace skyline {
while (true) {
if (start == end) {
std::unique_lock lock{consumptionMutex};
std::unique_lock productionLock{productionMutex};
TRACE_EVENT_END("containers");
preWait();
produceCondition.wait(lock, [this]() { return start != end; });
produceCondition.wait(productionLock, [this]() { return start != end; });
TRACE_EVENT_BEGIN("containers", "CircularQueue::Process");
}
std::scoped_lock comsumptionLock{consumptionMutex};
while (start != end) {
auto next{start + 1};
next = (next == reinterpret_cast<Type *>(vector.end().base())) ? reinterpret_cast<Type *>(vector.begin().base()) : next;
@ -77,46 +78,50 @@ namespace skyline {
}
Type Pop() {
std::unique_lock lock(productionMutex);
produceCondition.wait(lock, [this]() { return start != end; });
{
std::unique_lock productionLock{productionMutex};
produceCondition.wait(productionLock, [this]() { return start != end; });
}
std::scoped_lock comsumptionLock{consumptionMutex};
auto next{start + 1};
next = (next == reinterpret_cast<Type *>(vector.end().base())) ? reinterpret_cast<Type *>(vector.begin().base()) : next;
Type item{*next};
start = next;
if (start == end)
consumeCondition.notify_one();
consumeCondition.notify_one();
return item;
}
void Push(const Type &item) {
std::unique_lock lock(productionMutex);
auto next{end + 1};
next = (next == reinterpret_cast<Type *>(vector.end().base())) ? reinterpret_cast<Type *>(vector.begin().base()) : next;
if (next == start) {
std::unique_lock consumeLock(consumptionMutex);
consumeCondition.wait(consumeLock, [=]() { return next != start; });
}
*next = item;
end = next;
produceCondition.notify_one();
}
Type *waitNext{};
void Append(span <Type> buffer) {
std::unique_lock lock(productionMutex);
for (const auto &item : buffer) {
while (true) {
if (waitNext) {
std::unique_lock consumeLock{consumptionMutex};
consumeCondition.wait(consumeLock, [=]() { return waitNext != start; });
waitNext = nullptr;
}
std::scoped_lock lock{productionMutex};
auto next{end + 1};
next = (next == reinterpret_cast<Type *>(vector.end().base())) ? reinterpret_cast<Type *>(vector.begin().base()) : next;
if (next == start) {
std::unique_lock consumeLock(consumptionMutex);
consumeCondition.wait(consumeLock, [=]() { return next != start; });
waitNext = next;
continue;
}
*next = item;
end = next++;
end = next;
produceCondition.notify_one();
break;
}
produceCondition.notify_one();
}
void Append(span<Type> buffer) {
for (const auto &item : buffer)
Push(item);
}
/**
@ -125,18 +130,8 @@ namespace skyline {
*/
template<typename TransformedType, typename Transformation>
void AppendTranform(TransformedType &container, Transformation transformation) {
std::unique_lock lock(productionMutex);
for (auto &item : container) {
auto next{end + 1};
next = (next == reinterpret_cast<Type *>(vector.end().base())) ? reinterpret_cast<Type *>(vector.begin().base()) : next;
if (next == start) {
std::unique_lock consumeLock(consumptionMutex);
consumeCondition.wait(consumeLock, [=]() { return next != start; });
}
*next = transformation(item);
end = next;
}
produceCondition.notify_one();
for (const auto &item : container)
Push(transformation(item));
}
};
}