基于epoll的多线程网络服务程序设计

基于epoll的多线程网络服务程序设计——C语言​

采用C语言设计了一个基于epoll的多线程网络服务程序。每个线程都有一个epoll来捕获处于这个线程的socket事件。当子线程数量为0,即只有一个线程,则网络监听服务与socket消息处理处于同一个epoll。当子线程数量大于0时,主线程监听socket连接,当有新的连接到来时将其加入到活跃socket数量最小的子线程的epoll中。

server.h

#ifndef EPOLL_C_SERVER_H
#define EPOLL_C_SERVER_H

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <unistd.h>
#include <errno.h>
#include <sys/epoll.h>
#include <pthread.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>

#define RESULT_OK 0
#define RESULT_ERROR -1

/**************************************************************************
* Function    : *MSG_HANDLE
* Input       : socket_fd --> socket文件描述符
*             : arg --> void* 参数
* Output      :
* Return      : 1 处理成功,继续等待下次消息;0 处理完毕完毕该连接;-1 异常错误发生
* Description : 消息处理函数指针
****************************************************************************/
typedef int (*MSG_HANDLE)(int socket_fd,void* arg) ;  

typedef struct
{
    int epoll_fd;
    pthread_t thd_fd;
    
    //消息处理函数,各个线程会调用该函数进行消息处理
    MSG_HANDLE data_func;
    
    //一个线程里面的有效socket数量
    unsigned int active_conection_cnt; 
    //线程互斥锁,用于实时更新有效socket数量
    pthread_mutex_t thd_mutex;  
}socket_thd_struct;   //表示处理socket的子线程

typedef struct
{
    int epoll_fd;
    unsigned short ip_port;
    
    //消息处理函数,当只有一个线程时,会调用该函数进行消息处理
    MSG_HANDLE data_func;

    //子线程数量,可以为0,为0表示server与socket处理处于同一个线程
    unsigned int socket_pthread_count; 
    //子线程结构体指针
    socket_thd_struct* socket_thd;   

}server_struct;  //一个网络服务结构体

/**************************************************************************
* Function    : initServerStruct
* Input       : param_port --> 服务端口号
*             : param_thd_count --> 子线程数量,用于处理连接的client
*             : param_handle --> socket数据处理函数指针
* Output      :
* Return      : 初始化好的server结构体
* Description : 初始化server结构体
****************************************************************************/
server_struct* initServerStruct(unsigned short param_port,unsigned int param_thd_count,MSG_HANDLE param_handle);

/**************************************************************************
* Function    : serverRun
* Input       : param_server --> server服务结构体参数
* Output      :
* Return      :RESULT_OK(0):执行成功;RESULT_ERROR(-1):执行失败
* Description : 运行网络服务,监听服务端口
****************************************************************************/
int serverRun(server_struct *param_server);

#endif //EPOLL_C_SERVER_H

server.c

#include "server.h"

static void* socketPthreadRun(void* arg)
{
    socket_thd_struct* pa_sock_st=(socket_thd_struct*)arg;
    int active_counts=0;
    struct epoll_event ev;
    struct epoll_event events[5];
    int ret=0;

    while(1)
    {
        //等待读写事件的到来
        active_counts=epoll_wait(pa_sock_st->epoll_fd,events,5,-1);
        fprintf(stdout,"active count:%d\n",active_counts);

        int index=0;
        for(index=0;index<active_counts;index++)
        {
            if(events[index].events&EPOLLERR) //发生异常错误
            {
                fprintf(stderr,"error happened:errno(%d)-%s\n",errno,strerror(errno));
                epoll_ctl(pa_sock_st->epoll_fd,EPOLL_CTL_DEL,events[index].data.fd,NULL);
                close(events[index].data.fd);

                pthread_mutex_lock(&(pa_sock_st->thd_mutex));
                pa_sock_st->active_conection_cnt--;
                pthread_mutex_unlock(&(pa_sock_st->thd_mutex));
            }
            else if(events[index].events&EPOLLRDHUP) //对端异常关闭连接
            {
                fprintf(stdout,"client close this socket\n");
                epoll_ctl(pa_sock_st->epoll_fd,EPOLL_CTL_DEL,events[index].data.fd,NULL);
                close(events[index].data.fd);

                pthread_mutex_lock(&(pa_sock_st->thd_mutex));
                pa_sock_st->active_conection_cnt--;
                pthread_mutex_unlock(&(pa_sock_st->thd_mutex));
            }
            else if(events[index].events&EPOLLIN) //读事件到来,进行消息处理
            {
                ret=pa_sock_st->data_func(events[index].data.fd,NULL);
                if(ret==-1)
                {
                    fprintf(stderr,"client socket exception happened\n");
                    epoll_ctl(pa_sock_st->epoll_fd,EPOLL_CTL_DEL,events[index].data.fd,NULL);
                    close(events[index].data.fd);

                    pthread_mutex_lock(&(pa_sock_st->thd_mutex));
                    pa_sock_st->active_conection_cnt--;
                    pthread_mutex_unlock(&(pa_sock_st->thd_mutex));
                }
                if(ret==0)
                {
                    fprintf(stdout,"client close this socket\n");
                    epoll_ctl(pa_sock_st->epoll_fd,EPOLL_CTL_DEL,events[index].data.fd,NULL);
                    close(events[index].data.fd);

                    pthread_mutex_lock(&(pa_sock_st->thd_mutex));
                    pa_sock_st->active_conection_cnt--;
                    pthread_mutex_unlock(&(pa_sock_st->thd_mutex));
                }
                else if(ret==1)
                {
                    
                }
            }
        }
    }

    pthread_exit(NULL);
}

server_struct* initServerStruct(unsigned short param_port,unsigned int param_thd_count,MSG_HANDLE param_handle)
{
    server_struct* serv_st=(server_struct*)malloc(sizeof(server_struct));
    serv_st->ip_port=param_port;
    serv_st->data_func=param_handle;
    serv_st->epoll_fd=epoll_create(256);
    serv_st->socket_pthread_count=param_thd_count;
    serv_st->socket_thd=NULL;

    if(serv_st->socket_pthread_count>0)
    {
        fprintf(stdout,"create client socket sub thread\n");
        serv_st->socket_thd=(socket_thd_struct*)malloc(sizeof(socket_thd_struct)*serv_st->socket_pthread_count);

        int index=0;
        for(index=0;index<serv_st->socket_pthread_count;index++)
        {
            serv_st->socket_thd[index].epoll_fd=epoll_create(256);
            serv_st->socket_thd[index].data_func=param_handle;
            serv_st->socket_thd[index].active_conection_cnt=0;
            serv_st->socket_thd[index].thd_fd=0;
            //创建子线程
            pthread_create(&(serv_st->socket_thd[index].thd_fd),NULL,socketPthreadRun,(void*)&(serv_st->socket_thd[index]));
            //初始化线程互斥锁
            pthread_mutex_init(&(serv_st->socket_thd[index].thd_mutex),NULL);
        }
    }

    return serv_st;
}

int serverRun(server_struct *param_server)
{
    int ret=RESULT_OK;
    int server_socket=0;
    struct sockaddr_in server_addr;
    bzero(&server_addr,sizeof(server_addr));
    struct epoll_event ev;
    struct epoll_event events[5];
    int active_count=0;
    int index=0;
    int new_socket=0;
    struct sockaddr_in client_info;
    socklen_t client_info_len=0;

    server_addr.sin_family=AF_INET;
    server_addr.sin_addr.s_addr=htons(INADDR_ANY);
    server_addr.sin_port=htons(param_server->ip_port);

    server_socket=socket(PF_INET,SOCK_STREAM,0);
    if(server_socket<0)
    {
        fprintf(stderr,"create socket error:errno(%d)-%s\n",errno,strerror(errno));
        return RESULT_ERROR;
    }
    fprintf(stdout,"create server socket ssuccessful\n");

    param_server->epoll_fd=epoll_create(256);

    ev.data.fd=server_socket;
    ev.events=EPOLLIN|EPOLLET;
    if(epoll_ctl(param_server->epoll_fd,EPOLL_CTL_ADD,server_socket,&ev)!=0)
    {
        fprintf(stderr,"server socket add to epoll error:errno(%d)-%s\n",errno,strerror(errno));
        return RESULT_ERROR;
    }
    fprintf(stdout,"server socket add to epoll successful\n");

    if(bind(server_socket,(struct sockaddr*)&server_addr,sizeof(server_addr))!=0)
    {
        fprintf(stderr,"server bind failed:errno(%d)-%s\n",errno,strerror(errno));
        return RESULT_ERROR;
    }
    fprintf(stdout,"server socket bind successful\n");

    if(listen(server_socket,param_server->ip_port)!=0)
    {
        fprintf(stderr,"server listen failed:errno(%d)-%s\n",errno,strerror(errno));
        return RESULT_ERROR;
    }
    fprintf(stdout,"server socket listen successful\n");

    while(1)
    {
        active_count=epoll_wait(param_server->epoll_fd,events,5,-1);
        fprintf(stdout,"active count:%d\n",active_count);

        for(index=0;index<active_count;index++)
        {
            if(events[index].data.fd==server_socket) //新连接过来
            {
                fprintf(stdout,"new socket comming\n");
                client_info_len=sizeof(client_info);
                new_socket=accept(server_socket,(struct sockaddr*)&client_info,&client_info_len);
                if(new_socket<0)
                {
                    fprintf(stderr,"server accept failed:errno(%d)-%s\n",errno,strerror(errno));
                    continue;
                }

                fprintf(stdout,"new socket:%d.%d.%d.%d:%d-->connected\n",((unsigned char*)&(client_info.sin_addr))[0],((unsigned char*)&(client_info.sin_addr))[1],((unsigned char*)&(client_info.sin_addr))[2],((unsigned char*)&(client_info.sin_addr))[3],client_info.sin_port);

                ev.data.fd=new_socket;
                ev.events=EPOLLIN|EPOLLERR|EPOLLRDHUP;

                if(param_server->socket_pthread_count==0)
                {
                    epoll_ctl(param_server->epoll_fd,EPOLL_CTL_ADD,new_socket,&ev);
                }
                else
                {
                    int tmp_index=0;
                    int mix_cnt_thread_id=0;
                    unsigned int act_cnts=0;
                    for(tmp_index=0;tmp_index<param_server->socket_pthread_count;tmp_index++)
                    {
                        pthread_mutex_lock(&(param_server->socket_thd[tmp_index].thd_mutex));
                        act_cnts=param_server->socket_thd[tmp_index].active_conection_cnt;
                        pthread_mutex_unlock(&(param_server->socket_thd[tmp_index].thd_mutex));
                        if(mix_cnt_thread_id>act_cnts)
                        {
                            mix_cnt_thread_id=tmp_index;
                        }
                    }

                    epoll_ctl(param_server->socket_thd[mix_cnt_thread_id].epoll_fd,EPOLL_CTL_ADD,new_socket,&ev);

                    pthread_mutex_lock(&(param_server->socket_thd[mix_cnt_thread_id].thd_mutex));
                    param_server->socket_thd[mix_cnt_thread_id].active_conection_cnt++;
                    pthread_mutex_unlock(&(param_server->socket_thd[mix_cnt_thread_id].thd_mutex));
                }

                fprintf(stdout,"add new client socket to epoll\n");
            }
            else if(events[index].events&EPOLLERR || events[index].events&EPOLLRDHUP) //对端关闭连接
            {
                fprintf(stdout,"client close this socket\n");
                epoll_ctl(param_server->epoll_fd,EPOLL_CTL_DEL,events[index].data.fd,NULL);
                close(events[index].data.fd);
            }
            else if(events[index].events&EPOLLIN) //读事件到来,进行消息处理
            {
                fprintf(stdout,"begin recv client data\n");
                ret=param_server->data_func(events[index].data.fd,NULL);
                if(ret==-1)
                {
                    fprintf(stderr,"client socket exception happened\n");
                    epoll_ctl(param_server->epoll_fd,EPOLL_CTL_DEL,events[index].data.fd,NULL);
                    close(events[index].data.fd);
                }
                if(ret==0)
                {
                    fprintf(stdout,"client close this socket\n");
                    epoll_ctl(param_server->epoll_fd,EPOLL_CTL_DEL,events[index].data.fd,NULL);
                    close(events[index].data.fd);
                }
                else if(ret==1)
                {
                    
                }
            }
        }
    }

    close(server_socket);
    return RESULT_OK;
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • epoll封装reactor原理剖析示例详解

    目录 reactor是什么? reactor模型三个重要组件与流程分析 组件 流程 将epoll封装成reactor事件驱动 封装每一个连接sockfd变成ntyevent 封装epfd和ntyevent变成ntyreactor 封装读.写.接收连接等事件对应的操作变成callback 给每个客户端的ntyevent设置属性 将ntyevent加入到epoll中由内核监听 将ntyevent从epoll中去除 读事件回调函数 写事件回调函数 接受新连接事件回调函数 reactor运行 react

  • 基于epoll的多线程网络服务程序设计

    基于epoll的多线程网络服务程序设计——C语言​ 采用C语言设计了一个基于epoll的多线程网络服务程序.每个线程都有一个epoll来捕获处于这个线程的socket事件.当子线程数量为0,即只有一个线程,则网络监听服务与socket消息处理处于同一个epoll.当子线程数量大于0时,主线程监听socket连接,当有新的连接到来时将其加入到活跃socket数量最小的子线程的epoll中. server.h #ifndef EPOLL_C_SERVER_H #define EPOLL_C_SERV

  • Golang中基于HTTP协议的网络服务

    目录 一.HTTP协议的网络服务 1.1 使用http.Get函数访问HTTP协议的网络服务 1.2 使用缺省客户端DefaultClient(类型为*http.Client ) 1.3 使用http.Client访问HTTP协议的网络服务 二.http.Client中的Transport字段 (1)http.Transport类型中的DialContext字段 (2)http.Transport类型中的其它字段 三.为什么会出现空闲的连接 3.1 空闲连接的产生 3.2 杜绝空闲连接的产生 四

  • 基于NIO的Netty网络框架(详解)

    Netty是一个高性能.异步事件驱动的NIO框架,它提供了对TCP.UDP和文件传输的支持,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果. Netty的优点有: a.功能丰富,内置了多种数据编解码功能.支持多种网络协议. b.高性能,通过与其它主流NIO网络框架对比,它的综合性能最佳. c.可扩展性好,可通过它提供的ChannelHandler组件对网络通信方面进行灵活扩展. d.易用性,API使用简单.

  • C#基于委托实现多线程之间操作的方法

    本文实例讲述了C#基于委托实现多线程之间操作的方法.分享给大家供大家参考,具体如下: 有的时候我们要起多个线程,更多的时候可能会有某个线程会去操作其他线程里的属性. 但是线程是并发的,一般的调用是无法实现我们的要求的. 于是,我们在这里就可以用委托,代码如下 private delegate void DelegateInfo(); private delegate void DelegateIsEnd(); //这个是线程调用其他线程的方法 private void Dowork() { //

  • Python基于ThreadingTCPServer创建多线程代理的方法示例

    本文实例讲述了Python基于ThreadingTCPServer创建多线程代理的方法.分享给大家供大家参考,具体如下: #coding=utf8 from BaseHTTPServer import BaseHTTPRequestHandler from SocketServer import ThreadingTCPServer import gzip from StringIO import StringIO import logging logging.basicConfig(level

  • 在VMware+centOS 8上基于http协议搭建Git服务的方法

    一.起因 一定要看 本文最终目的是实现Android终端访问虚拟机中git服务,所以需要搭建http协议的git服务器,而如何搭建http协议的git服务器,前人之述备矣,笔者遂借鉴前人之作这里 二.设备信息 windows10家庭中文版(1903) VMware 15Pro(15.5.0 build-14665864) centOS 8(1905已关闭GUI,VMware采用NAT模式) 三.准备工作 (一)windows防火墙开放80端口控制面板 -> 系统和安全 -> Windows D

  • 基于QT的TCP通信服务的实现

    目录 一.结构 1.1 套接字 1.2 socket通信流程 1.3 QTcpsocket 1.4 QTcpServer 二.设计UI 2.1 客户端UI 2.2 服务器端UI 三.核心代码 四.效果图 一.结构 1.1 套接字 应用层通过传输层进行数据通信时,TCP和UDP会遇到同时为多个应用程序进程提供并发服务的问题.多个TCP连接或多个应用程序进程可能需要 通过同一个TCP协议端口传输数据.为了区别不同的应用程序进程和连接,许多计算机操作系统为应用程序与TCP/IP协议交互提供了称为套接字

  • 使用php来实现网络服务

    作者:samisa 以下文中的翻译名称对照表 : payload: 交谈内容 object: 实例 function: 函数 使用 php来实现网络服务 使用框架: WSO2 WSF/PHP 安装环境: windows 或者 linux (厌恶于眼下计算机文章夹杂无数难懂的翻译以及术语,此处尽量使用口语以及汉语.) WSMessages 类: 在调用网络服务的过程中,需要两个消息,发送的消息和接受的消息,又来有往方能来往不是. WSMessages 这个类就是在 Web services fra

  • Java实现基于NIO的多线程Web服务器实例

    代码地址:https://github.com/iyuanyb/webserver 实现了 静态.动态资源获取: Cookie.Session.HTTP 长连接,及 Session 和 HTTP 长连接的定时清除: 类似 Spring MVC 的注解式编程,如 @RequestMapping @RequestParam 等,方法中可以根据参数名从前台获取数据,可以传递对象,也支持级联属性,如: // GET /page?pageSize=10&pageNum=1 HTTP/1.1 @Reques

  • 基于Spring Cloud Zookeeper实现服务注册与发现

    服务注册 1.添加Spring Cloud Zookeeper依赖: <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-zookeeper-discovery</artifactId> <exclusions> <exclusion> <grou

随机推荐