-
Notifications
You must be signed in to change notification settings - Fork 8
/
double_list_mpmc_queue.h
145 lines (119 loc) · 2.89 KB
/
double_list_mpmc_queue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#ifndef __DOUBLE_LIST_MPMC_QUEUE__
#define __DOUBLE_LIST_MPMC_QUEUE__
#include <stdio.h>
#include <pthread.h>
#include <string.h>
#include <stdlib.h>
#include <string>
#include "list.h" // demo list
#include "workflow_list.h"
template<typename T>
class DoubleListMpmcQueue
{
private:
struct __queue_waiter
{
struct list_head list;
bool flag;
pthread_cond_t cond;
};
public:
DoubleListMpmcQueue(int capacity)
{
this->res_max = capacity;
this->put_res_cnt = 0;
this->get_res_cnt = 0;
// INIT_LIST_HEAD(&this->put_waiter_list);
INIT_LIST_HEAD(&this->get_waiter_list);
}
void enqueue(T element)
{
struct __queue_waiter *waiter = NULL;
struct list_head *pos;
pthread_mutex_lock(&this->put_mutex);
while (this->put_res_cnt >= this->res_max)
pthread_cond_wait(&this->put_cond, &this->put_mutex);
pos = this->get_waiter_list.next;
// somebody is waiting
if (pos != &this->get_waiter_list)
{
waiter = list_entry(pos, struct __queue_waiter ,list);
list_del(pos);
waiter->flag = true;
}
else
{
this->put_list.add_tail(element);
this->put_res_cnt++;
}
pthread_mutex_unlock(&this->put_mutex);
if (waiter)
pthread_cond_signal(&waiter->cond);
}
T dequeue()
{
T ret;
pthread_mutex_lock(&this->get_mutex);
if (!this->get_list.empty() || this->swap_list() > 0)
{
this->get_list.get_head(ret);
this->get_res_cnt--;
}
pthread_mutex_unlock(&this->get_mutex);
return ret;
}
int swap_list()
{
pthread_mutex_lock(&this->put_mutex);
if (this->put_res_cnt == 0)
{
struct __queue_waiter waiter = {
.list = {},
.flag = false,
.cond = PTHREAD_COND_INITIALIZER
};
list_add(&waiter.list, &this->get_waiter_list);
do
pthread_cond_wait(&waiter.cond, &this->put_mutex);
while (waiter.flag == false);
}
else
{
this->get_res_cnt = this->put_res_cnt;
if (this->get_res_cnt > this->res_max - 1)
pthread_cond_broadcast(&this->put_cond); // why broadcast
this->get_list = std::move(this->put_list);
this->put_res_cnt = 0;
}
pthread_mutex_unlock(&this->put_mutex);
return this->get_res_cnt;
/*
pthread_mutex_lock(&this->put_mutex);
while (this->put_res_cnt == 0)
pthread_cond_wait(&get_cond, &put_mutex);
this->get_res_cnt = this->put_res_cnt;
if (this->get_res_cnt > this->res_max - 1)
pthread_cond_broadcast(&this->put_cond);
this->get_list = std::move(this->put_list);
this->put_res_cnt = 0;
pthread_mutex_unlock(&this->put_mutex);
return this->get_res_cnt;
*/
}
int size()
{
return this->put_res_cnt + this->get_res_cnt;
}
private:
int res_max;
List<T> put_list;
List<T> get_list;
int put_res_cnt;
int get_res_cnt;
pthread_mutex_t put_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t get_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t put_cond = PTHREAD_COND_INITIALIZER;
// struct list_head put_waiter_list;
struct list_head get_waiter_list;
};
#endif