最新资讯

  • 【仿Mudou库one thread per loop式并发服务器实现】SERVER服务器模块实现

【仿Mudou库one thread per loop式并发服务器实现】SERVER服务器模块实现

2026-01-28 17:02:05 栏目:最新资讯 3 阅读

SERVER服务器模块实现

  • 1. Buffer模块
  • 2. Socket模块
  • 3. Channel模块
  • 4. Poller模块
  • 5. EventLoop模块
    • 5.1 TimerQueue模块
    • 5.2 TimeWheel整合到EventLoop
    • 5.1 EventLoop与线程结合
    • 5.2 EventLoop线程池
  • 6. Connection模块
  • 7. Acceptor模块
  • 8. TcpServer模块

1. Buffer模块

Buffer模块:缓冲区模块
提供的功能:存储数据,取出数据

实现思想:

  1. 实现缓冲区得有一块内存空间,采用vector,vector底层其实使用的就是一个线性的内存空间。不用string的原因是因为,网络传输数据什么都有而string在读取遇 ‘’ 就停止了。
  2. 要素:
  • 默认的空间大小
  • 当前的读取数据位置
  • 当前的写入数据位置
  1. 操作:
    a. 写入数据:
    当前写入位置指向哪里,就从哪里开始写入。
    如果后续剩余空闲空间不够了,考虑整体缓冲区空闲空间是否足够(因为读位置也会向后偏移,前边有可能会有空闲空间)。
    足够:将数据移动到起始位置即可
    不够:扩容,从当前写位置开始扩容足够大小
    数据一旦写入成功,当前写位置,就要向后偏移
    b. 读取数据:
    当前的读取位置指向哪里,就从哪里开始读取,前提是有数据可读
    可读数据大小:当前写入位置,减去当前读取位置
// 缓存区Buffer模块
#define BUFFER_SIZE 1024
class Buffer
{
private:
    std::vector<char> _buffer; // 使⽤vector进⾏内存空间管理
    uint64_t _read_idx;        // 读偏移
    uint64_t _write_idx;       // 写偏移

public:
    Buffer() : _read_idx(0), _write_idx(0), _buffer(BUFFER_SIZE) {}

    // 获得数组起始位置,注意并不是对象的位置
    // void* Begin()
    char *Begin()
    {
        return &(*_buffer.begin());
    }

    // 获取当前写⼊起始地址, _buffer的空间起始地址,加上写偏移量
    // void* ReadPos()
    char *WritePosition()
    {
        return Begin() + _write_idx;
    }

    // 获取当前读取起始地址
    // void* WritePos()
    char *ReadPosition()
    {
        return Begin() + _read_idx;
    }

    // 获取缓冲区末尾空闲空间大小--写偏移之后的空闲空间, 总体空间大小减去写偏移
    uint64_t TailIdleSize()
    {
        return _buffer.size() - _write_idx;
        // 这里也可以用_buffer.size() - _write_idx
        // 因为vector构造使用的函数即开辟空间也初始化了
        // 也就是说size()和capacity()是一样的大小
        // 后续也用resize()会调整空间
    }

    // 获取缓冲区起始空闲空间大小--读偏移之前的空闲空间
    uint64_t HeadIdleSize()
    {
        return _read_idx;
    }

    // 获取可读数据⼤⼩ = 写偏移 - 读偏移
    uint64_t ReadAbleSize()
    {
        return _write_idx - _read_idx;
    }

    // 确保可写空间⾜够(整体空闲空间够了就移动数据,否则就扩容)
    void EnsureWriteSpace(uint64_t len)
    {
        // 如果末尾空闲空间⼤⼩⾜够,直接返回
        if (TailIdleSize() >= len)
            return;
        // 末尾空闲空间不够,则判断加上起始位置的空闲空间大小是否足够, 够了就将数据移动到起始位置
        if (HeadIdleSize() + TailIdleSize() >= len)
        {
            // 将数据移动到起始位置
			// 把当前数据大小先保存起来
            uint64_t size = ReadAbleSize();     
            // 把可读数据拷贝到起始位置                     
            std::copy(ReadPosition(), ReadPosition() + size, Begin()); 
            // 将读偏移归0
            _read_idx = 0;   
            // 将写位置置为可读数据大小,因为当前的可读数据大小就是写偏移量                                        
            _write_idx = size;                                       
        }
        else
        {
            // 总体空间不够,则需要扩容,不移动数据,直接给写偏移之后扩容足够空间即可
            _buffer.resize(_write_idx + len);
        }
    }

    // 将读偏移向后移动
    void MoveReadOffset(uint64_t len)
    {
        if (len == 0) return; 
        _read_idx += len;
    }

    // 将写偏移向后移动
    void MoveWriteOffset(uint64_t len)
    {
        //向后移动的大小,必须小于当前后边的空闲空间大小
        assert(len <= TailIdleSize());
        _write_idx += len;
    }

    // 写入数据
    void Write(const void *date, uint64_t len)
    {
        // 1. 保证有足够空间,2. 拷⻉数据进去
        if (len == 0)
            return;
        EnsureWriteSpace(len);
        // void* 不能解引用 和 ++
        const char *d = (const char *)date;
        //copy是一个模板函数,类型需要匹配,因此上面读写位置返回都是char*
        std::copy(d, d + len, WritePosition());
    }

    void WriteAndPush(const void *date, uint64_t len)
    {
        Write(date, len);
        MoveWriteOffset(len);
    }

    void WriteString(const std::string &data)
    {
        return Write(data.c_str(), data.size());
    }

    void WriteStringAndPush(const std::string &data)
    {
        WriteString(data);
        MoveWriteOffset(data.size());
    }

    void WriteBuffer(Buffer &data)
    {
        return Write(data.ReadPosition(), data.ReadAbleSize());
    }

    void WriteBufferAndPush(Buffer &data)
    {
        WriteBuffer(data);
        MoveWriteOffset(data.ReadAbleSize());
    }

    // 读取数据
    void Read(void *buff, uint64_t len)
    {
        // 要求要获取的数据大小必须⼩于可读数据大小
        assert(len <= ReadAbleSize());
        std::copy(ReadPosition(), ReadPosition() + len, (char *)buff);
    }

    void ReadAndPop(void *buff, uint64_t len)
    {
        Read(buff, len);
        MoveReadOffset(len);
    }

    std::string ReadString(uint64_t len)
    {
        //要求要获取的数据大小必须小于可读数据大小
        assert(len <= ReadAbleSize());
        std::string str;
        str.resize(len);
        // str.c_str()返回的是const char*
        Read(&str[0], len);
        return str;
    }

    std::string ReadStringAndPop(uint64_t len)
    {
        assert(len <= ReadAbleSize());
        std::string str = ReadString(len);
        MoveReadOffset(len);
        return str;
    }

    char *FindCRLE()
    {
        char *p = (char *)memchr(ReadPosition(), '
', ReadAbleSize());
        return p;
    }

    // 通常获取⼀⾏数据,这种情况针对是
    std::string GetLine()
    {
        char *p = FindCRLE();
        if (p == nullptr)
            return "";
        return ReadString(p - ReadPosition() + 1); //+是为了把
也读出来
    }

    std::string GetLineAndPop()
    {
        std::string str = GetLine();
        MoveReadOffset(str.size());
        return str;
    }

    void Clear()
    {
        _read_idx = 0;
        _write_idx = 0;
    }
};

这里补充一个按照日志等级打印的模块,方便后面调试。顺便把这个项目用到的所有头文件都写出来。

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

#include 
#include 
#include 

// 日志打印
#define NORMAL 0
#define DEBUG 1
#define WARRING 2
#define FATAL 3

// 根据等级打印日志
#define LOG(level, format, ...)                                                               
    {                                                                                         
        if (level >= FATAL)                                                                   
        {                                                                                     
            time_t t = time(NULL);                                                            
            struct tm *m = localtime(&t);                                                     
            char ts[32] = {0};                                                                
            strftime(ts, 31, "%H:%M:%S", m);                                                  
            fprintf(stdout, "[%p %s %s:%d]" format "
", (void*)pthread_self(),ts, __FILE__, __LINE__, ##__VA_ARGS__); 
        }                                                                                     
    }

// ... 表示可变参数,可以传0个、1个、多个
// _VA_ARGS__ 将...的内容原封不动抄到__VA_ARGS__位置
// ##__VA_ARGS__ ...没有参数,就移除这个_VA_ARGS_,相对于没有这个东西

#define LOG_NORMAL(format, ...) LOG(NORMAL, format, ##__VA_ARGS__)
#define LOG_DEBUG(format, ...) LOG(DEBUG, format, ##__VA_ARGS__)
#define LOG_WARRING(format, ...) LOG(WARRING, format, ##__VA_ARGS__)
#define LOG_FATAL(format, ...) LOG(FATAL, format, ##__VA_ARGS__)

2. Socket模块

Socket模块:对socket套接字操作进行分资

功能:

  1. 创建套接字
  2. 绑定地址信息
  3. 开始监听
  4. 向服务器发起连接
  5. 获取新连接
  6. 接收数据
  7. 发送数据
  8. 关闭套接字
  9. 创建一个服务端连接
  10. 创建一个客户端连接
  11. 设置套接字选项—开启地址端口重用
  12. 设置套接字阻塞属性-设置为非阻塞
// 套接字Socket模块
#define LISTEN_SIZE 1024

class Socket
{
private:
    int _sockfd;

public:
    Socket() : _sockfd(-1) {}
    Socket(int fd) : _sockfd(fd) {}
    ~Socket() { Close(); }
    int Fd() { return _sockfd; }
    // 创建套接字
    bool Create()
    {
        _sockfd = socket(AF_INET, SOCK_STREAM, 0);
        if (_sockfd < 0)
        {
            LOG_FATAL("CREATE SOCKET FAILED!!");
            return false;
        }
        return true;
    }
    // 绑定ip+port
    bool Bind(const uint16_t &port)
    {
        struct sockaddr_in peer;
        memset(&peer, 0, sizeof peer);
        peer.sin_family = AF_INET;
        peer.sin_port = htons(port);
        peer.sin_addr.s_addr = INADDR_ANY; // 任意地址绑定
        if (bind(_sockfd, (sockaddr *)&peer, sizeof peer) < 0)
        {
            LOG_FATAL("BIND ADDRESS FAILED!");
            return false;
        }
        return true;
    }
    // 开始监听
    bool Listen(int backlog = LISTEN_SIZE)
    {
        if (listen(_sockfd, backlog) < 0)
        {
            LOG_FATAL("SOCKET LISTEN FAILED!");
            return false;
        }
        return true;
    }
    // 向服务器发起连接
    bool Connect(const uint16_t &port, const std::string ip)
    {
        struct sockaddr_in peer;
        memset(&peer, 0, sizeof peer);
        peer.sin_family = AF_INET;
        peer.sin_port = htons(port);
        peer.sin_addr.s_addr = inet_addr(ip.c_str());
        if (connect(_sockfd, (sockaddr *)&peer, sizeof peer) < 0)
        {
            LOG_FATAL("CONNECT SERVER FAILED!");
            return false;
        }
        return true;
    }
    // 获取连接
    int Accpet()
    {
        // 不关注是那个ip和port发起的连接
        int newfd = accept(_sockfd, nullptr, nullptr);
        if (newfd < 0)
        {
            LOG_DEBUG("SOCKET ACCEPT FAILED!");
            return -1;
        }
        return newfd;
    }
    // 阻塞读取数据
    ssize_t Recv(void *buffer, size_t len, int flag = 0)
    {
        ssize_t ret = recv(_sockfd, buffer, len, flag);
        if (ret <= 0)
        {
            // EAGAIN 当前socket的接收缓冲区中没有数据了,在⾮阻塞的情况下才会有这个错误
            // EINTR 表⽰当前socket的阻塞等待,被信号打断了,
            if (errno == EAGAIN || errno == EINTR)
            {
                return 0; // 表⽰这次接收没有接收到数据
            }
            LOG_FATAL("SOCKET RECV FAILED!");
            return -1;
        }
        return ret; // 实际接收的数据长度
    }
    // 非阻塞读取数据
    ssize_t NonBlockRecv(void *buffer, size_t len)
    {
        return Recv(buffer, len, MSG_DONTWAIT); // MSG_DONTWAIT 表⽰当前接收为非阻塞
    }
    // 发出数据
    ssize_t Send(const void *buffer, size_t len, int flag = 0)
    {
        ssize_t ret = send(_sockfd, buffer, len, flag);
        if (ret < 0)
        {
            if (errno == EAGAIN || errno == EINTR)
            {
                return 0;
            }
            LOG_FATAL("SOCKET SEND FAILED!");
            return -1;
        }
        return ret; // 实际发送的数据长度
    }
    // 非阻塞发出数据
    ssize_t NonBlockSend(const void *buffer, size_t len)
    {
        if(len == 0) return 0;
        return Send(buffer, len, MSG_DONTWAIT); // MSG_DONTWAIT 表⽰当前接收为非阻塞
    }
    // 关闭套接字
    void Close()
    {
        if (_sockfd != -1)
        {
            close(_sockfd);
            _sockfd = -1;
        }
    }
    // 创建一个服务端连接
    bool CreateServer(const uint16_t &port,bool block_flag = false)
    {
        // 1.创建套接字
        if (Create() == false)
            return false;
        // 2.设置非阻塞
        if(block_flag) NonBlock();
        // 3.绑定
        if (Bind(port) == false)
            return false;
        // 4.监听
        if (Listen() == false)
            return false;
        // 5.开启地址端口复用
        ReuseAddress();

        return true;
    }
    // 创建一个客服端连接
    bool CreateClient(const uint16_t &port, const std::string ip)
    {
        // 1.创建套接字
        if (Create() == false)
            return false;
        // 2.发起连接
        if (Connect(port, ip) == false)
            return false;

        return true;
    }
    // 设置套接字选项--开启地址端口复用
    void ReuseAddress()
    {
        int opt = 1;
        setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof opt);
    }
    // 设置套接字属性 -- 非阻塞
    void NonBlock()
    {
        int fl = fcntl(_sockfd, F_GETFL, 0);
        fcntl(_sockfd, F_SETFL, fl | O_NONBLOCK);
    }
};

3. Channel模块


Channel类设计
目的:对描述符的监控事件管理

  1. 成员:
    因为后边使用epoll进行事件监控,
    EPOLLIN 可读
    EPOLLOUT 可写
    EPOLLRDHUP 连接断开
    EPOLLPRI 优先数据
    EPOLLERR 出错了
    EPOLLHUP 挂断
    而以上的事件都是一个数值uint32_t进行保存,要进行事件管理,就需要有一个uint32t类型的成员保存当前需要监控的事件。
    事件处理这里,因为有五种事件需要处理,就需要五个回调函数。

功能:

  1. 事件管理:
    描述符是否可读
    描述符是否可写
    对描述符监控可读
    对描述符监控可写
    解除可读事件监控
    解除可写事件监控
    解除所有事件监控

  2. 事件触发后的处理的管理
    a.需要处理的事件:可读,可写,挂断,错误,任意
    b.事件处理的回调函数

// 描述符事件管理Channel模块
class EventLoop;

class Channel
{
private:

    int _fd; //当前需要监控的文件描述符
    EventLoop *_loop; //放在自己所在的EventLoop监控
    uint32_t _events;  // 当前需要监控的事件
    uint32_t _revents; // 当前连接触发的事件
    using EventCallback = std::function<void()>;
    EventCallback _read_callback;  // 可读事件被触发的回调函数
    EventCallback _write_callback; // 可写事件被触发的回调函数
    EventCallback _error_callback; // 错误事件被触发的回调函数
    EventCallback _close_callback; // 连接断开事件被触发的回调函数
    EventCallback _event_callback; // 任意事件被触发的回调函数

public:
    // Channel(Poller *poller,int fd) : _fd(fd), _events(0), _revents(0), _poller(poller) {}
    Channel(EventLoop *loop, int fd) : _fd(fd), _events(0), _revents(0), _loop(loop) {}
    int Fd()
    {
        return _fd;
    }
    uint32_t Events() // 获得想要监控的事情
    {
        return _events;
    }
    void SetREvents(uint32_t events) // 设置实际就绪的事情
    {
        _revents = events;
    }
    void SetReadCallback(const EventCallback &cb)
    {
        _read_callback = cb;
    }
    void SetWriteCallback(const EventCallback &cb)
    {
        _write_callback = cb;
    }
    void SetErrorCallback(const EventCallback &cb)
    {
        _error_callback = cb;
    }
    void SetCloseCallback(const EventCallback &cb)
    {
        _close_callback = cb;
    }
    void SetEventCallback(const EventCallback &cb)
    {
        _event_callback = cb;
    }

    // 当前是否监控了可读
    bool ReadAble()
    {
        return _events & EPOLLIN;
    }
    // 当前是否监控了可写
    bool WriteAble()
    {
        return _events & EPOLLOUT;
    }
    // 启动读事件监控
    void EnableRead()
    {
        _events |= EPOLLIN;
        Update();
    }
    // 启动写事件监控
    void EnableWrite()
    {
        _events |= EPOLLOUT;
        Update();
    }
    // 关闭读事件监控
    void DisableRead()
    {
        _events &= ~EPOLLIN;
        Update();
    }
    // 关闭写事件监控
    void DisableWrite()
    {
        _events &= ~EPOLLOUT;
        Update();
    }
    // 关闭所有事件监控,文件描述符还在红黑树上,但是没有事件让它监控
    void DisableAll()
    {
        _events = 0;
        Update();
    }
    // 移除监控,文件描述符从红黑树中删除
    void Remove();
    // 添加到监控
    void Update();

    // 事件处理,一旦连接触发了事件,就调用这个函数,自己触发了什么事情如何处理由调用它的人决定
    void HandleEvent()
    {
        // 读事件发现连接出现问题,是不去释放连接的,因为我们还看看是否还有数据待发送
        // 等把数据都发送完了,或者发送出错了,再去关闭连接
        // 写/出错/挂断 都会关闭连接,我们只需要关闭一次就好了
		
		//关于把任意事件回调放在最后,在测试阶段会有说明。
        if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI))
        {
            // if (_event_callback)
            //     _event_callback();
            if (_read_callback)
                _read_callback();
        }

        // 有可能会释放连接的操作事件,⼀次只处理⼀个
        if (_revents & EPOLLOUT)
        {       
            // if (_event_callback)
            //     _event_callback();
            if (_write_callback)
                _write_callback();
        }
        else if (_revents & EPOLLERR)
        {
            // if (_event_callback)
            //     _event_callback();
            if (_error_callback)
                _error_callback();
        }
        else if (_revents & EPOLLHUP)
        {
            // if (_event_callback)
            //     _event_callback();
            if (_close_callback)
                _close_callback();
        }

        if (_event_callback)
        {
            _event_callback();
        }
    }
};

//这里先不做接释,等到了EventLoop在细说。
void Channel::Remove()
{
    return _loop->RemoveEvent(this);
}

void Channel::Update()
{
    return _loop->UpdateEvent(this);
}

客户端异常退出,比如断电断网,客户端底层操作系统没有办法及时服务器进行四次挥手,因此服务器也收不到任何信息,除非服务器主动去读或者写,客户端会给服务器发送一个RST,服务器读写的返回值都是-1,错误码设置为ECOMNNRESET,并且在epoll模型下的服务器会触发EPOLLERR和EPOLLHUOP事件。

客户端正常调用close或者ctrl+c终止进程,客户端操作系统有机会和服务器进行四次挥手,read返回值是0,如果发送缓存区还有数据可能首次发送回成功,第二次再次发送返回值是-1,并且会触发SIGPIPE信号,如果服务器不对这个信号做处理,就会导致服务器奔溃退出。因此我们还需要对这个信号进行特殊处理。

/*避免服务器因为给断开连接的客⼾端进⾏send触发异常导致程序崩溃,因此忽略SIGPIPE信号*/
/*定义静态全局是为了保证构造函数中的信号忽略处理能够在程序启动阶段就被直接执⾏*/
class NetWork
{
    public:
        NetWork()
        {
            LOG_DEBUG("SIGPIPE INIT");
            signal(SIGPIPE,SIG_IGN);
        }
};

static NetWork nw;

4. Poller模块

Poller模块:描述符IO事件监控模块
意义:通过epoll实现对描述符的lO事件监控
功能:

  1. 添加/修改描述符的事件监控(不存在则添加,存在则修改)
  2. 移除描述符的事件监控

封装思想:

  1. 必须拥有一个epoll的操作句柄
  2. 拥有一个structepoll_event结构数组,监控时保存所有的活跃事件
  3. 使用hash表管理描述符与描述符对应的事件管理Channel对象

逻辑流程:

  1. 对描述符进行监控,通过Channel才能知道描述符需要监控什么事件
  2. 当描述符就绪了,通过描述符在hash表中找到对应的Channel(得到了Channel才能知道什么事件如何处理)当描述符就绪了,返回就绪描述符对应的Channel
// 描述符I/O事件监控Poller模块
#define MAX_EPOLLEVEVTS 1024
class Poller
{
private:
    int _epfd;
    struct epoll_event _evs[MAX_EPOLLEVEVTS];
    std::unordered_map<int, Channel *> _channels;

private:
    // 对epoll直接操作
    void Update(Channel *channel, int op)
    {
        int fd = channel->Fd();
        struct epoll_event ev;
        ev.data.fd = fd;
        ev.events = channel->Events();
        int ret = epoll_ctl(_epfd, op, fd, &ev);
        if (ret < 0)
        {
            LOG_WARRING("EPOLLCTL FAILED!");
        }
        return;
    }

public:
    Poller()
    {
        _epfd = epoll_create(MAX_EPOLLEVEVTS);
        if (_epfd < 0)
        {
            LOG_FATAL("EPOLL CREATE FAILED!");
            abort();//退出程序
        }
    }
    // 添加/修改监控事件
    void UpdateEvent(Channel *channel)
    {
        auto it = _channels.find(channel->Fd());
        if (it == _channels.end())
        {
            // 不存在则添加
            _channels.insert(std::make_pair(channel->Fd(), channel));
            return Update(channel, EPOLL_CTL_ADD);
        }
        // 存在则修改
        return Update(channel, EPOLL_CTL_MOD);
    }
    // 移除监控
    void RemoveEvent(Channel *channel)
    {
        auto it = _channels.find(channel->Fd());
        if (it != _channels.end())
        {
            _channels.erase(it);
        }
        Update(channel, EPOLL_CTL_DEL);
    }

    // 开始监控,返回就绪事件
    void Poll(std::vector<Channel *> *active)
    {
        int nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVEVTS, -1);
        if (nfds < 0)
        {
            if (errno == EINTR)
            {
                return;
            }
            LOG_FATAL("EPOLL WAIT ERROR:%s
", strerror(errno));
            abort();//退出程序
        }

        for (int i = 0; i < nfds; ++i)
        {
            auto it = _channels.find(_evs[i].data.fd);
            assert(it != _channels.end());
            it->second->SetREvents(_evs[i].events); // 设置实际就绪的事件
            active->push_back(it->second);
        }
        return;
    }
};

5. EventLoop模块

首先要先介绍一下eventfd,eventfd是一种事件通知机制。创建一个描述符用于实现事件通知。eventfd本质在内核里边管理的就是一个计数器。创建eventfd就会在内核中创建一个计数器(结构)。

每当向evenfd中写入一个数值—用于表示事件通知次数,可以使用read进行数据的读取,读取到的数据就是通知的次数。

假设每次给eventfd中写入一个1,就表示通知了一次,连续写了三次之后,
再去read读取出来的数字就是3,读取之后计数清0。

用处:在EventLoop模块中实现线程间的事件通知功能

#include
#include
#include
#include


/*int eventfd(unsigned int initval, int flags);
功能:创建一个eventfd对象,实现事件通知
参数:
	initval:计数初值
	flags:
	EFD_CLOEXEC-禁止进程复制
	EFDNONBLOCK-启动非阻塞属性
返回值:返回一个文件描述符用于操作
eventfd也是通过read/write/close进行操作的。
注意点:read&write进行IO的时候数据只能是一个8字节数据*/

int main()
{
    int evfd = eventfd(0,EFD_CLOEXEC | EFD_NONBLOCK);
    if(evfd < 0)
    {
        std::cout<<"EVENTFD FAIL!"<<std::endl;
        abort();
    }
    uint64_t len = 1;
    int ret = write(evfd,&len,sizeof len);
    ret = write(evfd,&len,sizeof len);
    ret = write(evfd,&len,sizeof len);
    if(ret < 0)
    {
        std::cout<<"WRITE FAIL!"<<std::endl;
        abort();
    }

    uint64_t val;
    ret = read(evfd,&val,sizeof val);
    if(ret < 0)
    {
        std::cout<<"WRITE FAIL!"<<std::endl;
        abort();
    }
    std::cout<<val<<std::endl;
    return 0;
}

EventLoop:进行事件监控,以及事件处理的模块
关键点:这个模块与线程是一一对应关联的,一个EventLoop对应一个线程

监控了一个连接,而这个连接一旦就绪,就要进行事件处理。但是如果这个描述符,在多个EventLoop线程中都触发了事件,然后进行处理,就会存在线程安全问题。就比如说即时通信,当某个EventLoop管理的连接触发了读事件,然后要把这条消息发送给所有在线的连接,此时就会涉及到多个EventLoop线程,就有线程安全的问题。如果不想加锁怎么办,毕竟加锁解锁有一定的消耗。不加锁怎么保证多线程之间线程安全问题?

我们可以将一个EventLoop管理所有连接的事件监控,以及连接事件处理,以及其他操作都放在同一个线程中进行!

如何保证一个连接的所有操作都在EventLoop对应的线程中?

解决方案:给EventLoop模块中,添加一个任务队列

对连接的所有操作,都进行一次封装,将对连接的操作并不直接执行,而是当作任务添加到任务队列中。

EventLoop处理流程:

  1. 在线程中对描述符进行事件监控
  2. 有描述符就绪则对描述符进行事件处理(如何保证处理回调函数中的操作都在线程中)
  3. 所有的就绪事件处理完了,这时候再去将任务队列中的所有任务一一执行

这样能够保证对于连接的所有操作,都是在一个线程中进行的,不涉及线程安全问题但是对于任务队列的操作有线程安全的问题,只需要给task的操作加一把锁即可。

这里还有一个注意点:当事件就绪,需要处理的时候,处理过程中,如果对连接要进行某些操作:这些操作必须在EventLoop对应的线程中执行,保证对连接的各项操作都是线程安全的。

  1. 如果执行的操作本就在线程中,不需要将操作压入队列了,可以直接执行
  2. 如果执行的操作不再线程中,才需要加入任务池,等到事件处理完了然后执行任务

上面是整个项目最核心的知识点。

EventLoop模块的成员:

  1. 事件监控
    使用Poller模块,有事件就绪则进行事件处理
  2. 执行任务队列中的任务
    一个线程安全的任务队列

注意点:

因为有可能因为等待描述符IO事件就绪,导致执行流流程阻塞,这时候任务队列中的任务将得不到执行,因此得有一个事件通知的东西,能够唤醒事件监控的阻塞,上面eventfd在这里就派上用场了。

// EvevtLoop模块,对事件进行监控,处理

class EventLoop
{
private:
    using Functor = std::function<void()>;
    std::thread::id _thread_id;  // EvevtLoop对应的线程ID
    int _event_fd;               // eventfd唤醒IO事件监控有可能导致的阻塞
    Channel _event_channel;      // 对_event_fd做事件管理
    Poller _poller;              // 对所有事件进行监控
    std::vector<Functor> _tasks; // 任务池
    std::mutex _mutex;           // 实现任务池操作的线程安全

public:
    // 执行任务池中所有任务
    void RunAllTask()
    {
        // 为了不让每次拿任务都加锁解锁,因此一次把任务都拿出来
        // 这也是不用队列做任务池的原因
        std::vector<Functor> functor;
        {
            std::unique_lock<std::mutex> _lock(_mutex);
            _tasks.swap(functor);
        }
        // 依次执行任务
        for (auto &f : functor)
        {
            f();
        }
    }

    static int CreateEventFd()
    {
        int efd = eventfd(0, O_CLOEXEC | O_NONBLOCK);
        if (efd < 0)
        {
            LOG_FATAL("CREATE EVENTFD FAILED!!");
            abort();
        }
        return efd;
    }

    void ReadEvent()
    {
        uint64_t res = 0;
        int ret = read(_event_fd, &res, sizeof res);
        if (ret <= 0)
        {
            if (errno == EINTR || errno == EWOULDBLOCK)
            {
                return;
            }
            LOG_FATAL("READ EVENTFD FAILED!!");
            abort();
        }
    }

    void WeakUpEventfd()
    {
        uint64_t val = 1;
        int ret = write(_event_fd, &val, sizeof val);
        if (ret < 0)
        {
            if (errno == EINTR)
            {
                return;
            }
            LOG_FATAL("WRITE EVENTFD FAILED!!");
            abort();
        }
    }

public:
    EventLoop()
        : _thread_id(std::this_thread::get_id()), _event_fd(CreateEventFd()), _event_channel(this, _event_fd), _time_wheel(this)
    {
        // 给eventfd添加可读事件的回调,就绪了,读取eventfd事件通知次数
        _event_channel.SetReadCallback(std::bind(&EventLoop::ReadEvent, this));
        // 启动eventfd读事件监控
        _event_channel.EnableRead();
        //std::cout<<"_event_fd: "<<_event_fd<
    };

    // 1.事件监控 2.就绪事件处理 3.执行任务
    void Statr()
    {
        while(1)
        {
            // 1.事件监控
            std::vector<Channel *> active;
            _poller.Poll(&active);
            // 2.就绪事件处理
            for (auto &channel : active)
            {
                channel->HandleEvent();
            }
            // 3.执行任务
            RunAllTask();
        }
    }

    // 判断当前线程是否是EventLoop对应的线程
    bool IsInLoop()
    {
        return _thread_id == std::this_thread::get_id();
    }

    // 判断将要执行的任务是否处于当前线程,如果是则执行,不是则压入队列
    void RunInLoop(const Functor &cb)
    {
        if (IsInLoop())
        {
            return cb();
        }
        return QueueInLoop(cb);
    }

    void AssertInLoop() 
    {
        assert(_thread_id == std::this_thread::get_id());
    }

    // 将操作压入任务池
    void QueueInLoop(const Functor &cb)
    {
        {
            std::unique_lock<std::mutex> _lock(_mutex);
            _tasks.push_back(cb);
        }
        // 唤醒有可能因为没有事件就绪,而导致的epoll阻塞
        // 其实就是给eventfd写入一个数据,eventfd就会触发可读事件
        WeakUpEventfd();
    }

    // 添加/修改描述符的事件监控
    void UpdateEvent(Channel *channel)
    {
        _poller.UpdateEvent(channel);
    }

    // 移除描述符监控
    void RemoveEvent(Channel *channel)
    {
        _poller.RemoveEvent(channel);
    }
};

5.1 TimerQueue模块

一个EventLoop模块里面也有一个TimerQueue对象用于定时任务的管理,里面放着每个连接的非活跃超时销毁的任务。这里有个问题,如何让整个程序启动之后,时间轮一秒走一步呢?

可以将时间轮和Linux下的定时器time_create和time_settime整合在一起。

定时器模块的整合:

timerfd:实现内核每隔一段时间,给进程一次超时事件(timerfd可读)
timerwheel:实现每次执行Runtimetask,都可以执行一波到期的定时任务

要实现一个完整的秒级定时器,就需要将这两个功能整合到一起

timerfd设置为每秒钟触发一次定时事件,当事件被触发,则运行一次timerwheel的runtimertask,执行一下所有的过期定时任务。

// 定时器模块
using TaskFunc = std::function<void()>;
using ReleaseFunc = std::function<void()>;

class TimerTask
{
private:
    uint64_t _id;      // 定时器任务对象ID
    uint32_t _timeout; // 定时任务的超时时间
    bool _cancel;      // 定时任务是否取消,false不取消,true取消
    TaskFunc _task_cb;      // 定时器对象要执行的定时任务
    ReleaseFunc _release_cb;   /// 用于删除TimerWheel中保存的定时器对象信息
   

public:
    TimerTask(uint64_t id, uint32_t time,const TaskFunc& cb) : _id(id), _timeout(time), _task_cb(cb), _cancel(false) {}
    ~TimerTask()
    {
        if (_cancel == false)
        {
            _task_cb();
        }
        _release_cb();
    }
    uint32_t DelayTime()
    {
        return _timeout;
    }
    void SetRelease(const ReleaseFunc &rb)
    {
        _release_cb = rb;
    }
    void Cancel()
    {
        _cancel = true;
    }
};


class TimeWheel
{
private:
    using WeakPtr = std::weak_ptr<TimerTask>;
    using SharedPtr = std::shared_ptr<TimerTask>;
    int _tick;                                  // 滴答指针,当前的秒,走到哪里释放哪里,释放哪里,就相当于执行哪里的任务
    int _capacity;                              // 表盘最大数量---其实就是最大延迟时间
    std::vector<std::vector<SharedPtr>> _wheel; // 时间轮
    std::unordered_map<uint64_t, WeakPtr> _timers; // 通过id找到任务对象,方便后续刷新任务
    // unordered_map第二个参数不能用shared_ptr,如果还用用shared_ptr去指向原始对象,那么就和之前用shared_ptr指向原始对象
    // 而产生的引用计数根本就不是同一个引用计数,这样如果unordered_map第二个参数引用计数是1,而其他指向它引用计数是2
    // 万一unordered_map第二个参数shared_ptr被释放了,这个原始对象就会被释放,这是由问题的.

    // 如何保证定时器的滴答指针,在程序启动之后,每秒走一次
    // 与linux下得到定时器time_create和time_settime结合一起用
    // time_settime超时时间间隔设置为1秒,每秒都会给timefd写超时次数
    // 用EventLoop把timefd管理起来,启动读事件监控,一写读就绪,然后就让滴答指针往后走一步
    EventLoop *_loop;
    int _timerfd;             // 定时器描述符--可读事件回调就是读取计数器,执行定时任务
    Channel _timerfd_channel; //

    // 释放时间轮上任务对象
    void RemoveTimer(uint64_t id)
    {
        auto it = _timers.find(id);
        if (it == _timers.end())
        {
            return; 
        }
        _timers.erase(it);
    }

    static int CreateTimerFd()
    {
        int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);
        if (timerfd < 0)
        {
            LOG_FATAL("CREATE TIMEFD FAILED!!");
            abort();
        }

        struct itimerspec item;
        item.it_value.tv_sec = 1;
        item.it_value.tv_nsec = 0; // 第一次超时时间
        item.it_interval.tv_sec = 1;
        item.it_interval.tv_nsec = 0; // 第⼀次之后的超时间隔时间
        timerfd_settime(timerfd, 0, &item, nullptr);
        return timerfd;
    }

    int ReadTimeFd()
    {
        //有可能因为其他描述符的事件处理花费事件⽐较⻓,然后在处理定时器描述符事件的时候,有可能就已经超时了很多次
        //read读取到的数据times就是从上⼀次read之后超时的次数
        uint64_t times = 0;
        int ret = read(_timerfd, &times, 8);
        if (ret < 0)
        {
            LOG_FATAL("READ TIMERFD FAILED!!");
            abort();
        }
        //std::cout<<"cnt:"<
        return times;
    }

    // 这个函数应该每秒钟被执行一次,相当于秒针向后走了一步
    void RunTimerTask()
    {
        _tick = (_tick + 1) % _capacity;
        // 清空指定位置的数组,就会把数组中保存的所有管理定时器对象的shared_ptr释放掉
        _wheel[_tick].clear(); // 执行任务,释放shared_ptr指针,当引用计数到0,调用Task析构,执行定时任务
    }

    void OneTime()
    {
        //根据实际超时的次数,执⾏对应的超时任务
        int times = ReadTimeFd();
        for (int i = 0; i < times; ++i)
        {
            RunTimerTask();
        }
    }

    // 设置定时任务
    void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc &cb)
    {
        SharedPtr spt(new TimerTask(id, delay, cb));
        spt->SetRelease(std::bind(&TimeWheel::RemoveTimer, this, id));
        int pos = (_tick + delay) % _capacity;
        _wheel[pos].push_back(spt);
        _timers[id] = WeakPtr(spt);
    }

    // 刷新/延迟定时任务
    void TimerRefreshInLoop(uint64_t id)
    {
        auto it = _timers.find(id);
        if (it == _timers.end())
        {
            return;//没找着定时任务,没法刷新,没法延迟
        }
        // spt是临时对象,出栈后会自动销毁,不会增加引用计数
        SharedPtr spt = it->second.lock(); // lock获取weak_ptr管理的对象对应的shared_ptr
        int delaytime = spt->DelayTime();
        int pos = (_tick + delaytime) % _capacity;
        _wheel[pos].push_back(spt);
    }

    // 取消定时任务,等时间到了,释放任务对象,但是析构里面的超时任务不去执行
    void TimerCancelInLoop(uint64_t id)
    {
        auto it = _timers.find(id);
        if (it == _timers.end())
        {
            return;//没找着定时任务,没法刷新,没法延迟
        }
        // spt是临时对象,出栈后会自动销毁,不会增加引用计数
        SharedPtr spt = it->second.lock();
        //如果没有if判断,超时先去执行的释放任务,然后才删除unordered_map管理的对象,
        //释放任务里面有如果有定时任务就去取消这一步,因为对象还没有删除,所有会进来这个函数
        //但TimerTask对象正在析构的时候,没有if判断 就去执行spt->Cancel()程序就会奔溃
        if(spt) spt->Cancel();
    }

public:
    TimeWheel(EventLoop *loop)
        : _tick(0), _capacity(60), _wheel(_capacity), _loop(loop), _timerfd(CreateTimerFd()), _timerfd_channel(_loop, _timerfd)
    {
        // timerfd读事件就绪回调函数
        _timerfd_channel.SetReadCallback(std::bind(&TimeWheel::OneTime, this));
        _timerfd_channel.EnableRead(); // 启动timerfd读事件监控
    }

    ~TimeWheel() {}

    // 定时器中有个_timers成员,定时器信息的操作有可能在多个EventLoop线程中进行,因此需要考虑线程安全的问题
    // 但是我们又不想加锁,那就把对定时器的所有操作,都放在对应的线程中进行
    // 一个线程都是串行化执行的,那就不存在线程安全问题了

    void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb);
    void TimerRefresh(uint64_t id);
    void TimerCancel(uint64_t id);

    // 虽然这个接口存在线程安全问题,这个接口实际上不能被外界使用者调用,只能在模块EventLoop线程内执行
    bool HasTimer(uint64_t id)
    {
        auto it = _timers.find(id);
        if (it == _timers.end())
        {
            return false;
        }
        return true;
    }
};

5.2 TimeWheel整合到EventLoop

// EvevtLoop模块,对事件进行监控,处理

class EventLoop
{
private:
    using Functor = std::function<void()>;
    std::thread::id _thread_id;  // 线程ID
    int _event_fd;               // eventfd唤醒IO事件监控有可能导致的阻塞
    Channel _event_channel;      // 对_event_fd做事件管理
    Poller _poller;              // 对所有事件进行监控
    std::vector<Functor> _tasks; // 任务池
    std::mutex _mutex;           // 实现任务池操作的线程安全
    TimeWheel _time_wheel;       // 定时器模块

public:
    // 执行任务池中所有任务
    void RunAllTask()
    {
        // 为了不让每次拿任务都加锁解锁,因此一次把任务都拿出来
        // 这也是不用队列做任务池的原因
        std::vector<Functor> functor;
        {
            std::unique_lock<std::mutex> _lock(_mutex);
            _tasks.swap(functor);
        }
        // 依次执行任务
        for (auto &f : functor)
        {
            f();
        }
    }

    static int CreateEventFd()
    {
        int efd = eventfd(0, O_CLOEXEC | O_NONBLOCK);
        if (efd < 0)
        {
            LOG_FATAL("CREATE EVENTFD FAILED!!");
            abort();
        }
        return efd;
    }

    void ReadEvent()
    {
        uint64_t res = 0;
        int ret = read(_event_fd, &res, sizeof res);
        if (ret <= 0)
        {
            if (errno == EINTR || errno == EWOULDBLOCK)
            {
                return;
            }
            LOG_FATAL("READ EVENTFD FAILED!!");
            abort();
        }
    }

    void WeakUpEventfd()
    {
        uint64_t val = 1;
        int ret = write(_event_fd, &val, sizeof val);
        if (ret < 0)
        {
            if (errno == EINTR)
            {
                return;
            }
            LOG_FATAL("WRITE EVENTFD FAILED!!");
            abort();
        }
    }

public:
    EventLoop()
        : _thread_id(std::this_thread::get_id()), _event_fd(CreateEventFd()), _event_channel(this, _event_fd), _time_wheel(this)
    {
        // 给eventfd添加可读事件的回调,就绪了,读取eventfd事件通知次数
        _event_channel.SetReadCallback(std::bind(&EventLoop::ReadEvent, this));
        // 启动eventfd读事件监控
        _event_channel.EnableRead();
        //std::cout<<"_event_fd: "<<_event_fd<
    };

    // 1.事件监控 2.就绪事件处理 3.执行任务
    void Statr()
    {
        while(1)
        {
            // 1.事件监控
            std::vector<Channel *> active;
            _poller.Poll(&active);
            // 2.就绪事件处理
            for (auto &channel : active)
            {
                channel->HandleEvent();
            }
            // 3.执行任务
            RunAllTask();
        }
    }

    // 判断当前线程是否是EventLoop对应的线程
    bool IsInLoop()
    {
        return _thread_id == std::this_thread::get_id();
    }

    // 判断将要执行的任务是否处于当前线程,如果是则执行,不是则压入队列
    void RunInLoop(const Functor &cb)
    {
        if (IsInLoop())
        {
            return cb();
        }
        return QueueInLoop(cb);
    }

    void AssertInLoop() 
    {
        assert(_thread_id == std::this_thread::get_id());
    }

    // 将操作压入任务池
    void QueueInLoop(const Functor &cb)
    {
        {
            std::unique_lock<std::mutex> _lock(_mutex);
            _tasks.push_back(cb);
        }
        // 唤醒有可能因为没有事件就绪,而导致的epoll阻塞
        // 其实就是给eventfd写入一个数据,eventfd就会触发可读事件
        WeakUpEventfd();
    }

    // 添加/修改描述符的事件监控
    void UpdateEvent(Channel *channel)
    {
        _poller.UpdateEvent(channel);
    }

    // 移除描述符监控
    void RemoveEvent(Channel *channel)
    {
        _poller.RemoveEvent(channel);
    }

    // 添加定时任务
    void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb)
    {
        return _time_wheel.TimerAdd(id, delay, cb);
    }
    // 刷新定时任务
    void TimerRefresh(uint64_t id)
    {
        return _time_wheel.TimerRefresh(id);
    }
    // 删除定时任务
    void TimerCancel(uint64_t id)
    {
        return _time_wheel.TimerCancel(id);
    }
    // 是否有某个定时任务存在
    bool HasTimer(uint64_t id)
    {
        return _time_wheel.HasTimer(id);
    }
};

//Chaneel模块之前只有EventLoop的声明,没有内部函数。因此需要放在EventLoop后面,编译才不会报错
void Channel::Remove()
{
    return _loop->RemoveEvent(this);
}

void Channel::Update()
{
    return _loop->UpdateEvent(this);
}

//同理TimeWheel模块也是一样
void TimeWheel::TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb)
{
    _loop->RunInLoop(std::bind(&TimeWheel::TimerAddInLoop, this, id, delay, cb));
}

void TimeWheel::TimerRefresh(uint64_t id)
{
    _loop->RunInLoop(std::bind(&TimeWheel::TimerRefreshInLoop, this, id));
}

void TimeWheel::TimerCancel(uint64_t id)
{
    _loop->RunInLoop(std::bind(&TimeWheel::TimerCancelInLoop, this, id));
}

5.1 EventLoop与线程结合

目标:将EventLoop模块与线程整合起来

EventLoop模块与线程是——对应的。

EventLoop模块实例化的对象,在构造的时候就会初始化_thread_id,
而后边当运行一个连接操作的时候判断当前是否运行在连接自己的所在EventLoop模块对应的线程中,就是将线程ID与EventLoop模块中的thread_id进行一个比较,相同就表示在同一个线程,不同就表示当前运行线程并不是自己这个连接所在EventLoop线程。

上面的含义:EventLoop模块在实例化对象的时候,必须在线程内部

EventLoop实例化对象时会设置自己的thread_id,如果我们先创建了多个EventLoop对象,然后创建了多个线程,将各个线程的id,重新给EventLoop进行设置。

存在问题:在构造EventLoop对象,到设置新的thread_id期间将是不可控的。因此我们必须先创建线程,然后在线程的入口函数中,去实例化EventLoop对象

造一个新的模块:LoopThread
这个模块的功能:将EventLoop与thread整合到一起

思想:

  1. 创建线程
  2. 在线程中实例化EventLoop对象

功能:可以向外部返回所实例化的EventLoop

//一个EventLoop一个线程,将EventLoop和线程整合在一起
class LoopThread
{
    private:
        //互斥锁+条件变量,用于实现_loop获取的同步关系,避免线程创建了,但是_loop还没有实例化之前去获取_loop
        std::mutex _mutex;//互斥锁
        std::condition_variable _cond;//条件变量
        EventLoop* _loop;//EventLoop指针变量,这个对象需要在线程内部实例化
        std::thread _thread;//EventLoop对应的线程
    private:
        //实例化EventLoop对象,唤醒_cond有可能阻塞的线程,并且开始运行EventLoop模块的功能
        void ThreadEntry()
        {
            //由LoopThread管理这个变量的生命周期
            EventLoop loop;
            {
                std::unique_lock<std::mutex> _mtx(_mutex);
                _loop = &loop;
                _cond.notify_all();
            }
            loop.Statr();
        }

    public:
        //创建线程,设定线程入口函数
        LoopThread():_loop(nullptr),_thread(std::thread(&LoopThread::ThreadEntry,this)){}
        //返回当前线程关联的EventLoop对象的指针,得到EventLoop指针相对于找到这个线程,把连接绑定到对应EventLoop线程中
        EventLoop* GetLoop()
        {
            EventLoop* loop = nullptr;
            {
                std::unique_lock<std::mutex> _mtx(_mutex);
                _cond.wait(_mtx,[&](){ return _loop != nullptr;});//_loop为nullptr就一直阻塞
                loop = _loop;
            }
            return loop;
        }
};

5.2 EventLoop线程池

针对LoopThread设计一个线程池:

LoopThreadPool模块:对所有的LoopThread进行管理及分配

功能:

  1. 线程数量可配置(0个或多个)
    注意事项:在服务器中,主从Reactor模型是主线程只负责新连接获取,从属线程负责新连接的事件监控及处理
    因此当前的线程池,有可能从属线程会数量为0,也就是实现单Reactor服务器,一个线程及负责获取连接,也负责连接的处理
  2. 对所有的线程进行管理,其实就是管理0个或多个LoopThread对象
  3. 提供线程分配的功能
    当主线程获取了一个新连接,需要将新连接挂到从属线程上进行事件监控及处理
    假设有0个从属线程,则直接分配给主线程的EventLoop,进行处理
    假设有多个从属线程,则采用RR轮转思想,进行线程的分配(将对应线程的EventLoop获取到,设置给对应的Connection)
//针对LoopThread做一个线程池
class LoopThreadPoll
{
    private:
        int _thread_count;//从属线程的数量
        int _next_loop_idx;//给连接分配那个EventLoop
        EventLoop* _base_loop;//主EventLoop,运行在主线程,从属EventLoop线程数量为0,则所有操作都在主EventLoop中进行
        std::vector<LoopThread*> _threads;//保存所有LoopThread对象
        std::vector<EventLoop*> _loops;//从属EventLoop线程数量大于0则从_loops进行线程EventLoop分配
    public:
        LoopThreadPoll(EventLoop* loop):_thread_count(0),_next_loop_idx(0),_base_loop(loop){};
        //设置从属EventLoop线程数量
        void SetThreadCount(int count){
            _thread_count = count;
        }
        //创建从属线程
        void Create()
        {
            if(_thread_count > 0)
            {
                _threads.resize(_thread_count);
                _loops.resize(_thread_count);
                for(int i = 0; i < _thread_count; ++i)
                {
                    _threads[i] = new LoopThread();
                    _loops[i] = _threads[i]->GetLoop();
                }
            }

        }
        //获取从属线程EvetnLoop指针
        EventLoop* NextLoop()
        {
            if(_thread_count == 0) {
                return _base_loop;
            }
            //RR轮转
            _next_loop_idx = (_next_loop_idx + 1) % _thread_count;
            return _loops[_next_loop_idx];
            
        }
};

6. Connection模块


Connection模块:

目的:对连接进行全方位的管理,对通信连接的所有操作都是通过这个模块提供的功能完成。

管理:

  1. 套接字的管理,能够进行套接字的操作
  2. 连接事件的管理,可读,可写,错误,挂断,任意
  3. 缓冲区的管理,便于socket数据的接收和发送
  4. 协议上下文的管理,记录请求数据的处理过程
  5. 回调函数的管理
    a. 因为连接接收到数据之后该如何处理,需要由用户决定,因此必须有业务处理回调函数
    b. 一个连接建立成功后,该如何处理,由用户决定,因此必须有连接建立成功的回调函数
    c. 一个连接关闭前,该如何处理,由用户决定,因此必须由关闭连接回调函数。
    d. 任意事件的产生,有没有某些处理,由用户决定,因此必须有任意事件的回调函数

功能:

  1. 发送数据—给用户提供的发送数据接口,并不是真正的发送接口,而只是把数据放到发送缓冲区,然后启动写事件监控
  2. 关闭连接—给用户提供的关闭连接接口,应该在实际释放连接之前,看看输入输出缓冲区是否有数据待处理
  3. 启动非活跃连接的超时销毁功能
  4. 取消非活跃连接的超时销毁功能
  5. 协议切换—一个连接接收数据后如何进行业务处理,取决于上下文,以及数据的业务处理回调函数
// 通用类型Any
class Any
{

    class holder
    {
    public:
        virtual ~holder() {}
        virtual const std::type_info &type() = 0;
        virtual holder *clone() = 0;
    };

    template <class T>
    class placeholder : public holder
    {
    public:
        placeholder(const T &val) : _val(val) {}


        // 获取子类对象保存的数据类型
        virtual const std::type_info &type()
        {
            return typeid(T);
        }

        // 针对当前的对象自身,克隆出一个新的子类对象
        virtual holder *clone()
        {
            return new placeholder(_val);
        }
    public:
        T _val;
    };

private:
    holder *_content;

public:
    Any() : _content(nullptr) {}

    ~Any()
    {
        delete _content;
    }

    template <class T>
    Any(const T &val)
    {
        _content = new placeholder<T>(val);
    }

    Any(const Any &other)
    {
        _content = _content != nullptr ? other._content->clone() : nullptr;
    }

    // 返回子类对象保存的数据的指针
    template <class T>
    T *Get()
    {
        assert(typeid(T) == _content->type());
        // 父类指针指向子类中属于父类的,找不到子类中成员,因此需要转成子类指针
        return &(((placeholder<T> *)_content)->_val);
    }

    Any &swap(Any &other)
    {
        std::swap(_content, other._content);
        return *this;
    }

    // 赋值运算符的重载函数
    template <class T>
    Any &operator=(const T &val)
    {
        Any(val).swap(*this);
        return *this;
    }

    Any &operator=(const Any &other)
    {
        Any(other).swap(*this);
        return *this;
    }
};



class Connection;
// DISCONNECTED 连接关闭状态  CCONNECTING 连接建立成功成功,待处理状态
// CONNECTED 连接建立完成,各种设置已完成,可以通信状态     DISCONNECTING 连接待关闭状态
typedef enum
{
    DISCONNECTED,
    CONNECTING,
    CONNECTED,
    DISCONNECTING
} ConStatus;
//使用shared_ptr管理每个Connection对象,防止用户把Connection在某个地方删除
//最终导致程序奔溃退出
using PtrConnection = std::shared_ptr<Connection>;
//enable_shared_from_this里面保存了一个weak_ptr,可以从weak_ptr得到一个shared_ptr
class Connection: public std::enable_shared_from_this<Connection>
{
private:
    uint64_t _conn_id; // 连接唯一ID,便于连接的管理和查找
    // uint64_t _timer_fd; // 定时器ID,必须是唯一,这块为了简化操作使用_conn_id作为定时器id
    int _sockfd;                   // 连接关联的文件描述符
    bool _enable_inactive_release; // 连接是否启动非活跃销毁的判断标志,默认为false
    EventLoop *_loop;              // 连接所关联的一个EventLoop
    ConStatus _status;             // 连接状态
    Socket _socket;                // 套接字管理
    Channel _channel;              // 连接事件管理
    Buffer _in_buffer;             // 输入缓存区,存放从socket中读取到的数据
    Buffer _out_buffer;            // 输出缓存区,存放经过业务处理要给对端发出的数据
    Any _context;                  // 请求接收处理上下文

    // 这四个回调函数,是让服务器模块来设置的(其实服务器模块的处理回调也是组件使用者设置的)
    // 换句话说,这⼏个回调都是组件使用者使用的
    
    //连接建立回调
    using ConnectedCallback = std::function<void(const PtrConnection &)>;
    //接收到数据回调
    using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;
    //关闭连接回调
    using ClosedCallback = std::function<void(const PtrConnection &)>;
    //任意事件回调
    using AnyeEventCallback = std::function<void(const PtrConnection &)>;

    ConnectedCallback _connected_callback;
    MessageCallback _message_callback;
    ClosedCallback _closed_callback;
    AnyeEventCallback _event_callback;

    //组件内的连接关闭回调--组件内设置的,因为服务器组件内也会把所有的连接管理起来,⼀旦某个连接要关闭
    //就应该从管理的地⽅移除掉自己的信息
    ClosedCallback _server_closed_callback;

private:
    //五个channel的事件回调函数
    //描述符可读事件触发后调⽤的函数,接收socket数据放到接收缓冲区中,然后调⽤_message_callback进行业务处理
    void HandleRead()
    {   
        //1.接收socket缓存区数据,放到缓存区
        char buffer[65535];
        ssize_t ret = _socket.NonBlockRecv(buffer,65535);
        if(ret < 0)
        {  
            //出错,不能直接关闭连接
            return ShutdownInLoop();
        }
        //这里的ret等于0表示的是没有读取到数据,⽽并不是连接断开了,连接断开返回的是-1
        //将数据放⼊输⼊缓冲区,写⼊之后顺便将写偏移向后移动
        _in_buffer.WriteAndPush(buffer,ret);
        //2.调用_message_callback进行业务处理
        if(_in_buffer.ReadAbleSize() > 0)
        {
            if(_message_callback){
                //shared_from_this--从当前对象自身获取自身的shared_ptr管理对象
                return _message_callback(shared_from_this(),&_in_buffer);
            }
        }

    }
    //描述符可写事件出发后调用的函数,将发送缓存区中数据进行发送
    void HandleWrite()
    {
        //_out_buffer中保存的是待发送的数据
        ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPosition(),_out_buffer.ReadAbleSize());
        if(ret < 0)
        {
            //发送错误关闭连接,如果接收缓存区还有数据先处理一下
            if(_in_buffer.ReadAbleSize() > 0)
            {
                if(_message_callback){
                    //shared_from_this--从当前对象自身获取自身的shared_ptr管理对象
                    _message_callback(shared_from_this(),&_in_buffer);
                }
            }
            //这时候就是实际的关闭释放操作了
            //先把释放连接任务压到任务队列中,等所有就绪事件处理后,在处理任务队列中释放连接任务
            //return ReleaseInLoop();
            return Release();
        }
        //千万不要忘了,将读偏移向后移动
        _out_buffer.MoveReadOffset(ret);
        if(_out_buffer.ReadAbleSize() == 0)
        {
            _channel.DisableWrite();// 没有数据待发送了,关闭写事件监控
            if(_status == DISCONNECTING)//如果当前是连接待关闭状态,还有数据,发送完数据释放连接,没有数据则直接释放
            {
                //先把释放连接任务压到任务队列中,等所有就绪事件处理后,在处理任务队列中释放连接任务
                //return ReleaseInLoop();
                return Release();
            }
        }
        return;
    }
    //描述符触发挂断事件
    void HandleClose()
    {
        //⼀旦连接挂断了,套接字就什么都⼲不了了,因此有数据待处理就处理⼀下,完毕关闭连接
        if(_in_buffer.ReadAbleSize() > 0)
        {
            if(_message_callback){
                //shared_from_this--从当前对象⾃⾝获取⾃⾝的shared_ptr管理对象
                _message_callback(shared_from_this(),&_in_buffer);
            }
        }
        //这时候就是实际的关闭释放操作了
        //先把释放连接任务压到任务队列中,等所有就绪事件处理后,在处理任务队列中释放连接任务
        //return ReleaseInLoop();
        return Release();
    }
    //描述符触发出错事件
    void HandleError()
    {
        return HandleClose();
    }

    //描述符触发任意事件: 1. 刷新连接的活跃度--延迟定时销毁任务; 2. 调⽤组件使用者的任意事件回调
    void HandleEvent()
    {
        if(_enable_inactive_release == true)
        {
            _loop->TimerRefresh(_conn_id);
        }
        if(_event_callback)
        {
            _event_callback(shared_from_this());
        }
    }

    //连接获取后,所处的状态下要进行各种设置(启动读事件监控,调用连接建立阶段回调函数)
    void EstablishedInLoop()
    {
        // 1. 修改连接状态
        assert(_status == CONNECTING);//当前的状态必须⼀定是上层的半连接状态
        _status = CONNECTED;//当前函数执⾏完毕,则连接进⼊已完成连接状态
        // 2. 启动读事件监控 
        _channel.EnableRead();// ⼀旦启动读事件监控就有可能会⽴即触发读事件,如果这时候启动了⾮活跃连接销毁
        //3. 调用组件调用者设置的连接建立阶段的回调函数
        if(_connected_callback){
           _connected_callback(shared_from_this());
        }
    }

    //这个接口才是实际的释放接口
    void ReleaseInLoop()
    {
        //1. 修改连接状态,将其置为DISCONNECTED
        _status == DISCONNECTED;
        //2. 移除连接的事件监控
        _channel.Remove();
        //3. 关闭描述符
        _socket.Close();
        //4. 如果当前定时器队列中还有定时销毁任务,则取消任务
        if(_loop->HasTimer(_conn_id))
        {
            CancelInactiveReleaseInLoop();
        }
        //5. 调用关闭回调函数,避免先移除服务器管理的连接信息导致Connection被释放,
        // 再去处理会出错,因此先调用用户的回调函数
        if(_closed_callback){
            _closed_callback(shared_from_this());
        }
        //移除服务器内部管理的连接信息
        if(_server_closed_callback){
            _server_closed_callback(shared_from_this());
        }
    }
    //这个接⼝并不是实际的发送接⼝,⽽只是把数据放到了发送缓冲区,启动了可写事件监控
    void SendInLoop(Buffer& buf)
    {
        if(_status == DISCONNECTED) return;
        _out_buffer.WriteBufferAndPush(buf);
        if(_channel.WriteAble() == false)
        {
            _channel.EnableWrite();
        }
    }
    //这个关闭操作并⾮实际的连接释放操作,需要判断还有没有数据待处理,待发送
    void ShutdownInLoop()
    {
        // 设置连接为半关闭状态
        _status = DISCONNECTING;
        if(_in_buffer.ReadAbleSize() > 0)
        {
            if(_message_callback){
                _message_callback(shared_from_this(),&_in_buffer);
            }
        }

        //要么就是写⼊数据的时候出错关闭,要么就是没有待发送数据,直接关闭
        if(_out_buffer.ReadAbleSize() > 0)
        {
            if(_channel.WriteAble() == false)
            {
                _channel.EnableWrite();
            }
        }

        if(_out_buffer.ReadAbleSize() == 0)
        {
            //ReleaseInLoop();
            Release();
        }
    }
    //启动⾮活跃连接超时释放规则
    void EnableInactiveReleaseInLoop(int sec)
    {
        //1. 将判断标志 _enable_inactive_release 置为true
        _enable_inactive_release = true;
        //2. 如果当前定时销毁任务已经存在,那就刷新延迟⼀下即可
        if(_loop->HasTimer(_conn_id))
        {
            return _loop->TimerRefresh(_conn_id);
        }
        //3. 如果不存在定时销毁任务,则新增


        /* 业务处理超时,查看服务器的处理情况
        当服务器达到了一个性能瓶颈,在一次业务处理中花费了太长的时间(超过了服务器设置的非活跃超时时间)
        1. 在一次业务处理中耗费太长时间,导致其他的连接也被连累超时,其他的连接有可能会被拖累超时释放
        假设现在  12345描述符就绪了, 在处理1的时候花费了30s处理完,超时了,导致2345描述符因为长时间没有刷新活跃度
            1. 如果接下来的2345描述符都是通信连接描述符,如果都就绪了,则并不影响,因为接下来就会进行处理并刷新活跃度
            2. 如果接下来的2号描述符是定时器事件描述符,定时器触发超时,执行定时任务,就会将345描述符给释放掉
            这时候一旦345描述符对应的连接被释放,接下来在处理345事件的时候就会导致程序崩溃(内存访问错误)
            因此这时候,在本次事件处理中,并不能直接对连接进行释放,而应该将释放操作压入到任务队列中,
            等到事件处理完了执行任务池中的任务的时候,再去释放*/ 
        //_loop->TimerAdd(_conn_id,sec,std::bind(&Connection::ReleaseInLoop,this));

        _loop->TimerAdd(_conn_id,sec,std::bind(&Connection::Release,this));
        
    }
    void CancelInactiveReleaseInLoop()
    {
        _enable_inactive_release = false;
        if(_loop->HasTimer(_conn_id))
        {
            _loop->TimerCancel(_conn_id);
        }
    }
    void UpgradeInLoop(const Any& context,const ConnectedCallback& conn,const MessageCallback& msg,
        const ClosedCallback& closed,const AnyeEventCallback& event)
    {
        _context = context;
        _connected_callback = conn;
        _message_callback = msg;
        _closed_callback = closed;
        _event_callback = event;
    }


public:
    Connection(EventLoop* loop,uint64_t conn_id,int sockfd)
        :_loop(loop)
        ,_conn_id(conn_id)
        ,_sockfd(sockfd)
        ,_enable_inactive_release(false)
        ,_status(CONNECTING)
        ,_socket(_sockfd)
        ,_channel(_loop,_sockfd)
    {
        _channel.SetReadCallback(std::bind(&Connection::HandleRead,this));
        _channel.SetWriteCallback(std::bind(&Connection::HandleWrite,this));
        _channel.SetErrorCallback(std::bind(&Connection::HandleError,this));
        _channel.SetCloseCallback(std::bind(&Connection::HandleClose,this));
        _channel.SetEventCallback(std::bind(&Connection::HandleEvent,this));
    }
    ~Connection()
    {
        LOG_DEBUG("RELEASE CONNECTION:%p", this);
    }
    //获得管理的文件描述符
    int Fd()
    {
        return _sockfd;
    }
    //获取连接ID
    int Id()
    {
        return _conn_id;
    }
    //当前是否处于CONNECTED状态
    bool Connected()
    {
        return (_status == CONNECTED);
    }
    //设置上下文--连接建立完成时进行调用
    void SetContent(const Any& context)
    {
        _context = context;
    }
    //获取上下文
    Any* GetContent()
    {
        return &_context;
    }
    void SetConnectedCallback(const ConnectedCallback& cb)
    {
        _connected_callback = cb;
    }
    void SetMessageCallback(const MessageCallback& cb)
    {
        _message_callback = cb;
    }
    void SetClosedCallback(const ClosedCallback& cb)
    {
        _closed_callback = cb;
    }
    void SetAnyeEventCallback(const AnyeEventCallback& cb)
    {
        _event_callback = cb;
    }
    void SetSevClosedCallback(const ClosedCallback& cb)
    {
        _server_closed_callback = cb;
    }

    //多线程执行,涉及到线程安全的问题,我们都放在EventLoop线程下的任务队列中执行

    //连接建立就绪后,启动读监控,调用_connected_callback
    void Established()
    {
        _loop->RunInLoop(std::bind(&Connection::EstablishedInLoop,this));
    }
    //发送数据,实际是将数据放到发送缓存区,启动写事件监控
    void Send(const void* data,size_t len)
    {
        //外界传⼊的data,可能是个临时的空间,我们现在只是把发送操作压⼊了任务池,有可能并没有被⽴即执⾏
        //因此有可能执⾏的时候,data指向的空间有可能已经被释放了
        Buffer buf;
        buf.WriteAndPush(data,len);
        //std::move(buf))是将右值参数(如 std::move(buf))会被移动构造到 std::bind 内部‌,生成一个 ‌独立存储的 Buffer 对象‌。
        //当异步任务执行时,std::bind 会将存储的 Buffer 对象以 "‌左值‌" 形式传递给 SendInLoop
        //所以void SendInLoop(Buffer& buf),Buffer应为左值引用
        //并且这里std::move(buf))想要减少一次拷贝,减少的是将buf拷贝到bind内部生成Buffer对象这次拷贝
        _loop->RunInLoop(std::bind(&Connection::SendInLoop,this,std::move(buf)));
    }
    //提供给组件使用者的关闭窗口,并不是真实关闭,需要判断输入输出有没有数据待处理
    void Shutdown()
    {
        _loop->RunInLoop(std::bind(&Connection::ShutdownInLoop,this));
    }
    void Release()
    {
        //定时超时释放连接的任务,压到任务队列中
        //甚至可以所有的释放连接的地方都先先把任务压到任务队列中,等所有就绪事件处理自后再去处理任务队列中的任务
        //这样可以把Channel类中HandleEvent中改改,不用每个事件执行之前都先去执行任意事件的回调,而是最后在执行任意事件回调
        //这样更符合逻辑,当事件就绪处理自后在刷新活跃度,如果超时在释放
        //现在不会在处理就绪事件的时候去释放连接了,所有不用担心因为释放连接销毁Connection对象而导致调用任意事件回调而导致程序奔溃了
        _loop->QueueInLoop(std::bind(&Connection::ReleaseInLoop,this));

    }
    //启动非活跃消费,并定义多长时间五通信就是非活跃,添加定时任务
    void EnableInactiveRelease(int sec)
    {
        _loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop,this,sec));
    }
    //取消非活跃销毁
    void CancelInactiveRelease()
    {
        _loop->RunInLoop(std::bind(&Connection::CancelInactiveReleaseInLoop,this));
    }

    //切换协议---重置上下⽂以及阶段性回调处理函数 -- ⽽是这个接⼝必须在EventLoop线程中⽴即执⾏
    //防备新的事件触发后,处理的时候,切换任务还没有被执⾏--会导致数据使⽤原协议处理了。
    void Upgrade(const Any& content,const ConnectedCallback& conn,const MessageCallback& msg,
                const ClosedCallback& closed,const AnyeEventCallback& event)
    {
        _loop->AssertInLoop();
        _loop->RunInLoop(std::bind(&Connection::UpgradeInLoop,this,content,conn,msg,closed,event));

    }
};

7. Acceptor模块

Acceptor模块:对监听套接字进行管理

  1. 创建一个监听套接字
  2. 启动读事件监控
  3. 事件触发后,获取新连接
  4. 调用新连接获取成功后的回调函数
  5. 为新连接创建Connection进行管理(这一步不是Acceptor模块操作,应该是服务器模块)
    因为Acceptor模块只进行监听连接的管理,因此获取到新连接的描述符之后,对于新连接描述符如何处理其实并不关心

对于新连接如何处理,应该是服务器模块来管理的
服务器模块,实现了一个对于新连接描述符处理的函数,将这个函数设置给Acceptor模块中的回调函数

//Acceptor模块,对监听套接字进行管理
class Acceptor
{
    private:
        EventLoop* _loop;//对监听套接字进行监控
        Socket _socket;//创建监听套接字
        Channel _channel;//对监听套接字进行事件管理

        using AcceptCallback = std::function<void(int)>;
        AcceptCallback _accept_callback;
    private:
        //监听套接字读事件就绪回调函数 --- 获取新连接,调用_accept_callback函数进行新连接处理
        void HandleRead()
        {
            int newfd = _socket.Accpet();
            if(newfd < 0)
            {
                return;
            }
            if(_accept_callback) _accept_callback(newfd);
        }

        int CreateServer(uint16_t port)
        {
            int ret = _socket.CreateServer(port);
            assert(ret == true);
            return _socket.Fd();
        }

    public:
        Acceptor(EventLoop* loop,uint16_t port):_loop(loop),_socket(CreateServer(port)),_channel(_loop,_socket.Fd())
        {
            _channel.SetReadCallback(std::bind(&Acceptor::HandleRead,this));
        }

        void SetAcceptCallback(const AcceptCallback& cb)
        {
            _accept_callback = cb;
        }
        //不能将启动读事件监控,放到构造函数,必须设置在回调函数后,再去启动
        //否则有可能造成启动监控之后,立刻就有了事件,处理的时候,回调函数还没有设置,新连接得不到处理,且资源泄漏
        void Listen()
        {
            _channel.EnableRead();
        }      
};

8. TcpServer模块


TcpServer模块:对所有模块的整合,通过TcpServer模块实例化的对象,可以非常简单的完成一个服务器的搭建

管理:

  1. Acceptor对象,创建一个监听套接字
  2. EventLoop对象,baseloop对象,实现对监听套接字的事件监控
  3. std:unordered_map_conns,实现对所有新建连接的管理
  4. LoopThreadPool对象,创建loop线程池,对新建连接进行事件监控及处理

功能:

  1. 设置从属线程池数量
  2. 启动服务器
  3. 设置各种回调函数(连接建立完成,消息,关闭,任意),用户设置给TcpServer,TcpServer设置给获取的新连接
  4. 是否启动非活跃连接超时销毁功能
  5. 给baseloop添加定时任务功能(如果用户需要的话可以设置)

流程:

  1. 在TcpServer中实例化一个Acceptor对象,以及一个EventLoop对象(baseloop)
  2. 将Acceptor挂到baseloop上进行事件监控
  3. 一旦Acceptor对象就绪了可读事件,则执行读事件回调函数获取新建连接
  4. 对新连接,创建一个Connection进行管理
  5. 对连接对应的Connection设置功能回调(连接完成回调,消息回调,关闭回调,任意事件回调)
  6. 启动Connection的非活跃连接的超时销毁规则
  7. 将新连接对应的Connection挂到LoopThreadPool中的从属线程对应的Eventloop中进行事件监控
  8. 一旦Connection对应的连接就绪了可读事件,则这时候执行读事件回调函数,读取数据,读取完毕后调用TcpServer设置的消息回调
class TcpServer
{
    private:
        uint64_t _next_id; //这是一个自动增长的连接ID
        uint16_t _port;//服务器端口
        int _timeout;//这是非活跃连接的统计时间---多长时间无通信就是非活跃连接
        bool _enable_inactive_release;//是否启动了非活跃连接超时销毁的判断标志
        EventLoop _baseloop;//这是主线程的EventLoop对象,负责监听事件的处理
        Acceptor _acceptor;//这是监听套接字的管理对象
        LoopThreadPoll _pool;//这是从属EventLoop线程池
        std::unordered_map<uint64_t,PtrConnection> _conns;//保存管理所有连接对应的shared_ptr对象

        using Functor = std::function<void()>;

        //连接建立回调
        using ConnectedCallback = std::function<void(const PtrConnection &)>;
        //接收到数据回调
        using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;
        //关闭连接回调
        using ClosedCallback = std::function<void(const PtrConnection &)>;
        //任意事件回调
        using AnyeEventCallback = std::function<void(const PtrConnection &)>;

        ConnectedCallback _connected_callback;
        MessageCallback _message_callback;
        ClosedCallback _closed_callback;
        AnyeEventCallback _event_callback;

    private:
        //为新连接构造一个Connection进行管理
        void NewConnection(int fd)
        {
            LOG_DEBUG("NEW CONNECTION");
            _next_id++;
            PtrConnection conn(new Connection(_pool.NextLoop(),_next_id,fd));
            conn->SetConnectedCallback(_connected_callback);
            conn->SetClosedCallback(_closed_callback);
            conn->SetMessageCallback(_message_callback);
            conn->SetAnyeEventCallback(_event_callback);
            conn->SetSevClosedCallback(std::bind(&TcpServer::RemoveConnection,this,std::placeholders::_1));
            if(_enable_inactive_release) conn->EnableInactiveRelease(_timeout);//启动非活跃销毁
            conn->Established();//就绪初始化
            _conns.insert(std::make_pair(_next_id,conn));

        }

        void RemoveConnectionInLoop(const PtrConnection& conn)
        {   
            //int id = conn->Id();
            auto it = _conns.find(conn->Id());
            if(it != _conns.end())
            {
                _conns.erase(it);
            }
        }

        //给Connection模块的_server_closed_callback设置一个回调函数
        //从管理Connection的_conns中移除管理信息
        //这里才是真正释放Connection的地方
        void RemoveConnection(const PtrConnection& conn)
        {
            _baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop,this,conn));
        } 

        void RunAfterInLoop(const Functor& task, int delay)
        {
            _next_id++;
            _baseloop.TimerAdd(_next_id,delay,task);
        }

    public:
        TcpServer(uint16_t port)
        :_port(port)
        ,_next_id(0)
        ,_timeout(0)
        ,_enable_inactive_release(false)
        ,_acceptor(&_baseloop,_port)
        ,_pool(&_baseloop)
        {
            _acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection,this,std::placeholders::_1));
            _acceptor.Listen();
        }
        //设置从属EventLoop线程数量
        void SetThreadCount(int count)
        {
            _pool.SetThreadCount(count);
        }
        void SetConnectedCallback(const ConnectedCallback& cb)
        {
            _connected_callback = cb;
        }
        void SetMessageCallback(const MessageCallback& cb)
        {
            _message_callback = cb;
        }
        void SetClosedCallback(const ClosedCallback& cb)
        {
            _closed_callback = cb;
        }
        void SetAnyeEventCallback(const AnyeEventCallback& cb)
        {
            _event_callback = cb;
        }
        //启动非活跃定时销毁任务
        void EnableInactiveRelease(int timeout)
        {
            _enable_inactive_release = true;
            _timeout = timeout;
        }
        //提供给主线程添加定时任务的接口,是否调用由用户决定
        void RunAfter(const Functor& task, int delay)
        {
            _baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop,this,task,delay));
        }
        //启动服务器
        void Start()
        {
            //创建从属EventLoop线程池
            _pool.Create();
            //启动监听套接字读事件监控
            _baseloop.Statr();
        }     
};

本文地址:https://www.yitenyun.com/495.html

搜索文章

Tags

#服务器 #python #pip #conda #ios面试 #ios弱网 #断点续传 #ios开发 #objective-c #ios #ios缓存 香港站群服务器 多IP服务器 香港站群 站群服务器 #远程工作 #kubernetes #笔记 #平面 #容器 #linux #学习方法 #运维 #进程控制 #fastapi #html #css #Trae #IDE #AI 原生集成开发环境 #Trae AI #低代码 #爬虫 #音视频 #docker #后端 #数据库 #内网穿透 #网络 #cpolar #开发语言 #云原生 #iventoy #VmWare #OpenEuler #人工智能 #node.js #Conda # 私有索引 # 包管理 #开源 #RTP over RTSP #RTP over TCP #RTSP服务器 #RTP #TCP发送RTP #MobaXterm #ubuntu #kylin #vscode #mobaxterm #深度学习 #计算机视觉 #android #腾讯云 #c# #数信院生信服务器 #Rstudio #生信入门 #生信云服务器 #安全 #nginx #tcp/ip #物联网 #websocket #多个客户端访问 #IO多路复用 #回显服务器 #TCP相关API #学习 #web安全 #算法 #大数据 #java #jar #claude #华为 #ModelEngine #金融 #大模型 #mcp #金融投资Agent #Agent #github #git #windows #我的世界 #n8n #本地部署 #ssh #hadoop #hbase #hive #zookeeper #spark #kafka #flink #qt #C++ #云计算 #我的世界服务器搭建 #minecraft #udp #c++ #c语言 #网络协议 #ide #todesk #jenkins #需求分析 #scala #测试用例 #测试工具 #压力测试 #Dell #PowerEdge620 #内存 #硬盘 #RAID5 #架构 #unity #游戏引擎 #MCP #MCP服务器 #性能优化 #缓存 #面试 #http #cpp #项目 #高并发 #gemini #gemini国内访问 #gemini api #gemini中转搭建 #Cloudflare #stm32 #macos #NPU #CANN #screen 命令 #mvp #个人开发 #设计模式 #vue.js #前端 #vue #阿里云 #AI编程 #JumpServer #堡垒机 #单元测试 #集成测试 #pycharm #京东云 #振镜 #振镜焊接 #gpu算力 #DisM++ # GLM-4.6V # 系统维护 #SRS #流媒体 #直播 #守护进程 #复用 #screen #unity3d #游戏 #服务器框架 #Fantasy #YOLOFuse # Base64编码 # 多模态检测 #umeditor粘贴word #ueditor粘贴word #ueditor复制word #ueditor上传word图片 #智能手机 #mamba #科技 #自然语言处理 #神经网络 #libosinfo #java-ee #自动化 #maven #gitlab #课程设计 #spring boot #centos #mysql #sql #SSH反向隧道 # Miniconda # Jupyter远程访问 #chatgpt #codex #Dify #ARM架构 #鲲鹏 #yum #jmeter #功能测试 #软件测试 #自动化测试 #职场和发展 #EMC存储 #存储维护 #NetApp存储 #三维 #3D #三维重建 #单片机 #NAS #Termux #Samba #Linux #万悟 #联通元景 #智能体 #镜像 #微信小程序 #小程序 #微信 #健身房预约系统 #健身房管理系统 #健身管理系统 #react.js #apache #鸭科夫 #逃离鸭科夫 #鸭科夫联机 #鸭科夫异地联机 #开服 #北京百思可瑞教育 #百思可瑞教育 #北京百思教育 #risc-v #嵌入式硬件 #php #deepseek #部署 #flask #运维开发 #GPU服务器 #8U #硬件架构 #智能路由器 #5G #东方仙盟 #C2000 #TI #实时控制MCU #AI服务器电源 #SAP #ebs #metaerp #oracle ebs #1024程序员节 #编辑器 #搜索引擎 #DeepSeek #蓝耘智算 #910B #昇腾 #AIGC #ida #pytorch #Anaconda配置云虚拟环境 #cursor #Nacos #web #微服务 #elasticsearch #django #web3.py #麒麟OS #信息与通信 #信号处理 #tcpdump #RustDesk #IndexTTS 2.0 #本地化部署 #毕业设计 #车辆排放 #ms-swift # 大模型 # 模型训练 #Android #Bluedroid #transformer #javascript #银河麒麟高级服务器操作系统安装 #银河麒麟高级服务器V11配置 #设置基础软件仓库时出错 #银河麒高级服务器系统的实操教程 #生产级部署银河麒麟服务系统教程 #Linux系统的快速上手教程 #AI #工具集 #oracle #sqlite #epoll #电气工程 #C# #PLC #PyTorch # Triton # 高并发部署 #golang #rdp #经验分享 #大模型部署 #mindie #大模型推理 #创业创新 #业界资讯 #laravel #openlayers #bmap #tile #server #milvus #langchain #大模型开发 #程序员 #TCP #客户端 #嵌入式 #DIY机器人工房 #redis #CosyVoice3 # 语音合成 #负载均衡 #spring #tomcat #intellij-idea #计算机网络 #x86_64 #数字人系统 #ssl #windows11 #microsoft #系统修复 #信令服务器 #Janus #MediaSoup #其他 #机器学习 #Puppet # IndexTTS2 # TTS #rtsp #转发 #CVE-2025-61686 #网络安全 #漏洞 #路径遍历高危漏洞 #SQL注入主机 #语音识别 #说话人验证 #声纹识别 #CAM++ #idm #unix #webrtc #分布式 #YOLO # GPU租赁 # 自建服务器 #json #devops #戴尔服务器 #戴尔730 #装系统 #rust #fpga开发 #ThingsBoard MCP #asp.net #sqlserver #密码学 #大模型教程 #AI大模型 #大模型学习 #结构体 #HeyGem # 服务器IP访问 # 端口映射 #制造 #遛狗 #ping通服务器 #读不了内网数据库 #bug菌问答团队 #jvm #bug #数据分析 #推荐算法 #shell #渗透测试 #黑客技术 #计算机 #文件上传漏洞 #mcu #adb #A2A #GenAI #数据安全 #注入漏洞 #SSE # AI翻译机 # 实时翻译 #debian # 一锤定音 # 大模型微调 #CUDA #Triton #SSH公钥认证 # PyTorch # 安全加固 #聊天小程序 #vllm #心理健康服务平台 #心理健康系统 #心理服务平台 #心理健康小程序 #dify #企业开发 #ERP #项目实践 #.NET开发 #C#编程 #编程与数学 #语言模型 #昇腾300I DUO #fiddler #nodejs #练习 #基础练习 #数组 #循环 #九九乘法表 #计算机实现 #dynadot #域名 #esb接口 #走处理类报异常 #opencv #数据挖掘 #Qwen3-14B # 大模型部署 # 私有化AI #ffmpeg #交互 #vnstat #监控 #文心一言 #AI智能体 #银河麒麟部署 #银河麒麟部署文档 #银河麒麟linux #银河麒麟linux部署教程 #vp9 #idea #intellij idea #攻防演练 #Java web #红队 #支付 #银河麒麟 #系统升级 #信创 #国产化 #SSH跳板机 # Python3.11 #LVDS #高速ADC #DDR #API限流 # 频率限制 # 令牌桶算法 #驱动开发 #黑群晖 #虚拟机 #无U盘 #纯小白 #华为云 #screen命令 #leetcode #Gunicorn #WSGI #Flask #并发模型 #容器化 #Python #性能调优 #蓝湖 #Axure原型发布 #AI 推理 #NV #llama #chrome #门禁 #梯控 #智能一卡通 #门禁一卡通 #消费一卡通 #智能梯控 #一卡通 #源代码管理 #处理器 #超时设置 #客户端/服务器 #网络编程 #管道Pipe #system V #ai #ai编程 #数据结构 #机器人 #排序算法 #muduo库 #gitea #uv #uvx #uv pip #npx #Ruff #pytest # 目标检测 #操作系统 #国产化OS #react native #CVE-2025-68143 #CVE-2025-68144 #CVE-2025-68145 #html5 #SSH # 批量管理 #ASR #SenseVoice #星图GPU #中间件 #YOLO26 #目标检测 #MQTT协议 #C语言 #vivado license #springboot #知识库 #prometheus #grafana #svn #证书 #可信计算技术 #web server #请求处理流程 #openHiTLS #TLCP #DTLCP #商用密码算法 #ONLYOFFICE #MCP 服务器 #测评 #CCE #Dify-LLM #Flexus # 双因素认证 # TensorFlow #毕设 #交通物流 #服务器繁忙 #serverless #RAID #RAID技术 #磁盘 #存储 #rocketmq #selenium #scrapy #postgresql #连接数据库报错 #蓝牙 #LE Audio #BAP #硬件工程 #智能家居 #pyqt #DNS #嵌入式编译 #ccache #distcc #企业微信 #puppeteer #SPA #单页应用 #C #Spring AI #STDIO传输 #SSE传输 #WebMVC #WebFlux #bootstrap #链表 #仙盟创梦IDE #动态规划 #xlwings #Excel #swagger #visual studio code #dlms #dlms协议 #逻辑设备 #逻辑设置间权限 #安全威胁分析 #mariadb #spring cloud #nfs #iscsi #paddleocr #STDIO协议 #Streamable-HTTP #McpTool注解 #服务器能力 #prompt #wsl #LangGraph #CLI #JavaScript #langgraph.json #文件管理 #文件服务器 #树莓派4b安装系统 #numpy #电脑 #KMS激活 #jdk #排序 #wordpress #雨云 #ddos #LobeChat #vLLM #GPU加速 #系统架构 #翻译 #开源工具 #CSDN #ansible #数据仓库 #海外服务器安装宝塔面板 #HistoryServer #Spark #YARN #jobhistory #https #LoRA # lora-scripts # 模型微调 #ZooKeeper #ZooKeeper面试题 #面试宝典 #深入解析 #ComfyUI # 推理服务器 #智能体来了 #传统行业 #AI赋能 #n8n解惑 #编程助手 #elk #rabbitmq #.netcore #esp32 arduino #决策树 #简单数论 #埃氏筛法 #openEuler #Hadoop #微PE # GLM-4.6V-Flash-WEB # AI部署 #材料工程 #数码相机 #智能电视 #VMware创建虚拟机 #远程更新 #缓存更新 #多指令适配 #物料关联计划 #挖漏洞 #攻击溯源 #编程 #内存接口 # 澜起科技 # 服务器主板 #blender #warp # 显卡驱动备份 #模拟退火算法 #计算机毕业设计 #程序定制 #毕设代做 #课设 #源码 #VMware #开关电源 #热敏电阻 #PTC热敏电阻 #能源 #防毒面罩 #防尘面罩 #文件传输 #电脑文件传输 #电脑传输文件 #电脑怎么传输文件到另一台电脑 #电脑传输文件到另一台电脑 #eureka #mongodb #wireshark #广播 #组播 #并发服务器 #.net #net core #kestrel #web-server #asp.net-core #nacos #银河麒麟aarch64 #uni-app #m3u8 #HLS #移动端H5网页 #APP安卓苹果ios #监控画面 直播视频流 #Prometheus #uvicorn #uvloop #asgi #event #日志分析 #Zabbix #语音合成 # 服务器迁移 # 回滚方案 #二值化 #Canny边缘检测 #轮廓检测 #透视变换 #DooTask #大模型入门 #homelab #Lattepanda #Jellyfin #Plex #Emby #Kodi #yolov12 #研究生life #Clawdbot #notepad++ #es安装 #postman #gpu #nvcc #cuda #nvidia #gpt #Socket #TensorRT # 推理优化 #zabbix #企业存储 #RustFS #对象存储 #高可用 #KMS 激活 #log4j #Jetty # CosyVoice3 # 嵌入式服务器 #模块 #MC #MC群组服务器 #RXT4090显卡 #RTX4090 #深度学习服务器 #硬件选型 #群晖 #音乐 #flutter #IntelliJ IDEA #Spring Boot #高级IO #select #neo4j #NoSQL #SQL #Llama-Factory # 大模型推理 #Coturn #TURN #STUN #身体实验室 #健康认知重构 #系统思维 #微行动 #NEAT效应 #亚健康自救 #ICT人 #云服务器 #个人电脑 #漏洞挖掘 #echarts # 服务器IP # 端口7860 #SSH别名 # CUDA #CS2 #debian13 #建筑缺陷 #红外 #数据集 #BoringSSL #时序数据库 #SMARC #ARM #远程控制 #Docker #云计算运维 #asp.net大文件上传 #asp.net大文件上传下载 #asp.net大文件上传源码 #ASP.NET断点续传 #asp.net上传文件夹 #asp.net上传大文件 #excel # 代理转发 # 跳板机 #Reactor # ARM服务器 # 鲲鹏 #LangFlow # 智能运维 # 性能瓶颈分析 #空间计算 #原型模式 #http头信息 #VibeVoice # 云服务器 #无人机 #ci/cd #k8s #junit #web服务器 #turn #ICE #信创国产化 #达梦数据库 # 公钥认证 #树莓派 #温湿度监控 #WhatsApp通知 #IoT #MySQL #clickhouse #pdf #代理 #TCP服务器 #开发实战 #全文检索 #数据访问 #银河麒麟服务器系统 #远程桌面 #harmonyos #鸿蒙PC #GPU #AutoDL ##租显卡 #I/O模型 #并发 #水平触发、边缘触发 #多路复用 #eclipse #servlet #arm64 #新人首发 #汽车 #word #SSH复用 # 远程开发 #可撤销IBE #服务器辅助 #私钥更新 #安全性证明 #双线性Diffie-Hellman #Kylin-Server #国产操作系统 #服务器安装 #磁盘配额 #存储管理 #形考作业 #国家开放大学 #系统运维 #自动化运维 #Android16 #音频性能实战 #音频进阶 #短剧 #短剧小程序 #短剧系统 #微剧 #DHCP #C++ UA Server #SDK #Windows #跨平台开发 #hibernate #nosql #agent #ai大模型 #散列表 #哈希算法 #机器视觉 #6D位姿 #UOS #海光K100 #统信 #dba #mssql #VMWare Tool #wpf #串口服务器 #Modbus #MOXA #GATT服务器 #蓝牙低功耗 #lucene # RTX 3090 #CNAS #CMA #程序文件 #IO # ControlMaster #网络安全大赛 #信息可视化 #硬件 #Fun-ASR # 语音识别 # WebUI #密码 #firefox #safari #outlook #错误代码2603 #无网络连接 #2603 #windbg分析蓝屏教程 #jupyter #le audio #低功耗音频 #通信 #连接 #实时检测 #卷积神经网络 #googlecloud #nmodbus4类库使用教程 #DAG #docker-compose #目标跟踪 #PowerBI #企业 #云服务器选购 #Saas #CPU #线程 #具身智能 #SSH密钥 #c++20 #Buck #NVIDIA #算力 #交错并联 #DGX #ETL管道 #RAG #向量存储 #数据预处理 #DocumentReader #HarmonyOS APP #内存治理 #IFix # 远程连接 #spring ai #oauth2 #gerrit #opc ua #opc #数据可视化 #rtmp #Miniconda # 环境迁移 #matplotlib #安全架构 #AI电商客服 #ROS # 局域网访问 # 批量处理 #指针 #anaconda #虚拟环境 #ui #cosmic #GB28181 #SIP信令 #SpringBoot #视频监控 #WT-2026-0001 #QVD-2026-4572 #smartermail # GLM-TTS # 数据安全 #xshell #host key # 高温监控 #TTS私有化 # IndexTTS # 音色克隆 #fs7TF #iBMC #UltraISO #npu #arm开发 #Modbus-TCP #大剑师 #nodejs面试题 #系统管理 #服务 # 树莓派 # ARM架构 #H5 #跨域 #发布上线后跨域报错 #请求接口跨域问题解决 #跨域请求代理配置 #request浏览器跨域 #视频 #ip #远程软件 #游戏机 #ceph #银河麒麟操作系统 #openssh #华为交换机 #信创终端 #ambari #arm #UDP的API使用 #ESP32 # OTA升级 # 黄山派 #内网 #挖矿 #Linux病毒 #网安应急响应 # 网络延迟 # GLM # 服务连通性 #azure #aws # Connection refused #teamviewer #代理服务器 #rsync # 数据同步 #LLM #bash # 高并发 #设计师 #图像处理 #游戏美术 #技术美术 #数据恢复 #视频恢复 #视频修复 #RAID5恢复 #流媒体服务器恢复 #TTS #claudeCode #content7 #go # GPU集群 #跳槽 #工作 #Gateway #认证服务器集成详解 #sql注入 #服务器开启 TLS v1.2 #IISCrypto 使用教程 #TLS 协议配置 #IIS 安全设置 #服务器运维工具 #uniapp #合法域名校验出错 #服务器域名配置不生效 #request域名配置 #已经配置好了但还是报错 #uniapp微信小程序 #框架搭建 #odoo #状态模式 #AI-native #Tokio #华为od #华为机试 #Java #Apple AI #Apple 人工智能 #FoundationModel #Summarize #SwiftUI #大语言模型 #SSH跳转 #多线程 #套接字 #I/O多路复用 #字节序 #weston #x11 #x11显示服务器 #研发管理 #禅道 #禅道云端部署 #计算几何 #斜率 #方向归一化 #叉积 #samba #RSO #机器人操作系统 #appche #Ubuntu #glibc #muduo #TcpServer #accept #高并发服务器 #远程开发 #ftp #sftp #YOLO识别 #YOLO环境搭建Windows #YOLO环境搭建Ubuntu # 轻量化镜像 # 边缘计算 #深度优先 #DFS #集成学习 #OpenHarmony #版本控制 #Git入门 #开发工具 #代码托管 #fabric #winscp #后端框架 #Ansible # 批量部署 #JNI #pxe #媒体 #远程连接 # 数字人系统 # 远程部署 # TURN # NAT穿透 #cpu #MCP服务器注解 #异步支持 #方法筛选 #声明式编程 #自动筛选机制 #前端框架 #WinSCP 下载安装教程 #SFTP #FTP工具 #服务器文件传输 #手机h5网页浏览器 #安卓app #苹果ios APP #手机电脑开启摄像头并排查 #语音生成 #AI写作 #AI部署 # ms-swift #free #vmstat #sar #PN 结 #服务器线程 # SSL通信 # 动态结构体 #RWK35xx #语音流 #实时传输 #node #超算中心 #PBS #lsf #rustdesk #p2p #报表制作 #职场 #用数据讲故事 #lvs #spine #TRO #TRO侵权 #TRO和解 #运维工具 #网络攻击模型 #进程 #进程创建与终止 #Discord机器人 #云部署 #程序那些事 #r语言 #mybatis #Node.js # child_process #服务器IO模型 #非阻塞轮询模型 #多任务并发模型 #异步信号模型 #多路复用模型 #KMS #slmgr #系统安全 #ipmitool #BMC #宝塔面板部署RustDesk #RustDesk远程控制手机 #手机远程控制 # 黑屏模式 # TTS服务器 #铁路桥梁 #DIC技术 #箱梁试验 #裂纹监测 #四点弯曲 #ollama #llm #可再生能源 #绿色算力 #风电 #领域驱动 #麦克风权限 #访问麦克风并录制音频 #麦克风录制音频后在线播放 #用户拒绝访问麦克风权限怎么办 #uniapp 安卓 苹果ios #将音频保存本地或上传服务器 #移动端h5网页 #调用浏览器摄像头并拍照 #开启摄像头权限 #拍照后查看与上传服务器端 #摄像头黑屏打不开问题 #express #cherry studio #文件IO #输入输出流 #GLM-4.6V-Flash-WEB # AI视觉 # 本地部署 #工业级串口服务器 #串口转以太网 #串口设备联网通讯模块 #串口服务器选型 #embedding #IndexTTS2 # 阿里云安骑士 # 木马查杀 #AI应用编程 # 自动化运维 #入侵 #日志排查 #kmeans #聚类 #若依 #前端开发 #EN4FE #人大金仓 #Kingbase #代理模式 #Spring AOP #自由表达演说平台 #演说 #程序员创富 #程序人生 #3d #多进程 #python技巧 #蓝桥杯 #范式 #企业级存储 #网络设备 #iot #软件工程 #生信 #Smokeping #Karalon #AI Test #策略模式 #pve #租显卡 #训练推理 #YOLOv8 # Docker镜像 #流程图 #论文阅读 #论文笔记 #图论 #国产开源制品管理工具 #Hadess #一文上手 #okhttp #小艺 #鸿蒙 #搜索 #bigtop #hdp #hue #kerberos #pencil #pencil.dev #设计 #zotero #WebDAV #同步失败 #健康医疗 #轻量化 #低配服务器 #Anything-LLM #IDC服务器 #私有化部署 #大模型应用 #API调用 #PyInstaller打包运行 #服务端部署 #raid #raid阵列 #java大文件上传 #java大文件秒传 #java大文件上传下载 #java文件传输解决方案 #PyCharm # 远程调试 # YOLOFuse #工程实践 #欧拉 #journalctl #AI应用 #图像识别 #Langchain-Chatchat # 国产化服务器 # 信创 #高考 #生产服务器问题查询 #日志过滤 #Autodl私有云 #深度服务器配置 #麒麟 #V11 #kylinos # 水冷服务器 # 风冷服务器 #VoxCPM-1.5-TTS # 云端GPU # PyCharm宕机 #儿童AI #图像生成 #Qwen #API #pjsip #taro #openresty #lua #wps #Linux多线程 #Beidou #北斗 #SSR #Syslog #系统日志 #日志监控 #simulink #matlab #aiohttp #asyncio #异步 #SSH保活 #信息安全 #信息收集 #软件 #本地生活 #电商系统 #商城 #everything #poll #人脸识别sdk #视频编解码 #人脸识别 #AI生成 # outputs目录 # 自动化 #Playbook #AI服务器 #stl #漏洞修复 #IIS Crypto #sglang #AI论文写作工具 #学术写作辅助 #论文创作效率提升 #AI写论文实测 #数字化转型 #实体经济 #商业模式 #软件开发 #数智红包 #商业变革 #创业干货 #SSH Agent Forwarding # 容器化 #AB包 #性能 #优化 #RAM #Go并发 #高并发架构 #Goroutine #系统设计 #Tracker 服务器 #响应最快 #torrent 下载 #2026年 #Aria2 可用 #迅雷可用 #BT工具通用 #FASTMCP #高斯溅射 #UEFI #BIOS #Legacy BIOS #产品运营 #eBPF #联机教程 #局域网联机 #局域网联机教程 #局域网游戏 #交换机 #三层交换机 #Harbor #AI智能棋盘 #Rock Pi S #边缘计算 #云开发 #PTP_1588 #gPTP #C/C++ #c++高并发 #百万并发 #FTP服务器 #uip # 权限修复 #进程等待 #wait #waitpid # HiChatBox # 离线AI #MinIO服务器启动与配置详解 #gateway #Comate #SMTP # 内容安全 # Qwen3Guard #X11转发 #平板 #零售 #智能硬件 #vncdotool #链接VNC服务器 #如何隐藏光标 #H5网页 #网页白屏 #H5页面空白 #资源加载问题 #打包部署后网页打不开 #HBuilderX #CTF #改行学it #b树 #Deepoc #具身模型 #开发板 #未来 #插件 #开源软件 #FHSS #NFC #智能公交 #服务器计费 #FP-增长 #算力建设 #tdengine #涛思数据 #memory mcp #Cursor #服务器解析漏洞 #Proxmox VE #虚拟化 #网路编程 #smtp #smtp服务器 #PHP #声源定位 #MUSIC #tensorflow # 远程访问 #飞牛nas #fnos #Streamlit #AI聊天机器人 #memcache #ServBay #ansys #ansys问题解决办法 #ranger #MySQL8.0 #智能体对传统行业冲击 #行业转型 #分布式数据库 #集中式数据库 #业务需求 #选型误 #Socket网络编程 #chat #HarmonyOS #雨云服务器 #Minecraft服务器 #教程 #MCSM面板 # 服务器配置 # GPU # 串口服务器 # NPort5630 #mtgsig #美团医药 #美团医药mtgsig #美团医药mtgsig1.2 #Python办公自动化 #Python办公 #copilot #个人博客 #硬盘克隆 #DiskGenius # 键鼠锁定 #opc模拟服务器 #工程设计 #预混 #扩散 #燃烧知识 #层流 #湍流 #量子计算 #MinIO #sentinel #反向代理 #政务 #参数估计 #矩估计 #概率论 #adobe #数据迁移 #powerbi #个人助理 #数字员工 #gmssh #宝塔 #1panel #Exchange #系统安装 #IPv6 #POC #问答 #交付 #scikit-learn #随机森林 #闲置物品交易系统 #静脉曲张 #腿部健康 #运动 #Minecraft #PaperMC #我的世界服务器 #AI Agent #开发者工具 #kong #Kong Audio #Kong Audio3 #KongAudio3 #空音3 #空音 #中国民乐 #计算机外设 #边缘AI # Kontron # SMARC-sAMX8 #jetty #scanf #printf #getchar #putchar #cin #cout #ET模式 #非阻塞 #凤希AI伴侣 #remote-ssh #SA-PEKS # 关键词猜测攻击 # 盲签名 # 限速机制 #产品经理 #就业 #docker安装seata #CMake #Make #OpenAI #故障 #多模态 #微调 #超参 #LLamafactory # IndexTTS 2.0 #全链路优化 #实战教程 #database #vps