Daya Jin's Blog Python and Machine Leaning

C++ SOCKET

2019-09-11


WinSock2

WinSock2是在Windows下实现socket编程的一个库。一个微软官方的简单C/S示例代码见此:(ServerClient)。可以看出一个最简单的单点C/S架构代码就比较复杂,这是因为官方Demo中加入了各种异常情况的处理。考虑到socket编程最需要关注的点在于收发步骤的逻辑实现,这里给出了一组简化版本的跨平台代码:(ServerClient)。

网络开发主要是围绕connecting、sending和receiving的逻辑来进行代码修改,函数API查询地址见此

select模型

上述示例代码有两个缺陷,一是阻塞模式,二是单点通信。在网络编程中有一种IO复用的select模型,由操作系统实现。其原型为:

select(_In_ int nfds,    // 最大fd的ID
       _Inout_opt_ fd_set FAR * readfds,    // 可读fd集合
       _Inout_opt_ fd_set FAR * writefds,    // 可写fd集合
       _Inout_opt_ fd_set FAR * exceptfds,    // 异常fd集合
       _In_opt_ const struct timeval FAR * timeout);    // 阻塞时间,Linux下会重置

 fd_set本质是一个长度为1024的位图,对fd_set的操作依赖如下几个宏:

  • FD_ZERO():清零某个fd集合;
  • FD_SET():把一个fd置位;
  • FD_CLR():把一个fd置零;
  • FD_ISSET():判断一个fd是否置位。

在程序调用select函数之后会陷入内核态,fd_set被完整地复制到内核态中,由内核来监听所有置位的fd,同时程序阻塞,直到超时或者任意fd发生事件。一旦内核检测到有事件发生,内核会修改fd_set仅保留需要处理的fd。因此在调用select之后只需要扫描fd_set中被置位的fd进行处理即可。下面使用select模型实现一个支持多客户端的回显服务器。示例源码见此

Server

首先实现server的回显功能:

int echo(SOCKET clientSock) {
    char data[BUFFER_SIZE] = {};
    if (recv(clientSock, data, BUFFER_SIZE, 0) > 0)
        send(clientSock, data, strlen(data) + 1, 0);
    else
        return -1;

    return 0;
}

基于模版代码,在成功监听端口之后使用select模型来处理事务:

// 4. Listen on the socket
// ...

char sBuf[BUFFER_SIZE] = "hello from server.";
std::vector<SOCKET> Client_pool;    // 客户端队列,#include<vector>

while (true) {
    fd_set fdRead;    // 读fd集合
    FD_ZERO(&fdRead);    // 清空集合
    FD_SET(listenSock, &fdRead);    // 监控ListenSocket
    SOCKET nfds = listenSock;    // 最大fd的值

    // 将所有客户套接字加入fdRead
    for (int i = 0; i < (int)Client_pool.size(); i++)
    {
        FD_SET(Client_pool[i], &fdRead);
        if (nfds < Client_pool[i])
            nfds = Client_pool[i];
    }

    // 调用select监控所有fd
    timeval t_val = { 1,0 };    // 阻塞时间1.0s
    if (select(nfds + 1, &fdRead, 0, 0, &t_val) < 0)
        break;    // error

    // 处理监听套接字
    if (FD_ISSET(listenSock, &fdRead))
    {
        // 5. Accept a connection
        sockaddr_in _cin = {};
        int len = sizeof(sockaddr_in);
#ifdef _WIN32
        SOCKET clientSock = accept(listenSock, (sockaddr*)&_cin, &len);
#else
        SOCKET clientSock = accept(listenSock, (sockaddr*)&_cin, (socklen_t*)&len);
#endif
        if (INVALID_SOCKET == clientSock)
            cout << "scoket invalid!" << endl;
        Client_pool.push_back(clientSock);

        send(clientSock, sBuf, strlen(sBuf) + 1, 0);
        cout << "recv a new client: IP " << inet_ntoa(_cin.sin_addr) << endl;

        FD_CLR(listenSock, &fdRead);    // 从集合中删除
    }

    // 6. Receive and send data
    // 处理有事件发生的客户端socket
    for (int i = Client_pool.size() - 1; i >= 0; i--) {
        if (FD_ISSET(Client_pool[i], &fdRead)) {
            /*判断client是否下线,并删除client的socket*/
            if (echo(Client_pool[i]) == -1) {
                std::vector<SOCKET>::iterator iter = Client_pool.begin() + i;
                if (iter != Client_pool.end())    // 客户连接数非空
                    Client_pool.erase(iter);
            }
        }
    }

    cout << "idle time..." << endl;
}

因为使用了vector数组来维护客户端socket,因此在清理时需要注意:

// 7. Disconnect
for (int i = (int)Client_pool.size() - 1; i >= 0; i--)
#ifdef _WIN32
    closesocket(Client_pool[i]);
#else
    close(Client_pool[i]);
#endif

#ifdef _WIN32
closesocket(listenSock);
WSACleanup();
#else
close(listenSock);
#endif

Client

同样的,首先实现客户端输入功能与接收回显的功能:

void input(SOCKET connSock) {
    char data[BUFFER_SIZE] = {};
    while (true) {
        cin.getline(data, BUFFER_SIZE);
        send(connSock, data, strlen(data) + 1, 0);
    }
}

int handler(SOCKET connSock) {
    char data[BUFFER_SIZE] = {};
    if (recv(connSock, data, BUFFER_SIZE, 0) > 0)
        cout << "recv msg: " << data << endl;
    else
        return -1;    // 服务器丢失

    return 0;
}

在成功连接服务器之后,客户端代码需要完成两件事:使用线程来托管输入功能,使用selecct模型来与服务器交互。

// 3. Connect to the server
// ...

// 4. Send and receive data
char cBuf[BUFFER_SIZE] = {};
if (recv(connSock, cBuf, BUFFER_SIZE, 0))
    cout << "recv msg: " << cBuf << endl;
std::thread t(input, connSock);    // #include <thread>
t.detach();    // 守护线程接管输入

while (true) {
    fd_set fdRead;    // fd读集合
    FD_ZERO(&fdRead);    // 读集合全部复位
    FD_SET(connSock, &fdRead);    // 监听套接字置位

    timeval t_val = { 1,0 };    // 阻塞时间1.0s
    if (select(connSock + 1, &fdRead, 0, 0, &t_val) < 0)
        break;    // 任务结束

    if (FD_ISSET(connSock, &fdRead))
        if (handler(connSock) == -1)
            break;    // 服务器丢失

    FD_CLR(connSock, &fdRead);

    cout << "idle time..." << endl;
}

需要注意的是select模型仍然属于阻塞模型,其阻塞时间由一个timeval格式的结构体来决定。select模型的缺陷很明显:

  • 首先是select能够监控的fd数量是有限的(默认1024);
  • fd_set类型会完整地从用户态复制到内核态,内核返回时又会完整地复制会用户态;
  • 因为每次内核返回都会修改fd_set,因此该数据无法重用;
  • 内核返回信息仅能知道发生了事件,却不知道是哪个fd发生了事件,需要$O(n)$去扫描。

黏包

黏包是网络编程中老生常谈的问题了,主要原因就是因为缓冲区的存在,接收到的数据包都是无区分连续的存储在缓冲区中。为了能够精确地从缓冲区中取出一个单独且完整的数据包,必须要有方法能够从缓冲区的连续数据中区分出包与包之间的间隔。

常用的方法是双方协定一个数据格式,每个数据包分为header与body,其中header大小固定而body大小不定,每个数据包的body大小承载在header中,这样一来接收端就可以先接受header,然后再根据header中的信息再接收body。如下代码演示了一个简单的数据包格式:

class Header {
public:
    int cmd;    // 控制字段
    int length;    // 包体大小
};

class DataPack :public Header {
public:
    char data[2048] = "hello from client";
    DataPack() {
        this->cmd = 0;
        this->length = sizeof(DataPack) - sizeof(Header);
    }
};

缓冲区与阻塞

首先系统的socket自带缓冲区,可称为系统缓冲区。该缓冲区由系统托管,好处就是完全自动化。但是缺点在于很容易阻塞,当CS交互过快并且数据包比较大时,(通常接收端)系统缓冲区很容易被数据填满并得不到及时的清空,就会引发阻塞,这种阻塞即使关闭多余的连接也不会恢复,只能重连。因此为了避免系统缓冲区导致的阻塞,通常会在通信程序中再设立一个单独的程序缓冲区,由程序员管理。因为设立程序缓冲区的目的就是为了不让系统缓冲区阻塞,因此程序缓冲区一般会设的比系统缓冲区大。另一方面,程序缓冲区要尽可能快的将系统缓冲区中的数据搬运到自身,不然就没有存在的意义。

对于客户端而言,只需要设置一个程序缓冲区。程序缓冲区需要自行编写代码维护,首先需要使用一个idx来记录缓冲区中的数据长度,当系统缓冲区存在数据时立马转储进程序缓冲区并更新idx,该行为由select触发;另一方面,当程序缓冲区中的数据大于一个数据头的长度时,读取一个数据头并获取该数据包的长度,然后进入分支:

  • 若程序缓冲区中数据长度不足,则说明该数据包内容并未完整到达,退出等待下一次调用;

  • 若程序缓冲区中数据长度大于该包的完整大小,则完整取出一个数据包,进行响应或处理,然后将程序缓冲区中的数据往前移并更新idx,然后再次判断。

示例代码如下,其中buffer为程序缓冲区。

int len = recv(this->connSock, this->buffer+this->bufIdx, RECV_SIZE, 0);
if (len < 0)
    return -1;    // 服务器丢失

this->bufIdx += len;
if (this->bufIdx > sizeof(Header)) {    // 接收数据头
    DataPack dp;
    memcpy(&dp, this->buffer, sizeof(Header));

    while (this->bufIdx >= (sizeof(Header) + dp.length)) {    // datapack
        memcpy(dp.data, this->buffer + sizeof(Header), dp.length);

        // 缓冲区数据前移
        memcpy(this->buffer, this->buffer + sizeof(Header) + dp.length,
                this->bufIdx - sizeof(Header) - dp.length);
        this->bufIdx -= sizeof(Header) + dp.length;

        //cout << "recv msg : " << dp.data << endl;
    }
}

对于服务端,因为其要接受多个客户端的连接,若仅设置一个程序缓冲区供多客户端发送的话,如何保持数据同步就是一个问题。因此这里使用异步方式,即为每一个连接进来的客户端都专门设置一个程序缓冲区。程序缓冲区的维护同客户端,示例代码如下,其中clientSock是客户套接字对象,其中封装了网络通信所需的功能。

int len = recv(clientSock->sock(), clientSock->buffer + clientSock->bufIdx, RECV_SIZE, 0);
if (len < 0)
    return -1;    // 客户端丢失

clientSock->bufIdx += len;
if (clientSock->bufIdx > sizeof(Header)) {    // 接收数据头
    DataPack dp;
    memcpy(&dp, clientSock->buffer, sizeof(Header));

    while (clientSock->bufIdx >= (sizeof(Header) + dp.length)) {
        memcpy(dp.data, clientSock->buffer + sizeof(Header), dp.length);

        // 缓冲区数据前移
        memcpy(clientSock->buffer, clientSock->buffer + sizeof(Header) + dp.length,
                clientSock->bufIdx - sizeof(Header) - dp.length);
        clientSock->bufIdx -= sizeof(Header) + dp.length;

        //cout << "recv msg : " << dp.data << endl;
        this->echo(clientSock, &dp);
    }
}

设立程序缓冲区的完整代码见此。需要注意的是,上述代码中使用了while循环来处理消息,在消息过多时程序会在此处阻塞,但是跟系统缓冲区阻塞不同,此处的阻塞在消息处理完之后会自行消失。以Windows本机为服务器,Ubuntu虚拟机为客户端,单点通信的数据流量如下图所示:


Similar Posts

上一篇 C++ STL

Content