-
Notifications
You must be signed in to change notification settings - Fork 0
/
ConcurrentQueue.h
99 lines (82 loc) · 1.6 KB
/
ConcurrentQueue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#pragma once
#include "GuardedPointer.h"

#include <condition_variable>
#include <cstddef>
#include <list>
#include <mutex>
#include <optional>
#include <utility>
/**
 * @brief Mutex-protected FIFO queue with blocking and non-blocking pop operations.
 *
 * Elements are inserted at the front of an internal std::list and removed from
 * the back, so they are handed out in the order they were pushed. Every public
 * operation acquires the internal mutex before touching the list.
 *
 * @tparam T Element type; it must be move-constructible to be popped.
 */
template<typename T>
class ConcurrentQueue
{
private:
    // Declared mutable so the const observers size()/empty() can lock it.
    mutable std::mutex m_stateMutex;
    // Signalled by push() and cancelWait(); waited on by popWait().
    std::condition_variable m_cv;
    // Front holds the newest element, back holds the oldest.
    std::list<T> m_queue;

public:
    using UniqueLock = std::unique_lock<decltype(m_stateMutex)>;

    ConcurrentQueue() = default;

    /**
     * @brief Appends an element and wakes one thread blocked in popWait().
     * @param item Value forwarded into the constructor of the new element.
     */
    template<typename U>
    void push(U &&item)
    {
        UniqueLock lk(m_stateMutex);
        m_queue.emplace_front(std::forward<U>(item));
        // Release before notifying so the woken thread does not immediately
        // block on the mutex we would still be holding.
        lk.unlock();
        m_cv.notify_one();
    }

    /**
     * @brief Returns the number of queued elements at the moment of the call.
     *
     * The value may already be stale when the caller inspects it if other
     * threads are pushing or popping concurrently.
     */
    std::size_t size() const
    {
        UniqueLock lk(m_stateMutex);
        return m_queue.size();
    }

    /**
     * @brief Returns true when size() is zero; the same staleness caveat applies.
     */
    bool empty() const
    {
        return size() == 0;
    }

    /**
     * @brief Pops the oldest element without ever blocking on the mutex.
     * @return The element, or an empty optional when the queue is empty OR
     *         when the mutex could not be acquired immediately. An empty
     *         result therefore does not prove that the queue was empty.
     */
    std::optional<T> tryPop()
    {
        UniqueLock lk(m_stateMutex, std::defer_lock);
        if (!lk.try_lock()) {
            return {};
        }
        return popImpl();
    }

    /**
     * @brief Pops the oldest element, waiting only for the mutex.
     * @return The element, or an empty optional when the queue is empty.
     */
    std::optional<T> pop()
    {
        UniqueLock lk(m_stateMutex);
        return popImpl();
    }

    /**
     * @brief Blocks until an element is available or @p pred returns true.
     *
     * @p pred is evaluated with the internal mutex held, and only while the
     * queue is empty. It must not call any locking member of this queue.
     * Threads blocked here are woken by push() and cancelWait().
     *
     * @param pred Nullary callable returning something convertible to bool;
     *             a true result ends the wait even though the queue is empty.
     * @return The oldest element, or an empty optional when the wait ended
     *         because of @p pred while the queue was still empty.
     */
    template<typename Pred>
    std::optional<T> popWait(Pred pred)
    {
        UniqueLock lk(m_stateMutex);
        // Capture by reference: the lambda never outlives this call, and this
        // avoids a copy while permitting move-only or mutable predicates.
        m_cv.wait(lk, [this, &pred] {
            return !m_queue.empty() || pred();
        });
        return popImpl();
    }

    /**
     * @brief Blocks until an element is available, then pops it.
     *
     * Uses a predicate that is always false, so only a push() can end the wait.
     */
    std::optional<T> popWait()
    {
        return popWait([] { return false; });
    }

    /**
     * @brief Runs @p cancellation under the queue mutex, then wakes every waiter.
     *
     * Intended for flipping the state observed by the predicate given to
     * popWait(). Because the change happens while the mutex is held, a waiter
     * cannot miss it between evaluating its predicate and going to sleep.
     *
     * @param cancellation Nullary callable invoked with the mutex held; it must
     *                     not call any locking member of this queue.
     */
    template<typename Func>
    void cancelWait(Func cancellation)
    {
        UniqueLock lk(m_stateMutex);
        cancellation();
        lk.unlock();
        m_cv.notify_all();
    }

protected:
    /**
     * @brief Removes and returns the oldest element.
     * @pre The caller holds m_stateMutex.
     * @return The element, or an empty optional when the queue is empty.
     */
    std::optional<T> popImpl()
    {
        if (m_queue.empty()) {
            return {};
        }
        // Move straight into the optional instead of via a temporary T.
        std::optional<T> result(std::in_place, std::move(m_queue.back()));
        m_queue.pop_back();
        return result;
    }

    /**
     * @brief Exposes the underlying list through a guard built from the mutex.
     *
     * NOTE(review): presumably the returned guard keeps m_stateMutex locked for
     * its lifetime - confirm against GuardedPointer.h.
     */
    ptr_guard::PointerGuard<decltype(m_stateMutex), std::list<T> *> lockQueue()
    {
        return ptr_guard::make_guarded_ptr(m_stateMutex, &m_queue);
    }
};