reSIProcate学习笔记(七)


I/O多路复用

reSIProcate中有关使用I/O多路复用的方法来支持Socket并发的代码集中在FdPoll.hxx和FdPoll.cxx中。

一段关于FdPoll.hxx的注释说明:

/* The Makefile system may define the following: HAVE_EPOLL: system call epoll() is available. An implementation based upon FdSet (and select()) is always available.
This file and class is somewhat misnamed. It should really be called “SocketEvent” or such. The name “FdPoll” originated from an epoll-specific implementation. */

FdPollGrp类把三种I/O多路复用机制(select, poll, epoll)的共同点组织到了一起,不同机制具体的实现则交由其派生类FdPollImplFdSet、FdPollImplPoll和FdPollImplEpoll完成,其中的枚举类型

// Identifies which I/O multiplexing implementation a poll group uses:
// FdSetImpl (FdSet / select()), PollImpl (poll()) or EPollImpl (epoll).
typedef enum { FdSetImpl = 0, PollImpl, EPollImpl } ImplType;

指明了具体使用何种机制。

FdPollGrp类中含有大量的纯虚函数:

// Identify the concrete implementation: getImplType() reports it as an
// ImplType value, getImplName() as a string (presumably one of the names
// listed by getImplList() -- confirm against the derived classes).
virtual const char* getImplName() const = 0;
virtual ImplType getImplType() const = 0;

// Poll-item management. addPollItem() returns the opaque FdPollItemHandle
// that is later passed to modPollItem() and delPollItem().
virtual FdPollItemHandle addPollItem(Socket sock, FdPollEventMask newMask, FdPollItemIf *item) = 0;
virtual void modPollItem(FdPollItemHandle handle, FdPollEventMask newMask) = 0;
virtual void delPollItem(FdPollItemHandle handle) = 0;

// Register / unregister an FdSetIOObserver with this poll group.
virtual void registerFdSetIOObserver(FdSetIOObserver& observer) = 0;
virtual void unregisterFdSetIOObserver(FdSetIOObserver& observer) = 0;

// Wait at most {ms} milliseconds. If any file activity has already occurred or
// occurs before {ms} expires, then FdPollItem will be informed (via cb method)
// and this method will return. Returns true iff any file activity occurred.
// ms<0: wait forever, ms=0: don't wait, ms>0: wait this long
// NOTE: "forever" may be as little as 60sec or as much as forever
virtual bool waitAndProcess(int ms=0) = 0;

// Legacy APIs (deprecated) - use waitAndProcess() instead
virtual void buildFdSet(FdSet& fdSet) = 0;
virtual bool processFdSet(FdSet& fdset) = 0;

也有几个函数由该类本身实现:

// Event mask type: the type of the mask arguments taken by addPollItem(),
// modPollItem() and processPollEvent().
typedef unsigned short FdPollEventMask;

FdPollGrp::processItem(FdPollItemIf *item, FdPollEventMask mask)
{
   try
   {
      item->processPollEvent( mask );
   }
   catch(BaseException& e)
   {
       ErrLog(<<"Exception thrown for FdPollItem: " << e);
   }
   item = NULL; // WATCHOUT: item may have been deleted
   /* If FPEM_Error was reported, should really make sure it was deleted
    * or disabled from polling. Otherwise were in stuck in an infinite loop.
    * But difficult to do that checking robustly until we serials the items. */
}

/* Get the epoll fd (epoll_create()). This is an fd (type int), not a Socket.
 * It may be -1 if epoll is not enabled; this base-class version always
 * returns -1. The member is declared virtual in the class (presumably so the
 * epoll implementation can override it); the `virtual` keyword is not repeated
 * on this class-qualified definition because C++ only permits it on the
 * in-class declaration. */
int FdPollGrp::getEPollFd() const { return -1; }

// Factory for poll groups.
//   implName: NULL, "" or "event" -> first (best) implementation compiled in,
//             tried in the order epoll, poll, fdset;
//             "epoll" / "poll" / "fdset" -> that implementation, when it is
//             compiled in (fdset is always available).
// Returns an object allocated with new. For any other name resip_assert(0)
// is hit and NULL is returned.
FdPollGrp* FdPollGrp::create(const char *implName = NULL)
{
   // Short-circuit evaluation keeps strcmp() away from a NULL pointer.
   const bool pickBest = (implName == 0)
                      || (*implName == 0)
                      || (strcmp(implName, "event") == 0);

#ifdef RESIP_POLL_IMPL_EPOLL
   if (pickBest || strcmp(implName, "epoll") == 0)
      return new FdPollImplEpoll();
#endif

#ifdef RESIP_POLL_IMPL_POLL
   if (pickBest || strcmp(implName, "poll") == 0)
      return new FdPollImplPoll();
#endif

   if (pickBest || strcmp(implName, "fdset") == 0)
      return new FdPollImplFdSet();

   // Unknown or unsupported implementation name.
   resip_assert(0);
   return NULL;
}

// Returns the implementation names accepted by create(), separated by
// vertical bars (|). Intended for help messages. "epoll" and "poll" appear
// only when the matching RESIP_POLL_IMPL_* macro is defined.
const char* FdPollGrp::getImplList()
{
#if defined(RESIP_POLL_IMPL_EPOLL) && defined(RESIP_POLL_IMPL_POLL)
    return "event|epoll|fdset|poll";
#elif defined(RESIP_POLL_IMPL_EPOLL)
    return "event|epoll|fdset";
#elif defined(RESIP_POLL_IMPL_POLL)
    return "event|fdset|poll";
#else
    return "event|fdset";
#endif
}

当implName为NULL、空字符串或"event"时,FdPollGrp::create(const char *implName)函数返回的就是平台支持的性能最佳的机制;否则返回implName所指定的机制。

当某个套接字描述符为执行I/O做好准备时,FdPollGrp::processItem()会调用类FdPollItemIf的成员函数processPollEvent(FdPollEventMask mask)来进行相应的处理。下面是FdPollItemIf类的派生类FdPollItemBase的定义:

/* This is an opaque type used to identify a particular Item. It is assigned when the Item is allocated, and then used to modify or destroy the Item. NOTE: FdPollItemFake doesn't exist: it is fictitious, thus this type can never be dereferenced. */
typedef struct FdPollItemFake* FdPollItemHandle;

class FdPollItemBase : public FdPollItemIf
{
public:
    FdPollItemBase(FdPollGrp *grp, Socket fd, FdPollEventMask mask) : mPollGrp(grp), mPollSocket(fd), mPollHandle(0)
    {
        if(mPollGrp)
        {
            mPollHandle = mPollGrp->addPollItem(fd, mask, this);
        }
    }
    virtual ~FdPollItemBase()
    {
        if(mPollGrp)
        {
            mPollGrp->delPollItem(mPollHandle);
        }
    }
protected:
    FdPollGrp*        mPollGrp;
    Socket            mPollSocket;
    FdPollItemHandle  mPollHandle;
};

该类构造函数和析构函数中调用的正是FdPollGrp类的纯虚函数addPollItem()和delPollItem(),其实现根据FdPollImplFdSet、FdPollImplPoll和FdPollImplEpoll而异,分析这三个派生类之前,先看看位于Socket.hxx文件(@file-brief  Handles cross-platform sockets compatibility.)中的FdSet类,其完整定义摘抄如下:

/* @brief Object-oriented wrapper for your platform's file-descriptor set. */
class FdSet
{
public:
   FdSet() : size(0), numReady(0)
   {
      FD_ZERO(&read);
      FD_ZERO(&write);
      FD_ZERO(&except);
   }
   int select(struct timeval& tv)
   {
      return numReady = ::select(size, &read, &write, &except, &tv);
   }
   int selectMilliSeconds(unsigned long ms)
   {
      struct timeval tv;
      tv.tv_sec = (ms/1000);
      tv.tv_usec = (ms%1000)*1000;
      return select(tv);
   }
   bool readyToRead(Socket fd)
   {
      return (FD_ISSET(fd, &read) != 0);
   }
   bool readyToWrite(Socket fd)
   {
      return (FD_ISSET(fd, &write) != 0);
   }
   bool hasException(Socket fd)
   {
      return (FD_ISSET(fd,&except) != 0);
   }
   void setRead(Socket fd)
   {
      resip_assert( FD_SETSIZE >= 8 );
#ifndef WIN32 // windows fd are not int's and don't start at 0 - this won't work in windows
      resip_assert( fd < (int)FD_SETSIZE ); // redefineing FD_SETSIZE will not work 
#else
      resip_assert(read.fd_count < FD_SETSIZE); // Ensure there is room to add new FD
#endif
      FD_SET(fd, &read);
      size = ( int(fd+1) > size) ? int(fd+1) : size;
   }
   void setWrite(Socket fd)
   {
#ifndef WIN32 // windows fd are not int's and don't start at 0 - this won't work in windows
      resip_assert( fd < (int)FD_SETSIZE ); // redefinitn FD_SETSIZE will not work 
#else
      resip_assert(write.fd_count < FD_SETSIZE); // Ensure there is room to add new FD
#endif
      FD_SET(fd, &write);
      size = ( int(fd+1) > size) ? int(fd+1) : size;
   }
   void setExcept(Socket fd)
   {
#ifndef WIN32 // windows fd are not int's and don't start at 0 - this won't work in windows
      resip_assert( fd < (int)FD_SETSIZE ); // redefinitn FD_SETSIZE will not work 
#else
      resip_assert(except.fd_count < FD_SETSIZE); // Ensure there is room to add new FD
#endif
      FD_SET(fd,&except);
      size = ( int(fd+1) > size) ? int(fd+1) : size;
   }
   void clear(Socket fd)
   {
      FD_CLR(fd, &read);
      FD_CLR(fd, &write);
      FD_CLR(fd, &except);
   }
   void reset()
   {
      size = 0;
      numReady = 0;
      FD_ZERO(&read);
      FD_ZERO(&write);
      FD_ZERO(&except);
   }
   // Make this stuff public for async dns/ares to use
   fd_set read;
   fd_set write;
   fd_set except;
   int size;
   int numReady;  // set after each select call
};

该类对select()函数及所使用的fd_set参数操作进行了封装,把宏FD_ZERO、FD_SET、FD_CLR和FD_ISSET的操作都包装在了类的实现中。

未完待续..

Leave a comment

邮箱地址不会被公开。 必填项已用*标注