长生栈 长生栈
首页
  • 编程语言

    • C语言
    • C++
    • Java
    • Python
  • 数据结构和算法

    • 全排列算法实现
    • 动态规划算法
  • CMake
  • gitlab 安装和配置
  • docker快速搭建wordpress
  • electron+react开发和部署
  • Electron-创建你的应用程序
  • ImgUI编译环境
  • 搭建图集网站
  • 使用PlantUml画时序图
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

Living Team

编程技术分享
首页
  • 编程语言

    • C语言
    • C++
    • Java
    • Python
  • 数据结构和算法

    • 全排列算法实现
    • 动态规划算法
  • CMake
  • gitlab 安装和配置
  • docker快速搭建wordpress
  • electron+react开发和部署
  • Electron-创建你的应用程序
  • ImgUI编译环境
  • 搭建图集网站
  • 使用PlantUml画时序图
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • 文件操作

  • Linux进程和线程

  • Linux信号

  • 进程间通信

  • Socket

    • C linux socket tcp封装库
    • C linux socket - tcp - 单线程
    • C linux socket - tcp - 多进程
    • C linux socket - tcp - 多线程
    • C linux socket - tcp - select
    • C linux socket - tcp - epoll
    • C linux socket - tcp - 线程池epoll
      • 封装原始socket库
      • 调用-client
      • 调用-server
      • makefile
      • 编译
    • C linux socket - tcp - 非阻塞epoll
    • C linux socket - tcp - loop-epoll
  • C语言
  • Socket
DC Wang
2022-03-25
目录

C linux socket - tcp - 线程池epoll

# C linux socket - tcp - select

# 封装原始socket库

  • myconnect.h
#ifndef __MYCONNECT_H_
#define __MYCONNECT_H_

#include<sys/socket.h>

int Accept(int __fd, struct sockaddr *__restrict__ __addr, socklen_t *__restrict__ __addr_len);
int Bind(int __fd, const struct sockaddr *__addr, socklen_t __len);
int Connect(int __fd, const struct sockaddr *__addr, socklen_t __len);
int Listen(int __fd, int __n);
int Socket(int __domain, int __type, int __protocol);
ssize_t Read(int __fd, void *__buf, size_t __nbytes);
ssize_t Write(int __fd, const void *__buf, size_t __n);
int Close(int __fd);

ssize_t Readn(int __fd, void *__buf, size_t n);
ssize_t Writen(int __fd, const void *__buf, size_t n);
ssize_t Readline(int __fd, void *__buf, size_t maxlen);

#endif

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
  • myconnect.c
#include"myconnect.h"
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<errno.h>
#include<sys/socket.h>

#define handle_error(msg) \
           do { perror(msg); exit(EXIT_FAILURE); } while (0)

int Accept(int __fd, struct sockaddr *__restrict__ __addr, socklen_t *__restrict__ __addr_len)
{
    int n;
again:
    if ((n = accept(__fd, __addr, __addr_len)) < 0)
        if((errno==ECONNABORTED)||(errno==EINTR))
            goto again;
        else
            handle_error("accept");
    return n;
}

int Bind(int __fd, const struct sockaddr *__addr, socklen_t __len)
{
    int n;
    if ((n = bind(__fd, __addr, __len)) < 0)
        handle_error("bind");
    return n;
}

int Connect(int __fd, const struct sockaddr *__addr, socklen_t __len)
{
    int n;
    ;
    if ((n = connect(__fd, __addr, __len)) < 0)
        handle_error("connect");
    return n;
}

int Listen(int __fd, int __n)
{
    int n;
    if ((n = listen(__fd, __n)) < 0)
        handle_error("listen");
    return n;
}

int Socket(int __domain, int __type, int __protocol)
{
    int n;
    if ((n = socket(__domain, __type, __protocol)) < 0)
        handle_error("socket");
    return n;
}

/**
 * read返回值:
 *      1.> 0 实际读到的字节数
 *      2.= 0 数据读完(读到文件末尾、管道写端关闭、socket对端关闭)
 *      3.-1  异常
 *              1.errno==EINTR  被信号中断  重启/退出
 *              2.errno==EAGAIN(EWOULDBLOCK)  非阻塞方式读,并且没有数据
 *              3.其他情况  错误。---perror;  exit;
*/
ssize_t Read(int __fd, void *__buf, size_t __nbytes)
{
    ssize_t n;
again:
    if ((n = read(__fd, __buf, __nbytes)) == -1)
        if (errno == EINTR)
            goto again;
        else
            return -1;
    return n;
}

ssize_t Write(int __fd, const void *__buf, size_t __n)
{
    ssize_t n;
again:
    if((n=write(__fd,__buf,__n))==-1)
        if(errno==EINTR)
            goto again;
        else
            return -1;
    return n;
}

int Close(int __fd)
{
    int n;
    if((n=close(__fd))==-1)
        handle_error("close");
    return n;
}

/**
 * 读一定数量字符串,成功返回0,失败返回非零。
 * size_t n:应该读取的字节数
*/
ssize_t Readn(int __fd, void *__buf, size_t n)
{
    size_t nleft;           //unsigned int 剩余未读取的字节数
    ssize_t nread;          //int 实际读到的字节数
    char *ptr;

    ptr = __buf;
    nleft = n;              //n 未读取字节数
    while (nleft > 0)
    {
        if ((nread = read(__fd, ptr, nleft)) < 0)
        {
            if (errno == EINTR)
                nread = 0;
            else
                return -1;
        }
        else if (nread == 0)
            break;

        nleft -= nread;
        ptr += nread;
    }
    return n - nleft;
}

/*写一定数量字符串,成功返回0,失败返回-1或大于0的数*/
ssize_t Writen(int __fd, const void *__buf, size_t n)
{
    size_t nleft;
    ssize_t nwritten;
    const char *ptr = __buf;
    nleft = n;
    while (nleft > 0)
    {
        if((nwritten = write(__fd, ptr, nleft))<0)
        {
            if (errno == EINTR)
                nwritten = 0;
            else
                return -1;
        }
        else if (nwritten == 0)
            break;
        nleft -= nwritten;
        ptr += nwritten;
    }
    return nleft;
}

/*供Readline()使用*/
static ssize_t my_read(int __fd,char*ptr)
{
    static int read_cnt;
    static char *read_ptr;
    static char read_buff[100];

    if (read_cnt <= 0)
    {
again:
        if ((read_cnt = read(__fd, read_buff, sizeof(read_buff))) < 0)
        {
            if(errno==EINTR)
                goto again;
            return -1;
        }
        else if(read_cnt==0)
            return 0;

        read_ptr = read_buff;
    }
    read_cnt--;
    *ptr = *read_ptr++;

    return 1;
}
/*调用my_read(),实现一次读一行,即读到\n结束,末尾\n\0。*/
ssize_t Readline(int __fd, void *__buf, size_t maxlen)
{
    ssize_t n, rc;
    char    c, *ptr;
    ptr = __buf;

    for (n = 1; n < maxlen; n++)
    {
        if ((rc = my_read(__fd, &c)) == 1)
        {
            *ptr++ = c;
            if(c=='\n')
                break;
        }
        else if(rc==0)
        {
            *ptr = 0;
            return n - 1;
        }else
            return -1;
    }
    *ptr = 0;
    return n;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201

# 调用-client

  • client.c
#include<stdio.h>
#include<stdlib.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<ctype.h>
#include<unistd.h>
#include<arpa/inet.h>
#include<string.h>
#include"myconnect.h"

#define SERV_PORT 6868
#define SERV_IP "192.168211.129"

int main()
{
    int cfd;
    struct sockaddr_in serv_addr;
    char buf[BUFSIZ];
    cfd = Socket(AF_INET, SOCK_STREAM, 0);

    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(SERV_PORT);
    inet_pton(AF_INET, SERV_IP, &serv_addr.sin_addr.s_addr);

    Connect(cfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));
    while(1)
    {
        memset(buf, 0, sizeof(buf));
        fgets(buf, sizeof(buf), stdin);
        Write(cfd, buf, strlen(buf));
        int n=Read(cfd, buf, sizeof(buf));
        if (n > 0)
            printf("%s", buf);
        else
            break;
    }

    close(cfd);

    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

# 调用-server

  • server.c
#include <iostream>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <pthread.h>
#include<strings.h>
#include<string.h>
 
#include <errno.h>
  
#define MAXLINE 10
#define OPEN_MAX 100
#define LISTENQ 20
#define SERV_PORT 8006
#define INFTIM 1000
  
//线程池任务队列结构体
 
struct task{
  int fd; //需要读写的文件描述符
 
  struct task *next; //下一个任务
 
};
  
//用于读写两个的两个方面传递参数
 
struct user_data{
  int fd;
  unsigned int n_size;
  char line[MAXLINE];
};
  
//线程的任务函数
 
void * readtask(void *args);
void * writetask(void *args);
  
  
//声明epoll_event结构体的变量,ev用于注册事件,数组用于回传要处理的事件
 
struct epoll_event ev,events[20];
int epfd;
pthread_mutex_t mutex;
pthread_cond_t cond1;
struct task *readhead=NULL,*readtail=NULL,*writehead=NULL;
  
void setnonblocking(int sock)
{
     int opts;
     opts=fcntl(sock,F_GETFL);
     if(opts<0)
     {
          perror("fcntl(sock,GETFL)");
          exit(1);
     }
    opts = opts|O_NONBLOCK;
     if(fcntl(sock,F_SETFL,opts)<0)
     {
          perror("fcntl(sock,SETFL,opts)");
          exit(1);
     } 
}
  
int main()
{
     int i, maxi, listenfd, connfd, sockfd,nfds;
     pthread_t tid1,tid2;
     
     struct task *new_task=NULL;
     struct user_data *rdata=NULL;
     socklen_t clilen;
     
     pthread_mutex_init(&mutex,NULL);
     pthread_cond_init(&cond1,NULL);
     //初始化用于读线程池的线程
 
     pthread_create(&tid1,NULL,readtask,NULL);
     pthread_create(&tid2,NULL,readtask,NULL);
     
     //生成用于处理accept的epoll专用的文件描述符 
 
     epfd=epoll_create(256);
  
     struct sockaddr_in clientaddr;
     struct sockaddr_in serveraddr;
     listenfd = socket(AF_INET, SOCK_STREAM, 0);
     //把socket设置为非阻塞方式
 
     setnonblocking(listenfd);
     //设置与要处理的事件相关的文件描述符
 
     ev.data.fd=listenfd;
     //设置要处理的事件类型
 
     ev.events=EPOLLIN|EPOLLET;
     //注册epoll事件
 
     epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);
     
     bzero(&serveraddr, sizeof(serveraddr)); 
     serveraddr.sin_family = AF_INET; 
     serveraddr.sin_port=htons(SERV_PORT);
     serveraddr.sin_addr.s_addr = INADDR_ANY;
     bind(listenfd,(sockaddr *)&serveraddr, sizeof(serveraddr));
     listen(listenfd, LISTENQ);
     
     maxi = 0;
     for ( ; ; ) {
          //等待epoll事件的发生
 
          nfds=epoll_wait(epfd,events,20,500);
          //处理所发生的所有事件 
 
        for(i=0;i<nfds;++i)
        {
               if(events[i].data.fd==listenfd)
               {
                    
                    connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen);
                    if(connfd<0){
                      perror("connfd<0");
                      exit(1);
                   }
                    setnonblocking(connfd);
                    
                    char *str = inet_ntoa(clientaddr.sin_addr);
                    //std::cout<<"connec_ from >>"<<str<<std::endl;
 
                    //设置用于读操作的文件描述符
 
                    ev.data.fd=connfd;
                    //设置用于注测的读操作事件
 
                 ev.events=EPOLLIN|EPOLLET;
                    //注册ev
 
                 epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev);
               }
            else if(events[i].events&EPOLLIN)
            {
                    //printf("reading!/n"); 
 
                    if ( (sockfd = events[i].data.fd) < 0) continue;
                    new_task=new task();
                    new_task->fd=sockfd;
                    new_task->next=NULL;
                    //添加新的读任务
 
                    pthread_mutex_lock(&mutex);
                    if(readhead==NULL)
                    {
                      readhead=new_task;
                      readtail=new_task;
                    } 
                    else
                    { 
                     readtail->next=new_task;
                      readtail=new_task;
                    } 
                   //唤醒所有等待cond1条件的线程
 
                    pthread_cond_broadcast(&cond1);
                    pthread_mutex_unlock(&mutex); 
              }
               else if(events[i].events&EPOLLOUT)
               { 
                 /*
              rdata=(struct user_data *)events[i].data.ptr;
                 sockfd = rdata->fd;
                 write(sockfd, rdata->line, rdata->n_size);
                 delete rdata;
                 //设置用于读操作的文件描述符
                 ev.data.fd=sockfd;
                 //设置用于注测的读操作事件
               ev.events=EPOLLIN|EPOLLET;
                 //修改sockfd上要处理的事件为EPOLIN
               epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);
             */
               }
                              
          }
          
     }
}
 
static int count111 = 0;
static time_t oldtime = 0, nowtime = 0;
void * readtask(void *args)
{
    
   int fd=-1;
   unsigned int n;
   //用于把读出来的数据传递出去
 
   struct user_data *data = NULL;
   while(1){
         
        pthread_mutex_lock(&mutex);
        //等待到任务队列不为空
 
        while(readhead==NULL)
             pthread_cond_wait(&cond1,&mutex);
         
        fd=readhead->fd;
        //从任务队列取出一个读任务
 
        struct task *tmp=readhead;
        readhead = readhead->next;
        delete tmp;
        pthread_mutex_unlock(&mutex);
        data = new user_data();
        data->fd=fd;
         
 
        char recvBuf[1024] = {0}; 
        int ret = 999;
        int rs = 1;
 
        while(rs)
        {
            ret = recv(fd,recvBuf,1024,0);// 接受客户端消息
 
            if(ret < 0)
            {
                //由于是非阻塞的模式,所以当errno为EAGAIN时,表示当前缓冲区已无数据可//读在这里就当作是该次事件已处理过。
 
                if(errno == EAGAIN)
                {
                    printf("EAGAIN\n");
                    break;
                }
                else{
                    printf("recv error!\n");
         
                    close(fd);
                    break;
                }
            }
            else if(ret == 0)
            {
                // 这里表示对端的socket已正常关闭. 
 
                rs = 0;
            }
            if(ret == sizeof(recvBuf))
                rs = 1; // 需要再次读取
 
            else
                rs = 0;
        }
        if(ret>0){
 
        //-------------------------------------------------------------------------------
 
 
            data->n_size=n;
 
 
            count111 ++;
 
            struct tm *today;
            time_t ltime;
            time( &nowtime );
 
            if(nowtime != oldtime){
                printf("%d\n", count111);
                oldtime = nowtime;
                count111 = 0;
            }
 
            char buf[1000] = {0};
            sprintf(buf,"HTTP/1.0 200 OK\r\nContent-type: text/plain\r\n\r\n%s","Hello world!\n");
            send(fd,buf,strlen(buf),0);
            close(fd);
 
 
       }
   }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284

# makefile

  • makefile
all: server client
server: server.c myconnect.o
	gcc server.c myconnect.o -o server
client: client.c myconnect.o
	gcc client.c myconnect.o -o client
myconnect.o:myconnect.c
	gcc -c myconnect.c
clean:
	rm *.o server client
1
2
3
4
5
6
7
8
9

# 编译

#编译
make
#清除编译文件
#make clean
1
2
3
4

先执行server,再执行client,效果是client输入小写,server转大写后发送回client。

编辑 (opens new window)
#C#socket
上次更新: 2023/03/31, 22:34:04
C linux socket - tcp - epoll
C linux socket - tcp - 非阻塞epoll

← C linux socket - tcp - epoll C linux socket - tcp - 非阻塞epoll→

最近更新
01
ESP32-网络摄像头方案
06-14
02
ESP32-PWM驱动SG90舵机
06-14
03
ESP32-实时操作系统freertos
06-14
更多文章>
Theme by Vdoing | Copyright © 2019-2025 DC Wang All right reserved | 辽公网安备 21021102001125号 | 吉ICP备20001966号-2
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式