Linux服务器开发之网关服务器的实现

什么是网关服务器

初学linux服务器开发时,我们的服务器是很简单的,只需要一个程序完成与客户端的连接,接收客户端数据,数据处理,向客户端发送数据。
但是在处理量很大的情况下,一台机器不能满足我们的需求,此时我们应该怎么办。
我们可以将服务端的任务分摊到多台机器上完成,见下图

从图中可见,此时整个服务端主要分为了三部分。
网关服务器:负责连接客户端与逻辑服务器,在两者间完成数据转发,使用负载均衡算法保证每个逻辑服务器的工作量均衡,以及进行数据加密。
逻辑服务器:负责业务逻辑的处理,与网关服务器进行数据交互,同时与数据库服务器进行数据交互。
数据库服务器:数据存储与读取的具体执行者。

实现网关服务器需要考虑哪些问题

效率问题

当我们需要用到网关服务器来负载均衡时,我可以假定我们需要处理的客户端请求是很多的(当然,我这里只是为了学习,具体业务并不需要),也就是说我们需要高并发,
高效处理。
因为网关服务器在客户端和逻辑服务器间相当于纽带的作用,所有的数据包都要从此经过,所以我们的网关服务器必须要保证可以高效的处理大量连接上的事件

安全问题

如上所说,如果网关服务器被恶意发起连接,一旦挂掉,我们的全部服务都会终止,因此我们必须要对这种情况进行处理。同时,还有与客户端交互时的数据加密,这个事也
是要交给网关服务器来进行的。逻辑服务器一般都会与网关服务器配置于同一个局域网,所以通常不需要考虑数据的加密。

对连接的标识

逻辑服务器和客户端都会连接在网关服务器上,而网关服务器需要对其sockfd进行标识,要知晓究竟谁是服务器,谁是客户端,而且要对客户端的连接加一条可检索属
性(比如用户名).
为什么呢?因为对于客户端发送过来的数据,我们无论转到哪个逻辑服务器上都可以,而逻辑服务器返回的数据,我们需要知道要将该数据返回给哪个客户端,逻辑服务器并不能
知道每个客户端的sockfd是多少。

效率问题

多路复用

我们不会去为每个sockfd都分配一个线程去服务它,我们更需要有一个线程可以去监听所有的fd上的事件,如果发生,我们再去分配线程去处理他。这就是多路复用。
多路复用有select、poll、epoll,几乎凡是知道多路复用的人都知道epoll的高效。因为其底层红黑树,以及回调机制,是我们最好的选择(在大量连接,活跃量不高的情况下)。
而epoll分两种工作模式,LT和ET,LT模式下,epoll只是一个高效的poll,ET模式下会更高效。事实上众多的第三方库都使用的是LT模式,说白了就是性价比,LT已经很高效,而改用ET模式,除了效率会更高,也会给编写带来一些复杂性以及产生一些头疼的问题,而处理这些特殊情况也需要时间,处理方式不当的话反而还不如LT,所以,总而言之,性价比不高。(本人为了学习,此处使用的ET模式)。

非阻塞

每个连接的sockfd,我们都有两种操作其的方式,阻塞和非阻塞,阻塞意味着我们此刻必须对sockfd进行等待,就是说我们不能去干别的事,这显然不可以。因
此,在以高并发为目标的服务器程序里,非阻塞是我们唯一的选择。
并且,et模式下,必须非阻塞,不然会产生套接字饿死的情况。
非阻塞模式下,我们还需要一样东西,就是缓冲区,因为你并不能保证你接受到的数据就是完整的。

工作模式

这里使用的是多线程Reacter半同步半异步模式。
主线程负责监听以及接收新的连接,维护一个任务队列,其余线程从任务队列里获取任务并完成,同时也将新的任务添加进任务队列。

架构

总体分为以下部分

  • main.h

    程序主线程:监听fd绑定、监听,epoll监听

  • Connection.h

    客户端和逻辑服务器的连接的封装
    实现对连接的操作:
    HandleRead()读, HandleWrite()写, Worker()数据处理,
    shutdown()连接关闭,getData()从用户缓冲区获取数据,puttData()将数据写入用户缓冲区

  • ThreadPool.h

    线程池的封装

  • SyncQueue.h

    任务队列的封装
    实现队列的添加取出,以及同步加锁等处理

  • Buffer.h

    用户缓存区的封装

  • BaseFunc.h

    基本函数的封装:如 setNoBlocking(), addFd()…

  • Util.h

    工具类

正确性测试结果

代码

main.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
//
// GataMain.cpp
// QuoridorServer
//
// Created by shiyi on 2016/12/2.
// Copyright © 2016年 shiyi. All rights reserved.
//

#include <stdio.h>
#include <iostream>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <signal.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <assert.h>
#include <sys/wait.h>
#include <arpa/inet.h>
#include <functional>
#include "Util.h"
#include "ThreadPool.h"
#include "Connection.h"
#include "BaseFunc.h"

static const char *IP = "10.105.44.34";
// static const char *IP = "127.0.0.1";
// static const char *IP = "182.254.243.29";
static const int PORT = 11111;

//处理的最大连接数
static const int USER_PROCESS = 655536;
//epoll能监听的最大事件
static const int MAX_EVENT_NUMBER = 10000;

//信号通信的管道
static int sigPipefd[2];

//信号回调函数
static void sigHandler(int sig)
{
int saveErrno = errno;
send(sigPipefd[1], (char*)&sig, 1, 0);
errno = saveErrno;
}

//添加信号回调
static void addSig(int sig, void(handler)(int), bool restart = true)
{
struct sigaction sa;
memset(&sa, 0, sizeof(sa));
sa.sa_handler = handler;
if(restart)
sa.sa_flags |= SA_RESTART;
sigfillset(&sa.sa_mask);
if(-1 == sigaction(sig, &sa, NULL))
Util::outError("sigaction");
}

static int setupSigPipe()
{
//新建epoll监听表和事件管道
int epollfd = epoll_create(USER_PROCESS);
if(epollfd == -1)
Util::outError("epoll_create");

int ret = socketpair(PF_UNIX, SOCK_STREAM, 0, sigPipefd);
assert(ret == 0);

//将写设置为非阻塞
setNoBlocking(sigPipefd[1]);
addFd(epollfd, sigPipefd[0], EPOLLIN | EPOLLET);
setNoBlocking(sigPipefd[0]);

//设置信号处理函数
addSig(SIGCHLD, sigHandler);
addSig(SIGTERM, sigHandler);
addSig(SIGINT, sigHandler);
addSig(SIGPIPE, sigHandler);

return epollfd;
}

int main()
{
int ret;

//构造协议地址结构
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = PF_INET;
inet_pton(PF_INET, IP, &address.sin_addr);
address.sin_port = htons(PORT);

int listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert( listenfd >= 0 );

int opt = 1;
if(setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (void*)&opt, sizeof(int)) < 0)
{
perror("setsockopt");
exit(1);
}

ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
if(ret == -1)
{
perror("bind");
}

if(listen(listenfd, 1000) < 0)
{
perror("listen");
exit(1);
}

Connection *users = new Connection[USER_PROCESS];
ThreadPool threadPool;

//统一事件源
int epollfd = setupSigPipe();

epoll_event events[MAX_EVENT_NUMBER];
// addFd(epollfd, listenfd, EPOLLIN | EPOLLET);
addFd(epollfd, listenfd, EPOLLIN);
// setNoBlocking(m_listenfd);

bool isRunning = true;

while(isRunning)
{
int num = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
//如果错误原因不是被中断,则循环退出
if((num < 0) && (errno != EINTR))
{
Util::outError("epoll_wait failure");
break;
}

for(int i=0; i<num; i++)
{
int sockfd = events[i].data.fd;
//处理新的请求
if(sockfd == listenfd)
{
//连接新的请求
struct sockaddr_in clientAddr;
socklen_t clientLen = sizeof(clientAddr);
int connfd = accept(listenfd, (struct sockaddr*)&clientAddr, &clientLen);

if(connfd < 0)
{
Util::outError("accept");
break;
}

Util::outMsg("accept a new client : %d %s\n", connfd, inet_ntoa(clientAddr.sin_addr));

addFd(epollfd, connfd, EPOLLIN | EPOLLET | EPOLLONESHOT);
setNoBlocking(connfd);
//初始化客户端链接
users[connfd].init(epollfd, connfd, clientAddr);

}
//处理信号
else if((sockfd == sigPipefd[0]) && (events[i].events & EPOLLIN))
{
char sigMsg[1024];
int ret = recv(sockfd, sigMsg, sizeof(sigMsg), 0);
if(ret <= 0)
{
continue;
}

for(int j=0; j<ret; j++)
{
//循环处理每个信号
switch(sigMsg[j])
{
case SIGCHLD:
{

break;
}
case SIGTERM:
case SIGINT:
{
//退出
Util::outMsg("程序退出\n");
isRunning = false;
break;
}
}
}
}
//处理读事件
else if(events[i].events & EPOLLIN)
{
//向任务队列添加读任务
threadPool.AddTask(std::bind(&Connection::HandleRead, users+sockfd));
}
//处理写事件
else if(events[i].events & EPOLLOUT)
{
// cout<<"hello"<<sockfd<<endl;
threadPool.AddTask(std::bind(&Connection::HandleWrite, users+sockfd));
}
}
}

delete[] users;

close(sigPipefd[0]);
close(sigPipefd[1]);
close(epollfd);

return 0;
}

Connection.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
//
// Connection.h
// QuoridorServer
//
// Created by shiyi on 2016/12/2.
// Copyright © 2016年 shiyi. All rights reserved.
//

#ifndef Connection_H
#define Connection_H

#include <stdio.h>
#include <iostream>
#include <atomic>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <signal.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <assert.h>
#include <stdlib.h>
#include <arpa/inet.h>
#include <errno.h>
#include <map>
#include "Buffer.h"
#include "Util.h"
#include "BaseFunc.h"

#include "json/json.h"

const std::string serverIP[] = {
"127.0.0.1",
"182.254.243.29"
};

const size_t BUFFER_SIZE = 65535;

class Connection
{
public:

static std::vector<Connection*> serverConnVt;
static std::map<string, Connection*> clientConnMap;
static int serverIdx;

static Connection* getServerConn()
{
int size = serverConnVt.size();
if(size == 0)
return NULL;

serverIdx = (serverIdx+1)%size;

return serverConnVt[serverIdx];
}

Connection() : m_writeing(true), m_epollfd(-1), m_sockfd(-1)
{}

~Connection(){}

//初始化连接
void init(int epollfd, int sockfd, const sockaddr_in& clientAddr)
{
//初始化读写缓冲区
m_inBuff.init();
m_outBuff.init();

m_epollfd = epollfd;
m_sockfd = sockfd;
m_writeing = true;
m_address = clientAddr;
m_username = "";
m_type = -1;

std::string sip(inet_ntoa(clientAddr.sin_addr));

for(auto& ip : serverIP)
{
if(ip.compare(0, sip.size(), sip) == 0)
{
m_type = 1;
serverConnVt.push_back(this);
cout<<sip<<"是服务端"<<endl;
break;
}
}

if(m_type != 1)
{
char t[10];
sprintf(t, "%d", m_sockfd);
m_username = t;

//存入客户端映射表
clientConnMap.insert(pair<string, Connection*>(m_username, this));
}
}

void HandleRead()
{
cout<<"read"<<endl;
while(true)
{
char buf[BUFFER_SIZE];
int ret = recv(m_sockfd, buf, BUFFER_SIZE, 0);
if(ret < 0)
{
//缓冲区内容已读完
if((errno == EAGAIN) || (errno == EWOULDBLOCK))
{
modFd(m_epollfd, m_sockfd, EPOLLIN | EPOLLET | EPOLLONESHOT);
break;
}
//其他错误直接断开连接
Util::outError("HandleRead");
shutdown();
return;
}
//断开连接
else if(ret == 0)
{
shutdown();
return;
}
else
{
//将读取的内容加入缓冲区
m_inBuff.PutData(buf, ret);
}

}

worker();
}

void HandleWrite()
{
cout<<"write"<<endl;
//更改临界值
if(!m_writeing)
{
//休眠等待
usleep(1000);
//下次再来
modFd(m_epollfd, m_sockfd, EPOLLIN | EPOLLOUT | EPOLLET | EPOLLONESHOT);
return;
}
m_writeing = false;

//取出数据
char buf[BUFFER_SIZE];

int len = m_outBuff.GetDataAll(buf);

int n = len;
while (n > 0)
{
int ret = send(m_sockfd, buf+len-n, n, 0);
if (ret < n)
{
if (ret == -1 && errno != EAGAIN)
{
Util::outError("write error");
}
break;
}
n -= ret;
}

//n=0表示数据全部写完,删除写事件
if(n == 0)
{
modFd(m_epollfd, m_sockfd, EPOLLIN | EPOLLET | EPOLLONESHOT);
}
else
{
modFd(m_epollfd, m_sockfd, EPOLLIN | EPOLLOUT | EPOLLET | EPOLLONESHOT);
}

//恢复临界值
m_writeing = true;
}

void clientWork()
{
//解析
//取出数据
char buf[BUFFER_SIZE];
int len = getData(buf);

//解密buf
printf("recv from %d :%s\n", m_sockfd, buf);

std::string recvUser;
Json::Reader reader;
Json::Value inRoot;
Json::Value outRoot;
if(reader.parse(buf, inRoot))
{
Json::Value data = inRoot["data"];
outRoot["data"] = data;
outRoot["user"] = m_username;
}

Connection* toConn = getServerConn();
if(toConn->m_sockfd == -1)
{
printf("无可用逻辑服务器\n");
return;
}

//生成json字符串
std::string outStr = outRoot.toStyledString();
len = outStr.size();

printf("send to %d :%s\n", toConn->m_sockfd, outStr.c_str());

memcpy(buf, &len, 4);
memcpy(buf+4, outStr.c_str(), len);

toConn->putData(buf, len+4);

modFd(m_epollfd, toConn->m_sockfd, EPOLLIN | EPOLLOUT | EPOLLET | EPOLLONESHOT);
}


void serverWork()
{
//解析
//取出数据
char buf[BUFFER_SIZE];
int len = getData(buf);

//解密buf
printf("recv from %d :%s\n", m_sockfd, buf);

std::string toUser;
Json::Reader reader;
Json::Value inRoot;
Json::Value outRoot;
if(reader.parse(buf, inRoot))
{
toUser = inRoot["user"].asString();
Json::Value data = inRoot["data"];
outRoot["data"] = data;
}

auto iter = clientConnMap.find(toUser);
if(iter == clientConnMap.end())
{
printf("客户端%s不存在\n", toUser.c_str());
return;
}

Connection* toConn = (*iter).second;

//生成json字符串
std::string outStr = outRoot.toStyledString();
len = outStr.size();

printf("send to %d :%s\n", toConn->m_sockfd, outStr.c_str());

memcpy(buf, &len, 4);
memcpy(buf+4, outStr.c_str(), len);

toConn->putData(buf, len+4);

modFd(m_epollfd, toConn->m_sockfd, EPOLLIN | EPOLLOUT | EPOLLET | EPOLLONESHOT);

}

int getData(char *buf)
{
return m_inBuff.GetData(buf);

}

void putData(char *buf, int len)
{
while(!m_writeing)
usleep(1000);
m_writeing = false;

m_outBuff.PutData(buf, len);

m_writeing = true;
}

void worker()
{
//serverWork();
if(m_type == 1)
{
cout<<"workerServer"<<endl;
serverWork();
}
else
{
cout<<"workerClient"<<endl;
clientWork();
}
}

void shutdown()
{
//等待写事件完成后关闭
while(!m_writeing)
usleep(1000);

m_writeing = false;
removeFd(m_epollfd, m_sockfd);
m_writeing = true;

//服务端
if(m_type == 1)
{
for(auto i=serverConnVt.begin(); i!=serverConnVt.end(); i++)
{
if((*i)->m_sockfd == m_sockfd)
{
//在vt中删除该连接
serverConnVt.erase(i);
cout<<"退出服务端"<<m_sockfd<<endl;
break;
}
}
}
//客户端
else
{
//map删除
auto iter = clientConnMap.find(m_username);
if(iter != clientConnMap.end())
{
clientConnMap.erase(iter);
printf("客户端%s退出\n", m_username.c_str());
}
}
}

private:
int m_epollfd; //epoll描述符
int m_sockfd; //套接字描述符
std::string m_username; //连接唯一标识
int m_type; //连接类型 -1为未知客户端 0为已知客户端 1为服务端
sockaddr_in m_address; //套接字地址
Buffer m_inBuff; //读缓冲
Buffer m_outBuff; //写缓冲
std::atomic_bool m_writeing; //是否正在写
};

std::vector<Connection*> Connection::serverConnVt;
std::map<string, Connection*> Connection::clientConnMap;
int Connection::serverIdx = -1;

#endif /* Connection_H */

Buffer.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
//
// Buffer.h
// QuoridorServer
//
// Created by shiyi on 2016/12/2.
// Copyright © 2016年 shiyi. All rights reserved.
//

#ifndef Buffer_H
#define Buffer_H

#include <stdio.h>
#include <iostream>
#include <vector>

using namespace std;

class Buffer
{
public:
Buffer() : m_widx(0), m_ridx(0)
{}

~Buffer(){}

void init()
{
m_widx = m_ridx = 0;
m_buf.clear();
}

//增加内容
void PutData(char *data, int len)
{
//如果调整空间后足够存放,则进行调整
int capa = m_buf.capacity();
if(capa < m_widx + len && capa > len + m_widx - m_ridx)
adjust();

for(int i = 0; i < len; i++)
m_buf.push_back(data[i]);

m_widx += len;
}

//返回获取的包的大小,数据不完整返回-1

int GetData(char* data)
{
if(m_widx - m_ridx < 4)
return -1;

int len;
char *t = (char*)&len;
for(int i=0; i<4; i++)
{
t[i] = m_buf[m_ridx+i];
}

//printf("-=-=%d\n", len);

if(len+4 > m_widx-m_ridx)
return -1;

m_ridx += 4;

for(int i = 0; i < len; i++)
{
data[i] = m_buf[m_ridx++];
}

if(m_ridx >= m_widx)
{
m_ridx = m_widx = 0;
m_buf.clear();
}
return len;
}

//返回Buffer内全部内容
int GetDataAll(char* data)
{
int len = m_widx-m_ridx;

for(int i = 0; i < len; i++)
{
if(m_ridx >= m_widx)
break;
data[i] = m_buf[m_ridx++];
}

if(m_ridx >= m_widx)
{
m_ridx = m_widx = 0;
m_buf.clear();
}

return len;
}

private:

//将数据移至容器头部,充分利用空间
void adjust()
{
vector<char> t(m_buf.begin()+m_ridx, m_buf.begin()+m_widx);
m_widx -= m_ridx;
m_ridx = 0;

m_buf.clear();

for(int i=0; i<m_widx; i++)
m_buf.push_back(t[i]);
}

private:

int m_ridx;
int m_widx;
std::vector<char> m_buf;
};

#endif /* Buffer_H */

ThreadPool.h

//
//  ThreadPool.h
//  QuoridorServer
//
//  Created by shiyi on 2016/11/30.
//  Copyright © 2016年 shiyi. All rights reserved.
//

#ifndef ThreadPool_H
#define ThreadPool_H

#include <stdio.h>
#include <iostream>
#include <functional>
#include <thread>
#include <atomic>
#include "SyncQueue.h"

const int MaxTaskCount = 100;

class ThreadPool
{
public:
    using Task = std::function<void()>;

    ThreadPool(int numThreads = std::thread::hardware_concurrency()) : m_queue(MaxTaskCount)
    {
        if(numThreads < 4)
            numThreads = 4;

        printf("线程池启动-%d线程\n", numThreads);

        Start(numThreads);
    }

    ~ThreadPool()
    {
        Stop();
    }

    void Stop()
    {
        std::call_once(m_flag, [this]{
            StopThreadGroup();
        });
    }

    void AddTask(Task&& task)
    {
        m_queue.Push(std::forward<Task>(task));
    }

    void AddTask(const Task& task)
    {
        m_queue.Push(task);
    }

private:
    void Start(int numThreads)
    {
        m_running = true;
        //创建线程组
        for(int i=0; i<numThreads; i++)
        {
            m_threadGroup.push_back(std::make_shared<std::thread>(&ThreadPool::RunInThread, this));
        }

    }

    void RunInThread()
    {
        while(m_running)
        {
            std::queue<Task> queue;
            m_queue.Take(queue);
            std::queue<int> a;

            while(!queue.empty())
            {
                if(!m_running)
                    return;
                auto task = queue.front();
                queue.pop();
                task();
            }
        }
    }

    void StopThreadGroup()
    {
        m_queue.Stop();
        m_running = false;

        for(auto thread : m_threadGroup)
        {
            thread->join();
        }

        m_threadGroup.clear();
    }

private:
    SyncQueue<Task> m_queue;                                    //同步队列
    std::vector<std::shared_ptr<std::thread>> m_threadGroup;    //处理任务的线程组
    atomic_bool m_running;                                      //是否停止
    std::once_flag m_flag;
};

#endif /* ThreadPool_H */

SyncQueue.h

//
//  SyncQueue.h
//  QuoridorServer
//
//  Created by shiyi on 2016/11/30.
//  Copyright © 2016年 shiyi. All rights reserved.
//

#ifndef SyncQueue_H
#define SyncQueue_H

#include <stdio.h>
#include <iostream>
#include <mutex>
#include <thread>
#include <queue>
#include <condition_variable>

using namespace std;

template <typename T>
class SyncQueue
{
public:
    SyncQueue(int maxSize) : m_maxSize(maxSize), m_isStop(false)
    {
    }

    ~SyncQueue(){}

    void Push(const T& x)
    {
        Add(x);
    }

    void Push(T&& x)
    {
        Add(x);
    }

    void Take(T& t)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notEmpty(locker, [this]{
            return m_isStop || m_notEmpty();
        });

        if(m_isStop)
            return;
        t = m_queue.front();
        m_queue.pop();
        m_notFull.notify_one();
    }

    void Take(std::queue<T>& queue)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notEmpty.wait(locker, [this]{
            return m_isStop || NotEmpty();
        });

        if(m_isStop)
            return;
        queue = std::move(m_queue);
        m_notFull.notify_one();
    }

    void Stop()
    {
        {
            std::lock_guard<std::mutex> locker(m_mutex);
            m_isStop = true;
        }
        m_notFull.notify_all();
        m_notEmpty.notify_all();
    }

    bool Empty()
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.empty();
    }

    bool Full()
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.size() >= m_maxSize;
    }

private:

    bool NotFull()
    {
        bool full = m_queue.size() >= m_maxSize;
        if(full)
            cout<<"缓冲区满,需要等待..."<<this_thread::get_id()<<endl;
        return !full;
    }

    bool NotEmpty()
    {
        bool empty = m_queue.empty();
        if(empty)
            cout<<"缓冲区空,需要等待..."<<this_thread::get_id()<<endl;
        return !empty;
    }

    template<typename F>
    void Add(F&& x)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notFull.wait(locker, [this]{
            return m_isStop || NotFull();
        });
        if(m_isStop)
            return;
        m_queue.push(std::forward<F>(x));
        m_notEmpty.notify_one();
    }

private:
    bool m_isStop;                     //是否停止
    int m_maxSize;                     //同步队列最大的长度
    std::queue<T> m_queue;                  //缓冲区
    std::mutex m_mutex;                     //互斥量
    std::condition_variable m_notEmpty;     //不为空的条件变量
    std::condition_variable m_notFull;      //不满的条件变量
};

#endif /* SyncQueue_H */
如果本文对你有用,可以请作者喝杯茶~
0%