仿muduo库实现并发服务器(1)
一、Channel和Poller协调合作测试
测试思路:

创建一个监听套接字的Channel:lst_channel,对可读回调函数void SetReadCallback(const EventCallback &cb)传入Acceptor()来获取新连接,同时启动可读事件监控:
这里会通过Channel::EnableRead()的Update模块(void Channel::Update() { _poller->UpdateEvent(this); }),将lst_channel添加到Poller的
_channels管理模块并且对监听套接字进行可读事件监控设置epoll_ctl 。当Poller的 Poll(std::vector*active)开始监控,客户端发起连接,可读事件就绪,由active返回这个活跃lst_channel,得到的lst_channel就可以调用 HandleEvent()进行事件处理,这里是可读事件触发所以调用_read_callback()(_event_callback为空)。
由于之前对监听套接字lst_channel设置的可读回调函数是Acceptor(),那么就会执行这个函数:
获取新链接,这里先不创建Socket套接字,直接创建通信channel对newfd描述符进行事件管理。同时对这个通信channel设置可读、可写、关闭、错误、任意事件的回调函数,并且启动可读事件监控,因为Poller的 Poll一直在不断监控,所以一旦客户端给服务端发送数据,这个通信channel的可读事件被触发,按照下面的tcp_svr.cc测试代码:服务端成功读取信息,就会开启可写事件监控,触发可写事件给客户端发送特定信息。////注意当客户端关闭时,是可写事件被触发,recv接口返回值为0,再调用HandleClose函数,移除这个通信channel的所有监控,释放channel。
Channel模块和Poller模块代码参考
class Poller;
//描述符事件管理
class Channel
{
private:
int _fd;
Poller *_poller;
uint32_t _events; //当前需要监控的事件
uint32_t _revents; //当前连接触发的事件
using EventCallback = std::function;//由connection模块传入
EventCallback _read_callback; //可读事件被触发的回调函数
EventCallback _write_callback; //可写事件被触发的回调函数
EventCallback _error_callback; //错误事件被触发的回调函数
EventCallback _close_callback; //连接断开事件被触发的回调函数
EventCallback _event_callback; //任意事件被触发的回调函数
public:
Channel(int fd, Poller *poller):_poller(poller), _fd(fd), _events(0), _revents(0) {}
int Fd() { return _fd; }
//哪些事件被触发
void SetREvents(uint32_t events) { _revents = events; }
//需要监控哪些事件:传给poller
uint32_t Event() { return _events; }
//设置回调函数
void SetReadCallback(const EventCallback &cb) { _read_callback = cb; }
void SetWriteCallback(const EventCallback &cb) { _write_callback = cb; }
void SetErrorCallback(const EventCallback &cb) { _error_callback = cb; }
void SetCloseCallback(const EventCallback &cb) { _close_callback = cb; }
void SetEventCallback(const EventCallback &cb) { _event_callback = cb; }
//当前是否监控了可读
bool ReadAble() { return (_events & EPOLLIN); }
//当前是否监控了可写
bool WriteAble() { return (_events & EPOLLOUT); }
//启动读事件监控
void EnableRead() { _events |= EPOLLIN; Update(); }
//启动写事件监控
void EnableWrite() { _events |= EPOLLOUT; Update(); }
//关闭读事件监控
void DisableRead() { _events &= ~EPOLLIN; Update(); }
//关闭写事件监控
void DisableWrite() { _events &= ~EPOLLOUT; Update(); }
//关闭所有事件监控
void DisableAll() { _events = 0; Update(); }
//移除监控
void Remove();
void Update();
//事件处理,一旦连接触发了事件,就调用这个函数,自己触发了什么时间自己决定怎么处理
void HandleEvent()
{
if((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI))
{
/*不管任何事件,都调用的回调函数*/
if(_event_callback) _event_callback();
if(_read_callback) _read_callback();
}
//有可能会释放连接的操作事件,一次只处理一个
if(_revents & EPOLLOUT)
{
if(_write_callback) _write_callback();
if(_event_callback) _event_callback(); //放到事件处理完毕后调用,刷新活跃度;
//如果先执行后,客户端断开,_write_callback认为还是有效,写会出错
}
else if(_revents & EPOLLERR)
{
if(_event_callback) _event_callback();
if(_error_callback) _error_callback();//一旦出错就会释放连接,后面就不能再进行处理任意事件,所以放前面
}
else if(_revents & EPOLLHUP)
{
if(_event_callback) _event_callback();
if(_close_callback) _close_callback();
}
}
};
//描述符IO事件监控
#define MAX_EPOLLEVENTS 1024
class Poller
{
private:
int _epfd; //epoll操作句柄
struct epoll_event _evs[MAX_EPOLLEVENTS]; //监控时保存当前所有活跃事件
std::unordered_map _channel; //管理描述符和描述符对应的事件管理Channel对象
private:
//对epoll的直接操作
void Update(Channel* channel, int op)
{
//int epoll_ctl(int epfd, int op, int fd,struct epoll_event *_Nullable event);
struct epoll_event event;
event.data.fd = channel->Fd();
event.events = channel->Event();
int ret = epoll_ctl(_epfd, op, channel->Fd(), &event);
if(ret < 0)
{
ERR_LOG("EPOLLCTL FAILED!");
abort();
}
}
//判断一个Channel是否已经添加事件监控
bool HasChannel(Channel* channel)
{
auto it = _channel.find(channel->Fd());
if(it != _channel.end()) return true;
return false;
}
public:
Poller()
{
_epfd = epoll_create(1);
if(_epfd < 0)
{
ERR_LOG("EPOLL CREATE FAILED!");
abort();
}
}
//添加或修改监控事件
void UpdateEvent(Channel* channel)
{
if (!HasChannel(channel))
{
//不在就EPOLL_CTL_ADD,并且加入_channel
Update(channel, EPOLL_CTL_ADD);
_channel[channel->Fd()] = channel;
}
//在就EPOLL_CTL_MOD
else Update(channel, EPOLL_CTL_MOD);
}
//移除监控
void RemoveEvent(Channel* channel)
{
auto it = _channel.find(channel->Fd());
if (it != _channel.end())
{
_channel.erase(it);
}
Update(channel, EPOLL_CTL_DEL);
}
//开始监控,返回活跃链接
void Poll(std::vector *active)
{
//int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
int nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1);//-1:阻塞监控
if(nfds < 0)
{
if(errno == EINTR)
{
return;
}
ERR_LOG("EPOLL WAIT ERROR:%s
", strerror(errno));
abort();
}
for(int i = 0; i < nfds; i++)
{
auto it = _channel[_evs[i].data.fd];
it->SetREvents(_evs[i].events);//设置实际就绪的事件
active->push_back(it);
}
}
};
void Channel::Remove() { _poller->RemoveEvent(this); }
void Channel::Update() { _poller->UpdateEvent(this); }
注意:
void Channel::Remove() { _poller->RemoveEvent(this); }
void Channel::Update() { _poller->UpdateEvent(this); }
这里必须写在类外,如果写在Channel类内,由于编译器是从上到下编译,只声明了class Poller是不够的,因为Poller类的具体设计在下面,此时编译器是不知道_poller有哪些成员变量和成员函数的,这里会出错;只能先暂时声明void Remove(); void Update() 这两个函数,当走到最后才能对这两个函数的实现进行编译。
客户端和服务端测试代码
//tcp_srv.cc:服务端测试
#include "../source/server.hpp"
void HandleClose(Channel *channel)
{
std::cout << "close: " << channel->Fd() << std::endl;
channel->Remove();//移除监控
delete channel;//释放Channel
}
void HandleWrite(Channel *channel)
{
char *data = "你好bugubugu";
int ret = send(channel->Fd(), data, strlen(data), 0);
if(ret < 0) HandleClose(channel);
else channel->DisableWrite();//回应对方一次就关闭写监控
}
void HandleError(Channel *channel)
{
HandleClose(channel);
}
void HandleEvent(Channel *channel)
{
std::cout << "有事件被触发" << std::endl;
}
void HandleRead(Channel *channel)
{
char buf[1024] = {0};
int ret = recv(channel->Fd(), buf, 1023, 0);
if(ret <= 0) HandleClose(channel);
else
{
channel->EnableWrite();
std::cout << buf << std::endl;
}
}
void Acceptor(Poller *poller, Channel* lis_channel)
{
int newfd = accept(lis_channel->Fd(), nullptr, nullptr);
if(newfd < 0) return;
Channel* channel = new Channel(newfd, poller);
channel->SetReadCallback(std::bind(HandleRead, channel));//为通信套接字设置可读事件的回调函数
channel->SetWriteCallback(std::bind(HandleWrite, channel));//为通信套接字设置可写事件的回调函数
channel->SetCloseCallback(std::bind(HandleClose, channel));//为通信套接字关闭可读事件的回调函数
channel->SetErrorCallback(std::bind(HandleError, channel));//为通信套接字设置错误事件的回调函数
channel->SetEventCallback(std::bind(HandleEvent, channel));//为通信套接字设置任意事件的回调函数
channel->EnableRead();//启动可读事件监控
}
int main()
{
Socket lis_sock;
lis_sock.CreateServer(8080);
Poller poller;
//为监听套接字,创建Channel进行事件的管理和处理
Channel channel(lis_sock.Fd(), &poller);
//设置回调函数:获取新链接,为新连接创建Channel,设置回调函数并且添加进行监控
channel.SetReadCallback(std::bind(Acceptor, &poller, &channel));
channel.EnableRead();
while(1)
{
std::vector actives;
poller.Poll(&actives);
for(auto & e : actives)
{
e->HandleEvent();
}
}
lis_sock.Close();
return 0;
}
-----------------------------------------------------------------------------------------
//tcp_cli.cc:客户端测试
#include "../source/server.hpp"
int main()
{
Socket cli_sock;
cli_sock.CreateClient(8080, "127.0.0.1");
while (1)
{
std::string s = "我是bugubugu!";
cli_sock.Send(s.c_str(), s.size());
char buffer[1024] = {0};
cli_sock.Recv(buffer, 1023);
DBG_LOG("%s", buffer);
sleep(1);
}
return 0;
}
二、EventLoop
Poller模块只是EventLoop的子模块,EventLoop才是对所有描述符进行事件监控和处理的模块。
eventfd
1、认识eventfd:是一种事件通知机制,可以创建一个描述符用于实现事件通知,本质就是创建eventfd就会在内核创建一个计数器的结构:每次向eventfd中写入(write)一个数值来表示事件通知次数,也可以使用read进行数据读取,读取到的数据就是通知的次数。这里和信号量不一样,信号量是加一再减一;而对于eventfd,假设每次给eventfd写入一个1,就表示通知一次,连续write三次之后,从read读取的数字是3,读取之后计数清0。
在EventLoop模块中实现线程间的事件通知功能。
2、eventfd的使用:
#include
int eventfd(unsigned int initval, int flags);
功能:创建一个eventfd对象,实现事件通知
参数:initial:计算初值;flags:EFD_CLOEXEC -- 禁止进程复制;EFD_NONBLOCK -- 启动非阻塞属性
返回值:返回一个文件描述符用于操作(eventfd通过read/write/close进行操作,read/write进行IO时的数据只能是一个8字节数据)
EventLoop模块设计思想
1、eventloop和线程是一一对应的:即1 Thread = 1 Poller = N Channels,
- 1 个线程 (Thread) 拥有 1 个 Poller (EventLoop)。
- 1 个 Poller 管理 成千上万个 Channel。
- 结论:1 个线程同时管理成千上万个 Channel(连接)。
2、为什么是一一对应?EventLoop监控一个连接,这个连接一旦就绪,就要进行事件处理。但是如果这个描述符在多个线程中触发了事件,进行处理时,因为是多个线程对一个Fd进行操作,就会存在安全问题,因此需要将一个连接即一个Fd的事件监控、连接事件处理以及其他操作都放在同一个线程中进行。
3、如何保证一个连接的所有操作都在 EventLoop 对应的线程里?
并不需要将对连接的所有操作一律加入任务队列,而是应该采用 runInLoop 的机制进行判断:每当要执行某个操作(如 send或 close)时,先检查当前调用线程是否就是该 EventLoop 所在线程。
- 如果是(InLoop):说明不存在跨线程竞争,直接执行该操作,无需入队(零开销)。
- 如果不是(NotInLoop/外来任务):说明是其他线程(如业务线程池)想操作该连接,此时必须将操作封装成任务(Functor),加入任务队列并唤醒 EventLoop 线程,由 EventLoop 线程在稍后的阶段统一执行
举个例子:
(1)处理IO事件(InLoop):这里处理的是“来自客户的请求”(读数据、新连接)。这是由 epoll_wait 直接触发的,不经过任务队列,直接在Channel::HandleEvent 里执行。
(2)处理任务队列(NotInLoop/外来任务):
假设你的服务器收到了一个客户端 Client-zhangsan 的连接,服务器会创建唯一的一个对象Channel 来代表 Client-zhangsan ,这个Channel 被分配给了 IO 线程 B(Poller) 管理;现在,外人线程 A(比如计算线程) 正在处理Client-zhangsan 发来的请求(比如计算1+1=2 ),计算完了,线程 A 拿着结果 2,想发回给 Client-zhangsan。但是,外人线程 A 手里并没有一个属于它自己的“Channel 副本”,它想发数据,必须找到唯一的那个属于线程 B 的 Channel。
外人 A 把数据打包成一个任务, 把这个任务扔进 IO 线程 B 的任务队列;唤醒EventLoop 线程 B 醒来,由 B 自己调用send。
4、eventloop具体的处理流程:
(1) 等待事件:在线程中调用 epoll_wait监控描述符,等待事件就绪(此时线程可能阻塞)。
(2) 处理已就绪的 IO 事件:当描述符就绪(如可读/可写),立即在当前线程直接调用对应的事件处理函数HandleEvent()。(注意:这一步通常不进任务队列,因为这里本来就是 IO 线程自己,直接处理效率最高)
(3)执行任务队列(处理外来任务):IO 事件处理完毕后,检查任务队列。 如果有来自其他线程投递的任务(如业务线程计算完结果后的发送请求),则在当前线程依次取出并执行这些任务。
用一个图来表示就是

这样可以保证对于连接的所有操作,都在一个线程进行,不涉及安全问题(注意任务队列的入队和出队操作必须加互斥锁(Mutex))
接口设计功能
- 事件监控:使用Poller模块,有事件就绪则进行事件处理
- 执行任务队列中的任务:一个线程安全的任务队列
注意:有可能因为一直等待描述符IO事件就绪,导致执行线程阻塞,这时候任务队列中的任务将得不到执行,因此得有一个事件通知的东西,能够唤醒事件监控的阻塞;
当事件就绪,需要处理的时候,处理过程中,如果对连接要进行某些操作,这些操作必须在EventLoop对应的线程中执行,保证对连接的各项操作都是线程安全的。
1.如果执行的操作本就在线程中,不需要将操作压入队列了,可以直接执行
2.如果执行的操作不在线程中,才需要加入任务池,等到事件处理完了然后执行任务
代码分析
class EventLoop
{
private:
std::thread::id _thread_id; //当前线程id:判断操作是否要加入任务队列还是直接执行
Poller _poller;//进行所有描述符的事件监控
using Functor = std::function;
std::vector _tasks;//任务池
std::mutex _mutex;//实现任务池操作的线程安全
int _event_fd; //eventfd: 唤醒IO事件监控可能导致的阻塞
public:
void RunAllTask()
{
std::vector tasks;
{
std::unique_lock _lock(_mutex);
tasks.swap(_tasks);//共享资源
}
for(auto &f : tasks)//线程私有
f();
}
};
void RunAllTask() :用 task 变量 + swap 将共享资源转移为线程私有,有利于
(1)把持有锁的时间降到最低:假设把任务执行放到锁的作用域,_task 任务执行时间太长,在这段时间里其他线程想往 _task 塞任务,是不是就得一直等待锁,服务器效率低下。使用局部变量交换任务后,f() 就不是共享资源了,不影响其他线程对 _task 变量的访问。
(2)避免死锁:还是一样,如果全程加锁,当我执行任务A时,锁被 RunAllTask() 拿着,当A里面需要实现往_task塞任务函数时,这个函数也需要申请锁,就会造成 RunAllTask() 在等待A运行结束,而任务A里因为调用塞任务函数在等待 RunAllTask() 运行结束。
下面给的代码是EventLoop的部分实现代码
class EventLoop
{
private:
std::thread::id _thread_id; //当前EventLoop线程id:判断操作是否要加入任务队列还是直接执行
Poller _poller;//进行所有描述符的事件监控
using Functor = std::function;
std::vector _tasks;//任务池
std::mutex _mutex;//实现任务池操作的线程安全
int _event_fd; //eventfd: 唤醒IO事件监控可能导致的阻塞
Channel _event_channel;
public:
static int CreatEventFd()
{
int fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if(fd < 0)
{
ERR_LOG("CreatEventFd fail");
abort();
}
return fd;
}
void ReadEventFd()
{
uint64_t val = 0;
int ret = read(_event_fd, &val, sizeof(val));
if (ret <= 0)
{
if (errno == EAGAIN || errno == EINTR)
{
return;
}
ERR_LOG("Read EventFd Fail");
abort();
}
}
void WakeUpEventFd()
{
uint64_t val = 1;
int ret = write(_event_fd, &val, sizeof(val));
if (ret <= 0)
{
if (errno == EAGAIN || errno == EINTR)
{
return;
}
ERR_LOG("Write EventFd Fail");
abort();
}
}
public:
void RunAllTask()
{
std::vector tasks;
{
std::unique_lock _lock(_mutex);
tasks.swap(_tasks);
}
for(auto &f : tasks)
f();
}
EventLoop():_thread_id(std::this_thread::get_id()),
_event_fd(CreatEventFd()),
_event_channel(_event_fd, this)
{
//给eventfd添加可读事件回调函数,读取eventfd事件通知次数
_event_channel.SetReadCallback(std::bind(&EventLoop::ReadEventFd, this));
//启动EventFd的读事件监控
_event_channel.EnableRead();
}
//事件监控-->就绪事件处理-->执行任务
void Start()
{
//事件监控
std::vector actives;
_poller.Poll(&actives);
//就绪事件处理
for(auto &e : actives)
{
e->HandleEvent();
}
//执行任务
RunAllTask();
}
//判断要执行的任务是否处于EventLoop线程,是就执行,不是就加入任务队列
void RunInLoop(const Functor & cb)
{
if(IsInLoop()) cb();
else QueueInLoop(cb);
}
//判断当前线程是否是EventLoop对应线程
bool IsInLoop()
{
return (std::this_thread::get_id() == _thread_id);
}
//将任务压入线程池
void QueueInLoop(const Functor & cb)
{
{
std::unique_lock lock(_mutex);
_tasks.push_back(cb);
}
//有可能没有事件就绪,事件监控阻塞着,需要一个事件通知的函数,触发eventfd可读事件
//给eventfd写一个数据触发
WakeUpEventFd();
}
//添加、修改描述符的事件监控
void UpdateEvent(Channel * channel)
{
_poller.UpdateEvent(channel);
}
//移除描述符的事件监控
void RemoveEvent(Channel * channel)
{
_poller.RemoveEvent(channel);
}
};
三、定时器模块的整合
整合思想
- timerfd:实现内核每隔一段时间,给进程一次超时事件(timerfd可读)
- timerwheel: 实现每次执行Runtimetask,都可以执行一波到期的定时任务
- 要实现一个完整的秒级定时器,就需要将这两个功能整合到一起:timerfd设置为每秒钟触发一次定时事件,当事件被触发,则运行一次timerwheel的runtimertask,执行一下所有的过期定时任务
而timefd的事件监控与触发,可以融合 EventLoop 来实现:当 EventLoop 的 poll 监控到线程往timefd 写入时,就会触发可读事件处理,即读取 timefd 超时次数和执行 timerwheel 的 runtimertask。所以TimeWheel 模块需要引入 EventLoop 对timefd 进行监控,引入channel 对timefd 设置可读事件回调函数已经开启可读事件监控。
其实只要是描述符:定时器timerfd,监听fd,连接fd,eventfd都需要和 eventloop(对Poller的封装)糅合在一起才能监控,并且需要channel管理事件:设置可读回调函数等等。
具体步骤
- 在TimerWheel的初始化阶段,会执行 CreateTimerFd() :创建和启动定时器。这里会调用 timerfd_settime (内核里的计时器就开始走字了,内核会往这个 timerfd 的文件缓冲区里写入数据),返回 timerfd。
- 创建 _timer_channel 绑定这个 fd 并进行管理:设置可读回调函数并且对这个 fd 开启监控,此时_timer_channel 已经通过 void Channel::Update() { _loop->UpdateEvent(this); } 将 timerfd 可读监控加入epoll 的监控里。
- 一旦poll 监控到 timerfd 有事件触发 (启动计时器那块就会触发可读事件),就会执行EvnetHandle(),即执行可读回调函数 ReadTimeFd() 和 RunTimerTask(),实现读取 timefd 超时次数和执行 timerwheel 的 RunTimerTask()。
注意:定时器中有_timers, _wheel成员,定时器信息和_wheel的操作有可能在多线程中进行,因此需要考虑线程安全问题。 如果不想加锁,那就把对定时器的所有操作,都放到一个线程中进行,对于 RunTimerTask() ,与TimerAdd、TimerRefresh、TimerCancel相比,它是eventloop线程本身的,不需要像其他三个函数是外部线程委托的进行判断。
区分同步异步
TimerWheel类:HasTimerTask 不能用 RunInLoop 是因为HasTimerTask 可能是外来线程给的业务,那我塞到RunInLoop只能是加入到任务队列,并不能保证马上执行,但是代码继续往下走,外来线程需要知道任务是否存在。解决方法就是我们自己控制 HasTimerTask 只能让eventloop线程调用,实现串行执行。
底层:同步是需求要立马答复比如线程A需要马上知道hastimer的返回值,而异步适合的需求是不需要马上执行任务,RunInLoop 的设计初衷就是用来处理那些“不需要立刻知道结果”的事情。
下面是把定时器 TimerFd 加入到 TimerWheel模块,实现一个完整的秒级定时器
using TaskFunc = std::function;
using ReleaseFunc = std::function;
// 定时任务
class TimerTask
{
private:
bool _cancel;
uint64_t _id; // 定时器任务对象ID
uint32_t _timeout; // 定时任务的超时时间
TaskFunc _task_cb; // 定时器任务对象要执行的定时任务
ReleaseFunc _release; // 用于删除TimerWheel(_timers)中保存的定时器对象信息
public:
TimerTask(uint64_t id, uint32_t timeout, const TaskFunc &task_cb)
: _id(id), _timeout(timeout), _task_cb(task_cb), _cancel(false) {}
~TimerTask()
{
if (_cancel == false)
{
_task_cb();
}
_release();
}
void Cancel() { _cancel = true; }
void SetRelease(const ReleaseFunc &release) { _release = release; }
uint32_t DelayTime() { return _timeout; }
};
class TimerWheel
{
private:
using PtrTask = std::shared_ptr;
using WeakTask = std::weak_ptr;
int _capacity; // 最大延迟时间
std::vector> _wheel;
int _tick; // 当前秒针,走到哪里释放哪里的对象,相当于执行哪里的任务
std::unordered_map _timers; // 定时器
int _timerfd; // 定时器描述符:可读事件实现读取超时次数和执行定时任务
EventLoop *_loop;
std::unique_ptr _timer_channel;
private:
void RemoveTimer(uint64_t id)
{
auto pos = _timers.find(id);
if (pos != _timers.end())
{
_timers.erase(pos);
}
}
int CreatTimerFd()
{
int fd = timerfd_create(CLOCK_MONOTONIC, 0);
if (fd < 0)
{
ERR_LOG("timerfd_create error");
abort();
}
struct itimerspec * itime = new itimerspec;
//第一次超时时间
itime->it_value.tv_sec = 1;
itime->it_value.tv_nsec = 0;
//第一次之后超时时间
itime->it_interval.tv_sec = 1;
itime->it_interval.tv_nsec = 0;
//启动定时器
timerfd_settime(fd, 0, itime, nullptr);
return fd;
}
void ReadTimeFd()
{
uint64_t times;
int n = read(_timerfd, ×, 8);
if(n < 0)
{
ERR_LOG("read errpr");
abort();
}
}
void TimerAddInLoop(uint64_t id, uint32_t timeout, const TaskFunc &task_cb)
{
PtrTask pt(new TimerTask(id, timeout, task_cb)); // 这里不能用等号
int pos = (_tick + timeout) % _capacity;
_wheel[pos].push_back(pt);
// 添加管理信息和定义删除管理信息的函数
_timers[id] = WeakTask(pt);
pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id));
}
void TimerRefreshInLoop(uint64_t id)
{
auto it = _timers.find(id);
if (it == _timers.end())
{
return;
}
////通过保存的定时器对象_timers的weak_ptr构造一个shared_ptr出来,添加到轮子中
PtrTask pt = it->second.lock(); // lock获取weak_ptr管理的对象对应的shared_ptr
int timeout = pt->DelayTime();
int pos = (_tick + timeout) % _capacity;
_wheel[pos].push_back(pt);
}
void TimerCancelInLoop(uint64_t id)
{
auto it = _timers.find(id);
if (it == _timers.end())
{
return;
}
PtrTask pt = it->second.lock();
pt->Cancel();
}
////这个函数应该每秒钟被执行一次,相当于秒针向后走了一步
//与TimerAdd、TimerRefresh、TimerCancel相比,它是eventloop线程本身的,不需要像其他三个业务一样进行判断
void RunTimerTask()
{
_tick = (_tick + 1) % _capacity;
_wheel[_tick].clear(); // 清空指定位置的数组,就会把数组中保存的所有管理 定时器任务对象 的shared_ptr释放掉
}
//读取定时器内容,执行任务
void OnTime()
{
ReadTimeFd();
RunTimerTask();
}
public:
TimerWheel(EventLoop *loop) : _tick(0), _capacity(60), _wheel(_capacity), _loop(loop),
_timerfd(CreatTimerFd()), _timer_channel(new Channel(_timerfd, loop))
{
_timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this));//
_timer_channel->EnableRead();
}
//定时器中有_timers, _wheel成员,定时器信息和_wheel的操作有可能在多线程中进行,因此需要考虑线程安全问题
//如果不想加锁,那就把对定时器的所有操作,都放到一个线程中进行
// 添加定时任务
void TimerAdd(uint64_t id, uint32_t timeout, const TaskFunc &task_cb);
// 刷新/延迟定时任务
void TimerRefresh(uint64_t id);
//取消定时任务执行
void TimerCancel(uint64_t id);
//只能让loop线程自己执行这个接口(串行实现同步),不让其他线程调用
bool HasTimerTask(uint64_t id)
{
auto it = _timers.find(id);
if (it == _timers.end())
{
return false;
}
return true;
}
};
注:我们需要将TimerWheel 加入EventLoop模块,让EventLoop提供添加定时任务、刷新定时任务、取消定时任务接口。在类TimerWheel里,添加定时任务、刷新定时任务、取消定时任务这里涉及对EventLoop 对象的成员函数访问,为避免编译报错,将它们的实现放在最后。
下面给出对EventLoop的修改代码
class EventLoop
{
private:
...
TimerWheel _timerwheel;
public:
void TimerAdd(uint64_t id, uint32_t timeout, const TaskFunc &task_cb)
{
_timerwheel.TimerAdd(id, timeout, task_cb);
}
// 刷新/延迟定时任务
void TimerRefresh(uint64_t id)
{
_timerwheel.TimerRefresh(id);
}
//取消定时任务执行
void TimerCancel(uint64_t id)
{
_timerwheel.TimerCancel(id);
}
bool HasTimerTask(uint64_t id)
{
return _timerwheel.HasTimerTask(id);
}
};
测试EventLoop 和 TimerWheel
将TimerWheel 加入EventLoop模块后进行联合调试,我们对非活跃连接的超时实现释放操作(//tcp_srv.cc:服务端测试 的HandleClose()函数),在获取到活跃连接后,需要先添加定时销毁任务,将任务Id 和 TimerWheel 的_timer 绑定,再启动通信 channel 的读事件监控。
原因:因为每次通信 channel 事件触发都要在 HandleEvent 里对定时销毁任务进行更新操作,如果先启动监控,立即有事件,那执行更新操作时是需要访问 _timer 数组的,那我此时拿着任务 Id 也找不到TimerTask任务指针,不就出现逻辑问题了吗?任务并没有被加入到 _wheel 和 _timer。
下面给出tcp_srv.cc修改部分
void HandleClose(Channel *channel)
{
DBG_LOG("%d",channel->Fd());
channel->Remove();//移除监控
delete channel;//释放Channel
}
void HandleEvent(Channel *channel, EventLoop *loop, uint64_t id)
{
loop->TimerRefresh(id);
}
void Acceptor(EventLoop *loop, Channel* lis_channel)
{
int newfd = accept(lis_channel->Fd(), nullptr, nullptr);
if(newfd < 0) return;
uint64_t task_id = rand() % 10000;
Channel* channel = new Channel(newfd, loop);
channel->SetReadCallback(std::bind(HandleRead, channel));//为通信套接字设置可读事件的回调函数
channel->SetWriteCallback(std::bind(HandleWrite, channel));//为通信套接字设置可写事件的回调函数
channel->SetCloseCallback(std::bind(HandleClose, channel));//为通信套接字关闭可读事件的回调函数
channel->SetErrorCallback(std::bind(HandleError, channel));//为通信套接字设置错误事件的回调函数
channel->SetEventCallback(std::bind(HandleEvent, channel, loop, task_id));//为通信套接字设置任意事件的回调函数
//10s后对非活跃连接进行释放
//释放任务的添加要放在监控之前,如果先开启监控,立即有事件,在刷新释放任务里找不到这个任务
loop->TimerAdd(task_id, 10, std::bind(HandleClose, channel));
channel->EnableRead();//启动可读事件监控
}
四、EventLoop模块简单流程图

//tcp_svr.cc
int main()
{
Socket lis_sock;
lis_sock.CreateServer(8080);
EventLoop loop;
//为监听套接字,创建Channel进行事件的管理和处理
Channel lst_channel(lis_sock.Fd(), &loop);
//设置回调函数:获取新链接,为新连接创建Channel,设置回调函数并且添加进行监控
lst_channel.SetReadCallback(std::bind(Acceptor, &loop, &lst_channel));
lst_channel.EnableRead();
while(1)
{
loop.Start();
}
lis_sock.Close();
return 0;
}
首先创建了一个监听套接字,再接着创建了一个loop 对象,这里需要明白loop 对象里面有什么:_poll 变量是进行监控描述符就绪事件的、_task 是存放外来线程的任务的任务池、_event_chanel是管理事件通知_event_fd 的、_timer_wheel就是一个定时器,用来执行定时任务的;我们创建了监听 lst_channel 为其设置Acceptor 可读回调函数,通过EnableRead() 将监听描述符挂到 _poller上,对监听描述符进行监听;当有新连接产生时,lst_channel的可读事件触发,执行Acceptor 函数:
void Acceptor(EventLoop *loop, Channel* lis_channel)
{
int newfd = accept(lis_channel->Fd(), nullptr, nullptr);
if(newfd < 0) return;
uint64_t task_id = rand() % 10000;
Channel* new_channel = new Channel(newfd, loop);
new_channel->SetReadCallback(std::bind(HandleRead, new_channel));//为通信套接字设置可读事件的回调函数
new_channel->SetWriteCallback(std::bind(HandleWrite, new_channel));//为通信套接字设置可写事件的回调函数
new_channel->SetCloseCallback(std::bind(HandleClose, new_channel));//为通信套接字关闭可读事件的回调函数
new_channel->SetErrorCallback(std::bind(HandleError, new_channel));//为通信套接字设置错误事件的回调函数
new_channel->SetEventCallback(std::bind(HandleEvent, new_channel, loop, task_id));//为通信套接字设置任意事件的回调函数
//10s后对非活跃连接进行释放
//释放任务的添加要放在监控之前,如果先开启监控,立即有事件,在刷新释放任务里找不到这个任务
loop->TimerAdd(task_id, 10, std::bind(HandleClose, new_channel));
new_channel->EnableRead();//启动可读事件监控
}
为新连接的描述符创建一个new_channel,设置可读可写、错误、任意事件回调函数,还有添加定时销毁任务(对规定时间内非活跃连接的释放),通过new_channel->EnableRead() 将new_channel的描述符挂到 _poller上,对通信描述符进行监听,当可读事件被触发时,HandleEvent 和 HandleRead 被执行:进行定时销毁任务的刷新和读取数据,假设一段时间后,可读事件未触发,任务未刷新,那么就会执行销毁任务将 new_channel 释放。
_evnet_channel创建后也会给eventfd添加可读事件回调函数(读取eventfd事件通知次数)并启动EventFd的读事件监控;
_timer_wheel的初始化也会添加可读事件回调函数(读取定时器内容,执行任务)启动_timer_fd 的读事件监控(timerfd_settime(fd, 0, itime, nullptr)会一秒触发一次可读事件)。









