reSIProcate Study Notes (6)


Transport(Udp, Tcp, Tls, WebSocket)

reSIProcate's transport layer supports multiple protocols. The enum types and a few helper functions can be seen in TransportType.hxx:

typedef enum 
{
   UNKNOWN_TRANSPORT = 0,
   TLS,
   TCP,
   UDP,
   SCTP,
   DCCP,
   DTLS,
   WS,
   WSS,
   MAX_TRANSPORT
} TransportType;

typedef enum 
{
   V4,
   V6
} IpVersion;

typedef enum
{
   StunDisabled,
   StunEnabled
} StunSetting;

An IP address, a port number, and a transport protocol type form a tuple, and that information is enough for the transport layer to communicate. The Tuple class encapsulates exactly these fields and represents a network address. Its class comment reads:

/* Tuple includes:
   - IP address
   - port
   - protocol (TransportType)
   - TLS hostname (since this is integral to connection establishment)
   Internally the class is aware of the struct sockaddr/sin_addr/sin6addr binary representation of the address. The sa_family of struct sockaddr is used to keep track of whether a Tuple is representing an IPv4 or IPv6 address. */

Its two constructors, one for IPv4 and one for IPv6, are:

Tuple(const in_addr& pipv4, 
      int pport,
      TransportType ptype, 
      const Data& targetDomain = Data::Empty,
      const Data& netNs = Data::Empty);

Tuple(const in6_addr& pipv6,  
      int pport, 
      TransportType ptype, 
      const Data& targetDomain = Data::Empty,
      const Data& netNs = Data::Empty);
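As a quick orientation, here is a minimal usage sketch for the IPv4 constructor shown above (the include path, the helper name makeExampleTuple, and the use of inet_pton() are assumptions made for this example):

#include <arpa/inet.h>
#include "resip/stack/Tuple.hxx"

// Sketch: build a Tuple describing 192.168.1.10:5060 over UDP.
resip::Tuple makeExampleTuple()
{
   in_addr addr;
   ::inet_pton(AF_INET, "192.168.1.10", &addr);   // fill the binary IPv4 address
   return resip::Tuple(addr, 5060, resip::UDP);   // targetDomain/netNs default to Data::Empty
}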

The data members:

union {
    sockaddr mSockaddr;
    sockaddr_in m_anonv4;
#ifdef IPPROTO_IPV6 // enable this if the current platform supports IPV6
    sockaddr_in6 m_anonv6;
#endif
    char pad[RESIP_MAX_SOCKADDR_SIZE]; // this makes the union the same size whether v6 is in or out
};
TransportType mTransportType;
Data mTargetDomain; 
Data mNetNs;  // The network namespace to which the address and port are scoped

/* This is a (largely) opaque key that subclasses of Transport will use to help record/find flows. For UDP and DTLS, this is just the FD, and the rest of the information about the flow is carried in the Tuple. For TCP and TLS, the FD of the connection is used. For protocols where using the FD would not be appropriate (SCTP), the transport may use whatever method to generate these it likes. (It is highly recommended that these ids are unique across all instances of a transport type) */
typedef unsigned long FlowKey;
typedef unsigned long TransportKey;
FlowKey mFlowKey;
TransportKey mTransportKey;

bool onlyUseExistingConnection;

FdPollItemIf and FdSetIOObserver are both abstract classes; their complete definitions are:

class FdPollItemIf
{
public:
    FdPollItemIf() { };
    virtual ~FdPollItemIf();
    /* Called by PollGrp when activity is possible */
    virtual void processPollEvent(FdPollEventMask mask) = 0;
};

/* An interface class for elements that use an FdSet to watch for IO. This is in contrast to an element that uses event-driven IO. */
class FdSetIOObserver
{
public:
    FdSetIOObserver() {}
    virtual ~FdSetIOObserver() {}
    /* Add any FDs that we are interested in watching to an fdset, with the understanding that a select() call will be made immediately after.
       @param  fdset The FdSet to be augmented */
    virtual void buildFdSet(FdSet& fdset) = 0;
    /* Returns the maximum timeout this object is willing to tolerate on the select call that is to be made, to prevent starvation of work that is not IO-based.
       @return  The maximum select() timeout to be used, in milliseconds. To indicate that this object does not care, return UINT_MAX. 0 indicates that a poll select() should be performed. */
    virtual unsigned int getTimeTillNextProcessMS() = 0;

    /* Called once select() returns; this allows this object to inspect which of its FDs are ready, and perform any necessary IO with them.
       @param  fdset The FdSet after the call to select(). */
    virtual void process(FdSet& fdset) = 0;
};


Next, let's take a look at the inheritance diagram of the classes involved in the transport layer:

[Figure: Transport class inheritance diagram]

Let's start the analysis from the base class Transport. It inherits no data members from its base class FdSetIOObserver; its own data members are:

Data mInterface;
Tuple mTuple;
CongestionManager* mCongestionManager;
ProducerFifoBuffer<TransactionMessage> mStateMachineFifo; // passed in
bool mShuttingDown;
static const Data transportNames[MAX_TRANSPORT];
Data mTlsDomain;
SharedPtr<SipMessageLoggingHandler> mSipMessageLoggingHandler;
AfterSocketCreationFuncPtr mSocketFunc;
Compression &mCompression;
unsigned mTransportFlags;

ProducerFifoBuffer is a template class that wraps the template class Fifo (@brief A templated, threadsafe message-queue class.); its template parameter T is passed on to its Fifo member, and it has one more data member:

typename Fifo<T>::Messages mBuffer;

The Fifo class contains the statement typedef typename AbstractFifo<Msg*>::Messages Messages; and the AbstractFifo class contains typedef std::deque<T> Messages;

Therefore, for ProducerFifoBuffer<TransactionMessage> mStateMachineFifo; the underlying storage of mStateMachineFifo is a std::deque<TransactionMessage*>.
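To make the buffering idea concrete, here is a minimal sketch of the producer-side pattern (illustrative only, not the actual resip classes): add() only appends to a local buffer, and flush() hands the whole batch over to the shared, lock-protected queue in one operation, which is why the Transport code later calls mStateMachineFifo.flush() periodically.

#include <deque>
#include <mutex>

// Sketch of the producer-side buffering idea (not resip code):
// add() involves no locking; flush() takes the lock once per batch.
template <class Msg>
class ProducerBufferSketch
{
public:
   ProducerBufferSketch(std::deque<Msg*>& shared, std::mutex& mtx)
      : mShared(shared), mMutex(mtx) {}

   void add(Msg* m) { mLocal.push_back(m); }   // buffer locally, no lock taken

   void flush()                                // push the whole batch under one lock
   {
      std::lock_guard<std::mutex> guard(mMutex);
      mShared.insert(mShared.end(), mLocal.begin(), mLocal.end());
      mLocal.clear();
   }

private:
   std::deque<Msg*> mLocal;    // plays the role of ProducerFifoBuffer's mBuffer
   std::deque<Msg*>& mShared;  // stands in for the thread-safe Fifo
   std::mutex& mMutex;
};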

Transport has two constructors; the one below is the one that the derived class InternalTransport's constructor forwards its arguments to:

/* @param rxFifo the TransactionMessage Fifo that will receive any ConnectionTerminated or TransportFailure messages.
   @param interfaceObj a "presentation format" representation of the IP address of this transport
   @see Tuple::inet_ntop() for information about "presentation format"
   @param portNum is the port to receive and/or send on
   @param tlsDomain the domain name of the Transport
   @todo Note that because of InternalTransport's constructor, tlsDomain is always set to Data::Empty at construction time, in practice.
   @param socketFunc subclassers can call this function after the socket is created.  This is not currently used by Transport. */
Transport(Fifo<TransactionMessage>& rxFifo, 
          int portNum, 
          IpVersion version, 
          const Data& interfaceObj,
          const Data& tlsDomain = Data::Empty,
          AfterSocketCreationFuncPtr socketFunc = 0,
          Compression &compression = Compression::Disabled,
          unsigned transportFlags = 0,
          const Data& netNs = Data::Empty);

Before looking at Transport's member functions, let's first get to know the SendData class. The important parts of that class are excerpted below:

Tuple destination;
Data data;
Data transactionId;
Data sigcompId;
bool isAlreadyCompressed;

enum SendDataCommand
{
    NoCommand,
    CloseConnection,
    EnableFlowTimer
};
SendDataCommand command;

SendData(const Tuple& dest, const Data& pdata, const Data& tid,
         const Data& scid, bool isCompressed = false) : destination(dest), 
                           data(pdata), transactionId(tid), sigcompId(scid), 
                           isAlreadyCompressed(isCompressed), command(NoCommand)
{ }

By default, reSIProcate does not use OpenSigComp to compress SIP signaling; to use OpenSigComp, add --with-sigcomp to the [option] list when running ./configure. Now back to how the Transport class assembles a SendData object:

std::auto_ptr<SendData> Transport::makeSendData(const Tuple& dest, const Data& d, const Data& tid, const Data &sigcompId = Data::Empty)
{
   resip_assert(dest.getPort() != -1);
   std::auto_ptr<SendData> data(new SendData(dest, d, tid, sigcompId));
   return data;
}

The pure virtual function send() is left for the derived class InternalTransport to implement:

void InternalTransport::send(std::auto_ptr<SendData> data)
{
   mTxFifo.add(data.release());
}

Here mTxFifo is declared in InternalTransport as Fifo<SendData> mTxFifo; from this we can see that send() merely appends the assembled SendData object to the transmit queue.
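For orientation, a minimal usage sketch (the helper function queueForSending and the include path are assumptions for illustration): the caller wraps its bytes in a SendData via makeSendData() and hands it to send(); no socket I/O happens at this point, the object simply lands on mTxFifo and is picked up later inside process().

#include <memory>
#include "resip/stack/Transport.hxx"

// Sketch: hand a pre-serialized SIP message to a transport for sending.
void queueForSending(resip::Transport& transport,
                     const resip::Tuple& dest,
                     const resip::Data& wireData,
                     const resip::Data& tid)
{
   std::auto_ptr<resip::SendData> sd =
      transport.makeSendData(dest, wireData, tid, resip::Data::Empty);
   transport.send(sd);   // only enqueues onto mTxFifo; process() does the real I/O
}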

The pure virtual function process() is the core worker; it is declared as follows:

/* If there is work to do, this is the method that does it. If the socket is readable, it is read.  If the socket is writable and there are outgoing messages to be sent, they are sent. Incoming messages are parsed and dispatched to the relevant entity.  SIP messages will be posted to the TransactionMessage Fifo.
   @param fdset is the FdSet after select() has been called. */
virtual void process(FdSet& fdset) = 0;

/* Version of process to be invoked periodically when using callback-based IO (via FdPollGrp). */
virtual void process() = 0;

Its implementation differs between UdpTransport and TcpBaseTransport, so we will set it aside for now.

The following four functions all push a TransactionMessage onto mStateMachineFifo, since ConnectionTerminated, KeepAlivePong, TransportFailure, and TcpConnectState are all subclasses of TransactionMessage:

/* Posts a ConnectionTerminated message to TransactionMessage Fifo. */
void Transport::flowTerminated(const Tuple& flow)
{
   mStateMachineFifo.add(new ConnectionTerminated(flow));
}

void Transport::keepAlivePong(const Tuple& flow)
{
   mStateMachineFifo.add(new KeepAlivePong(flow));
}

/* Posts a TransportFailure to the TransactionMessage Fifo. */
void Transport::fail(const Data& tid, TransportFailure::FailureReason reason, int subCode)
{
   if (!tid.empty())
   {
      mStateMachineFifo.add(new TransportFailure(tid, reason, subCode));
   }
}

/* Posts a TcpConnectState to the TransactionMessage Fifo. */
void Transport::setTcpConnectState(const Data& tid, TcpConnectState::State state)
{
    if (!tid.empty())
    {
        mStateMachineFifo.add(new TcpConnectState(tid, state));
    }
}

 

Next up is the direct subclass InternalTransport. As usual, data members first:

Socket mFd; // this is a unix file descriptor or a windows SOCKET

/* We use this to interrupt the select call when our tx fifo goes from empty to non-empty; if the fifo is empty when we build our fd set, we will add the read end of this pipe to the fd set, and when a message is added, we will write something to the write end. */
SelectInterruptor mSelectInterruptor;
FdPollItemHandle mInterruptorHandle;

Fifo<SendData> mTxFifo; // owned by the transport
ConsumerFifoBuffer<SendData> mTxFifoOutBuffer;
FdPollGrp *mPollGrp;      // not owned by transport, just used
FdPollItemHandle mPollItemHandle; // owned by the transport

ConsumerFifoBuffer has exactly the same data members as ProducerFifoBuffer; as for the member functions, the former uses getNext() to pull data out of the Fifo while the latter uses add() to push data in (a consumer-side sketch follows after the initializer list below). The constructor's member initializer list is:

mFd(INVALID_SOCKET),
mInterruptorHandle(0),
mTxFifoOutBuffer(mTxFifo),
mPollGrp(NULL),
mPollItemHandle(NULL)
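The consumer-side counterpart can be sketched in the same spirit (again illustrative, not the real ConsumerFifoBuffer): getNext() refills its local buffer from the shared queue under one lock, then hands messages out one at a time.

#include <deque>
#include <mutex>

// Sketch of the consumer-side buffering idea (not resip code).
template <class Msg>
class ConsumerBufferSketch
{
public:
   ConsumerBufferSketch(std::deque<Msg*>& shared, std::mutex& mtx)
      : mShared(shared), mMutex(mtx) {}

   Msg* getNext()
   {
      if (mLocal.empty())                       // refill local buffer in one locked swap
      {
         std::lock_guard<std::mutex> guard(mMutex);
         mLocal.swap(mShared);
      }
      if (mLocal.empty())
      {
         return 0;                              // nothing available right now
      }
      Msg* m = mLocal.front();
      mLocal.pop_front();
      return m;
   }

private:
   std::deque<Msg*> mLocal;
   std::deque<Msg*>& mShared;
   std::mutex& mMutex;
};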

The static member function socket() creates and returns a file descriptor, and bind() then binds it:

// shared by UDP, TCP, and TLS
Socket InternalTransport::socket(TransportType type, IpVersion ipVer)
{
   Socket fd;
   switch (type)
   {
      case UDP:
#ifdef USE_IPV6
         fd = ::socket(ipVer == V4 ? PF_INET : PF_INET6, SOCK_DGRAM, IPPROTO_UDP);
#else
         fd = ::socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
#endif
         break;
      case TCP:
      case TLS:
#ifdef USE_IPV6
         fd = ::socket(ipVer == V4 ? PF_INET : PF_INET6, SOCK_STREAM, 0);
#else
         fd = ::socket(PF_INET, SOCK_STREAM, 0);
#endif
         break;
      default:
         InfoLog (<< "Try to create an unsupported socket type: " << Tuple::toData(type));
         resip_assert(0);
         throw Transport::Exception("Unsupported transport", __FILE__,__LINE__);
   }
   if ( fd == INVALID_SOCKET )
   {
      int e = getErrno();
      ErrLog (<< "Failed to create socket: " << strerror(e));
      throw Transport::Exception("Can't create TcpBaseTransport", __FILE__,__LINE__);
   }
#ifdef USE_IPV6
#ifdef __linux__
   int on = 1;
   if (ipVer == V6)
   {
      if ( ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on)) )
      {
          int e = getErrno();
          InfoLog (<< "Couldn't set sockoptions IPV6_V6ONLY: " << strerror(e));
          error(e);
          throw Exception("Failed setsockopt", __FILE__,__LINE__);
      }
   }
#endif
#endif
   DebugLog (<< "Creating fd=" << fd << (ipVer == V4 ? " V4/" : " V6/") << (type == UDP ? "UDP" : "TCP"));
   return fd;
}

void InternalTransport::bind()
{
#ifdef USE_NETNS
   DebugLog (<< "Binding to " << Tuple::inet_ntop(mTuple) << " in netns=\"" <<mTuple.getNetNs() << "\"");
#else
   DebugLog (<< "Binding to " << Tuple::inet_ntop(mTuple)); 
#endif
   if ( ::bind( mFd, &mTuple.getMutableSockaddr(), mTuple.length()) == SOCKET_ERROR )
   {
      int e = getErrno();
      if ( e == EADDRINUSE )
      {
         error(e);
         ErrLog (<< mTuple << " already in use ");
         throw Transport::Exception("port already in use", __FILE__,__LINE__);
      }
      else
      {
         error(e);
         ErrLog (<< "Could not bind to " << mTuple);
         throw Transport::Exception("Could not use port", __FILE__,__LINE__);
      }
   }
   // If we bound to port 0, then query OS for assigned port number
   if(mTuple.getPort() == 0)
   {
      socklen_t len = sizeof(mTuple.getMutableSockaddr());
      if(::getsockname(mFd, &mTuple.getMutableSockaddr(), &len) == SOCKET_ERROR)
      {
         int e = getErrno();
         ErrLog (<<"getsockname failed, error=" << e);
         throw Transport::Exception("Could not query port", __FILE__, __LINE__);
      }
   }
   bool ok = makeSocketNonBlocking(mFd);
   if ( !ok )
   {
      ErrLog (<< "Could not make socket non-blocking " << port());
      throw Transport::Exception("Failed making socket non-blocking", __FILE__, __LINE__);
   }
   if (mSocketFunc)
   {
      mSocketFunc(mFd, transport(), __FILE__, __LINE__);
   }
}

UdpTransport contains osc::Stack *mSigcompStack; which stays unused when OpenSigComp is disabled. Its other data members are:

char* mRxBuffer;
MsgHeaderScanner mMsgHeaderScanner;
mutable resip::Mutex  myMutex;
Tuple mStunMappedAddress;
bool mStunSuccess;
ExternalUnknownDatagramHandler* mExternalUnknownDatagramHandler;
bool mInWritable;
bool mInActiveWrite;

The MsgHeaderScanner class (@brief This class scans a message header for its status line (the first non-empty line) and then any number of field name/value pairs, terminated by an empty line. The message header text may be divided into arbitrary chunks. A single instance may be used to scan any number of message headers.) parses the headers of received SIP messages, and mStunMappedAddress holds the address obtained via STUN. There are also several counters for the send/receive path:

// statistics
unsigned mPollEventCnt;
unsigned mTxTryCnt;
unsigned mTxMsgCnt;
unsigned mTxFailCnt;
unsigned mRxTryCnt;
unsigned mRxMsgCnt;
unsigned mRxKeepaliveCnt;
unsigned mRxTransactionCnt;

They are all initialized to 0 in the constructor. The related worker functions are:

void processRxAll();                             // process all readable data in the receive buffer mRxBuffer
int processRxRecv(char*& buffer, Tuple& sender); // Receive from socket and store results into {buffer}; internally this calls the OS recvfrom() for UDP

/* Parse the contents of {buffer} and do something with it. Return true if {buffer} was consumed (absorbed into SipMessage to be free'd later). Note the return code doesn't indicate "success" in parsing the message; rather, it just indicates who owns the buffer. */
bool processRxParse(char *buffer, int len, Tuple& sender);
void processTxAll();               // process every SendData in mTxFifoOutBuffer in turn
void processTxOne(SendData *data); // process a single SendData from mTxFifoOutBuffer; internally this calls the OS sendto() for UDP
void updateEvents();

The maximum size of the UDP receive buffer is set to 8192 bytes: static const int MaxBufferSize = 8192;
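As a rough picture of what processRxRecv() boils down to, here is a plain BSD-sockets sketch (not resip code; the function name and parameters are made up for the example):

#include <sys/socket.h>
#include <netinet/in.h>

// Sketch: read one UDP datagram (at most 8192 bytes, matching MaxBufferSize)
// and capture the sender's address, which resip would then wrap in a Tuple.
int receiveOneDatagram(int fd, char* buffer, sockaddr_in& sender)
{
   socklen_t senderLen = sizeof(sender);
   return static_cast<int>(::recvfrom(fd, buffer, 8192, 0,
                                      reinterpret_cast<sockaddr*>(&sender), &senderLen));
}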

TcpBaseTransport:

TCP and WebSocket are both connection-oriented protocols, so the TcpBaseTransport class contains:

ConnectionManager mConnectionManager;

Two constants limit how much data can be read or written in a single pass:

const size_t TcpBaseTransport::MaxWriteSize = 4096;
const size_t TcpBaseTransport::MaxReadSize = 4096;

The work done in the constructor boils down to mFd = InternalTransport::socket(TCP, version);

The init() function is called by TcpTransport's constructor:

void TcpBaseTransport::init()
{
   if ( (mTransportFlags & RESIP_TRANSPORT_FLAG_NOBIND)!=0 )
   {
      return;
   }
   int on = 1;
#if !defined(WIN32)
   if ( ::setsockopt ( mFd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) )
#else
   if ( ::setsockopt ( mFd, SOL_SOCKET, SO_REUSEADDR, (const char*)&on, sizeof(on)) )
#endif
   {
       int e = getErrno();
       InfoLog (<< "Couldn't set sockoptions SO_REUSEPORT | SO_REUSEADDR: " << strerror(e));
       error(e);
       throw Exception("Failed setsockopt", __FILE__, __LINE__);
   }
   bind();
   makeSocketNonBlocking(mFd);

   // do the listen, setting the maximum queue size for completely established sockets -- on
   // linux, tcp_max_syn_backlog should be used for the incomplete queue size (see man listen)
   int e = listen(mFd, 64);
   if (e != 0 )
   {
      int e = getErrno();
      InfoLog (<< "Failed listen " << strerror(e));
      error(e);
      throw Transport::Exception("Address already in use", __FILE__, __LINE__);
   }
}

So init() sets the SO_REUSEADDR socket option and performs the TCP bind() and listen() calls.

The pure virtual function virtual Connection* createConnection(const Tuple& who, Socket fd, bool server=false) = 0; is left to the derived class TcpTransport to implement. It is called from processListen() in the form createConnection(tuple, sock, true), where the second argument sock is returned by the TCP accept() call:

Socket sock = accept( mFd, &peer, &peerLen);

/* Helper to make a new outgoing TCP connection. Makes the socket, connects it, etc. */
Connection* TcpBaseTransport::makeOutgoingConnection(const Tuple &dest, TransportFailure::FailureReason &failReason, int &failSubCode)
{
   // attempt to open
#ifdef USE_NETNS
      NetNs::setNs(netNs());
#endif
   Socket sock = InternalTransport::socket( TCP, ipVersion());
   if ( sock == INVALID_SOCKET ) // no socket found - try to free one up and try again
   {
      int err = getErrno();
      InfoLog (<< "Failed to create a socket " << strerror(err));
      error(err);
      if(mConnectionManager.gc(ConnectionManager::MinimumGcAge, 1) == 0)
      {
         mConnectionManager.gcWithTarget(1); // free one up
      }
#ifdef USE_NETNS
      NetNs::setNs(netNs());
#endif
      sock = InternalTransport::socket( TCP, ipVersion());
      if ( sock == INVALID_SOCKET )
      {
         err = getErrno();
         WarningLog( << "Error in finding free filedescriptor to use. " << strerror(err));
         error(err);
         failReason = TransportFailure::TransportNoSocket;
         failSubCode = err;
         return NULL;
      }
   }
   resip_assert(sock != INVALID_SOCKET);
   DebugLog (<<"Opening new connection to " << dest);
   char _sa[RESIP_MAX_SOCKADDR_SIZE];
   sockaddr *sa = reinterpret_cast<sockaddr*>(_sa);
   resip_assert(RESIP_MAX_SOCKADDR_SIZE >= mTuple.length());
   mTuple.copySockaddrAnyPort(sa);
#ifdef USE_NETNS
      NetNs::setNs(netNs());
#endif
   if(::bind(sock, sa, mTuple.length()) != 0)
   {
      WarningLog( << "Error in binding to source interface address. " << strerror(errno));
      failReason = TransportFailure::Failure;
      failSubCode = errno;
      return NULL;
   }
   if(!configureConnectedSocket(sock))
   {
      throw Exception("Failed to configure connected socket", __FILE__,__LINE__);
   }
   makeSocketNonBlocking(sock);
   if (mSocketFunc)
   {
      mSocketFunc(sock, transport(), __FILE__, __LINE__);
   }
   const sockaddr& servaddr = dest.getSockaddr();
   int ret = connect( sock, &servaddr, dest.length() );
   // See Chapter 15.3 of Stevens, Unix Network Programming Vol. 1 2nd Edition
   if (ret == SOCKET_ERROR)
   {
      int err = getErrno();
      switch (err)
      {
         case EINPROGRESS:
         case EAGAIN:
#if EAGAIN != EWOULDBLOCK
         case EWOULDBLOCK:
#endif
            break;
         default:
         {
            InfoLog( << "Error on TCP connect to " <<  dest << ", err=" << err << ": " << strerror(err));             error(err);             closeSocket(sock);             failReason = TransportFailure::TransportBadConnect;             failSubCode = err;             return NULL;          }       }    }    // This will add the connection to the manager    Connection *conn = createConnection(dest, sock, false);    resip_assert(conn);    conn->mFirstWriteAfterConnectedPending = true;
   return conn;
}

In makeOutgoingConnection(), besides the TCP connect() call, note that createConnection() is invoked with false as its third parameter (server), showing that this is the function used when we act as a client initiating a connection.
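Since the socket was made non-blocking before connect(), EINPROGRESS/EWOULDBLOCK is the normal outcome, and completion is detected later when the socket becomes writable. A generic (non-resip) sketch of that completion check, in the spirit of the Stevens reference quoted in the code:

#include <sys/socket.h>

// Sketch: after a non-blocking connect() returned EINPROGRESS, call this once
// select()/poll() reports the socket writable, to learn whether the connect
// actually succeeded.
bool connectCompleted(int fd)
{
   int soError = 0;
   socklen_t len = sizeof(soError);
   if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &soError, &len) != 0)
   {
      return false;          // getsockopt itself failed
   }
   return soError == 0;      // 0 means the connection was established
}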

makeOutgoingConnection() is called from processAllWriteRequests():

void TcpBaseTransport::processAllWriteRequests()
{
   while (mTxFifoOutBuffer.messageAvailable())
   {
      SendData* data = mTxFifoOutBuffer.getNext();
      DebugLog (<< "Processing write for " << data->destination);
      // this will check by connectionId first, then by address
      Connection* conn = mConnectionManager.findConnection(data->destination);
#ifdef WIN32
      if(conn && mPollGrp && mPollGrp->getImplType() == FdPollGrp::PollImpl)
      {
         // Workaround for bug in WSAPoll implementation: see 
         // http://daniel.haxx.se/blog/2012/10/10/wsapoll-is-broken/
         // http://social.msdn.microsoft.com/Forums/windowsdesktop/en-US/18769abd-fca0-4d3c-9884-1a38ce27ae90/wsapoll-and-nonblocking-connects-to-nonexistent-ports?forum=wsk
         // Note:  This is not an ideal solution - since we won't cleanup the connection until 
         // after the connect has timedout and someone else tries to write to the same 
         // destination.  However the only impact to users is that requests will take the 
         // full 32 seconds transaction timeout to get an error vs the 21s connect timeout
         // observed when using the select implementation (vs Poll).  This does save us from
         // having to use some form of timer to periodically check the connect state though.
         if(conn->checkConnectionTimedout())
         {
            // If checkConnectionTimedout returns true, then connection is no longer available.
            // Clear conn so that we create a new connection below.
            conn = 0;
         }
      }
#endif
      // There is no connection yet, so make a client connection
      if (conn == 0 && 
          !data->destination.onlyUseExistingConnection &&
          data->command == 0)  // SendData commands (ie. close connection and enable flow timers) shouldn't cause new connections to form
      {
         TransportFailure::FailureReason failCode = TransportFailure::Failure;
         int subCode = 0;
         if((conn = makeOutgoingConnection(data->destination, failCode, subCode)) == 0)
         {
            DebugLog (<< "Failed to create connection: " << data->destination);
            fail(data->transactionId, failCode, subCode);
            delete data;
            // NOTE: We fail this one but don't give up on others in queue
            return;
         }
         resip_assert(conn->getSocket() != INVALID_SOCKET);
         data->destination.mFlowKey = conn->getSocket();
      }
      if (conn == 0)
      {
         DebugLog (<< "Failed to find connection: " << data->destination);
         fail(data->transactionId, TransportFailure::TransportNoExistConn, 0);
         delete data;
         // NOTE: We fail this one but don't give up on others in queue
      }
      else // have a connection
      {
         // Check if we have written anything or not on the connection.  If not, then this is either the first or 
         // a subsequent transaction trying to use this connection attempt - set TcpConnectState for this 
         // transaction to ConnectStarted
         if (conn->mFirstWriteAfterConnectedPending == true)
         {
             // Notify the transaction state that we have started a TCP connect, so that it can run a TCP connect timer
             setTcpConnectState(data->transactionId, TcpConnectState::ConnectStarted);
         }
         conn->requestWrite(data);
      }
   }
}

processAllWriteRequests() is in turn called by process() and process(FdSet& fdSet):

void TcpBaseTransport::process()
{
   mStateMachineFifo.flush();
   // called within SipStack's thread. There is some risk of
   // recursion here if connection starts doing anything fancy.
   // For backward-compat when not-epoll, don't handle transmit synchronously
   // now, but rather wait for the process() call
   if (mPollGrp)
   {
      processAllWriteRequests();
   }
}

void TcpBaseTransport::process(FdSet& fdSet)
{
   resip_assert( mPollGrp==NULL );
   processAllWriteRequests();
   // process the connections in ConnectionManager
   mConnectionManager.process(fdSet);
   mStateMachineFifo.flush();
   // process our own listen/accept socket for incoming connections
   if (mFd!=INVALID_SOCKET && fdSet.readyToRead(mFd))
   {
      processListen();
   }
}

The TcpTransport class provides the implementation of TcpBaseTransport's createConnection() member function:

Connection* TcpTransport::createConnection(const Tuple& who, Socket fd, bool server)
{
   resip_assert(this);
   Connection* conn = new TcpConnection(this, who, fd, mCompression);
   return conn;
}

The return value is a TcpConnection object, a subclass of Connection. The following diagram shows the inheritance relationships among the connection-handling classes:

[Figure: Connection class inheritance diagram]
Let's start from the ConnectionBase class. Its class comment:

/* @brief  Abstracts some of the connection functionality.
Managed connections (see ConnectionManager) derive from Connection.
Non-managed connections may be derived from ConnectionBase. */

All of its data members:

enum ConnState
{
    NewMessage = 0,
    ReadingHeaders,
    PartialBody,
    SigComp, // This indicates that incoming bytes are compressed.
    WebSocket,
    MAX
};
ConnState mConnState;

typedef enum
{
    Unknown,
    Uncompressed,
    Compressed,
    WebSocketHandshake,
    WebSocketData,
} TransmissionFormat;
TransmissionFormat mSendingTransmissionFormat;
TransmissionFormat mReceivingTransmissionFormat;

Data::size_type mSendPos;
std::list<SendData*> mOutstandingSends;
Transport* mTransport;
Tuple mWho;
TransportFailure::FailureReason mFailureReason;      
int mFailureSubCode;
Compression &mCompression;
osc::Stack *mSigcompStack;
osc::TcpStream *mSigcompFramer;
SipMessage* mMessage;
char* mBuffer;
size_t mBufferPos;
size_t mBufferSize;
WsFrameExtractor mWsFrameExtractor;
static char connectionStates[MAX][32] = { "NewMessage", "ReadingHeaders", "PartialBody" };
UInt64 mLastUsed;
MsgHeaderScanner mMsgHeaderScanner;
static size_t messageSizeMax = RESIP_SIP_MSG_MAX_BYTES;

Since ConnectionBase.cxx contains the following #include directive,

#include "resip/stack/WsConnectionBase.hxx"

the complete definition of the WsConnectionBase class is excerpted here:

class WsConnectionBase
{
   public:
      WsConnectionBase();
      WsConnectionBase(SharedPtr<WsConnectionValidator> mWsConnectionValidator);
      virtual ~WsConnectionBase();

      void setCookies(CookieList& cookies) { mCookies = cookies; };
      const CookieList& getCookies() const { return mCookies; };
      SharedPtr<WsCookieContext> getWsCookieContext() const { return mWsCookieContext; }
      void setWsCookieContext(SharedPtr<WsCookieContext> wsCookieContext) { mWsCookieContext = wsCookieContext; }
      SharedPtr<WsConnectionValidator> connectionValidator() const;

   private:
      CookieList mCookies;
      SharedPtr<WsCookieContext> mWsCookieContext;
      SharedPtr<WsConnectionValidator> mWsConnectionValidator;
};

Cookie.hxx contains the statement typedef std::vector<Cookie> CookieList; and the Cookie class has just two Data members: Data mName; and Data mValue;. The data members of the WsCookieContext class are:

Data mWsSessionInfo;
Data mWsSessionExtra;
Data mWsSessionMAC;
Uri mWsFromUri;
Uri mWsDestUri;
time_t mExpiresTime;

Its main constructor is WsCookieContext(const CookieList& cookieList, const Data& infoCookieName, const Data& extraCookieName, const Data& macCookieName, const Uri& requestUri);

The WsConnectionValidator class is abstract and has no data members; its only pure virtual function is:

virtual bool validateConnection(const WsCookieContext& wsCookieContext) = 0;
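An application that wants to authorize incoming WebSocket connections subclasses it. A minimal sketch, assuming an accessor such as getExpiresTime() on WsCookieContext and the include paths shown (check the headers for the real names):

#include <ctime>
#include "resip/stack/WsConnectionValidator.hxx"
#include "resip/stack/WsCookieContext.hxx"

// Sketch: reject any connection whose cookie has already expired.
// The getExpiresTime() accessor is an assumption for this example.
class ExpiryWsConnectionValidator : public resip::WsConnectionValidator
{
public:
   virtual bool validateConnection(const resip::WsCookieContext& ctx)
   {
      return ctx.getExpiresTime() > ::time(0);
   }
};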

That completes the data members of WsConnectionBase; its member functions are simply getters and setters for them.

Back to ConnectionBase.cxx: first, the WebSocket-related functions:

bool ConnectionBase::isUsingSecWebSocketKey()
{
   resip_assert(mMessage);
   return mMessage->exists(h_SecWebSocketKey);
}

bool ConnectionBase::isUsingDeprecatedSecWebSocketKeys()
{
   resip_assert(mMessage);
   return mMessage->exists(h_SecWebSocketKey1) && mMessage->exists(h_SecWebSocketKey2);
}

void ConnectionBase::wsParseCookies(CookieList& cookieList, const SipMessage* message)
{
   Data name;
   Data value;
   StringCategories::const_iterator it = message->header(h_Cookies).begin();
   for (; it != message->header(h_Cookies).end(); ++it)
   {
      ParseBuffer pb((*it).value());
      while(!pb.eof())
      {
         const char* anchor =  pb.skipWhitespace();

         pb.skipToChar(Symbols::EQUALS[0]);
         pb.data(name, anchor);

         anchor = pb.skipChar(Symbols::EQUALS[0]);
         if(*(pb.position()) == Symbols::DOUBLE_QUOTE[0])
         {
            anchor = pb.skipChar(Symbols::DOUBLE_QUOTE[0]);
            pb.skipToChar(Symbols::DOUBLE_QUOTE[0]);
            pb.data(value, anchor);
            pb.skipChar(Symbols::DOUBLE_QUOTE[0]);
         }
         else
         {
            pb.skipToOneOf(Symbols::SEMI_COLON, ParseBuffer::Whitespace);
            pb.data(value, anchor);
         }

         Cookie cookie(name, value);
         cookieList.push_back(cookie);
         DebugLog(<< "Cookie: " << cookie);

         if(!pb.eof() && *(pb.position()) == Symbols::SEMI_COLON[0])
         {
            pb.skipChar(Symbols::SEMI_COLON[0]);
         }

         pb.skipWhitespace();
      }
   }
}

std::auto_ptr<Data> ConnectionBase::makeWsHandshakeResponse()
{
   std::auto_ptr<Data> responsePtr(0);
   if(isUsingSecWebSocketKey())
   {
      responsePtr.reset(new Data("HTTP/1.1 101 WebSocket Protocol Handshake\r\n"
         "Upgrade: WebSocket\r\n"
         "Connection: Upgrade\r\n"
         "Sec-WebSocket-Protocol: sip\r\n"));

      // Assuming that the OpenSSL implementation of SHA1 is more efficient than our internal one
#ifdef USE_SSL
      SHA1Stream wsSha1Stream;
      wsSha1Stream << (mMessage->const_header(h_SecWebSocketKey).value() + Symbols::WebsocketMagicGUID);
      Data wsAcceptKey = wsSha1Stream.getBin(160).base64encode();
#else
      SHA1 sha1;
      sha1.update(mMessage->const_header(h_SecWebSocketKey).value().c_str());
      sha1.update(Symbols::WebsocketMagicGUID);
      Data wsAcceptKey = sha1.finalBin().base64encode();
#endif
      *responsePtr += "Sec-WebSocket-Accept: " + wsAcceptKey + "\r\n\r\n";
   }
   else if(isUsingDeprecatedSecWebSocketKeys())
   {
      ErrLog(<<"WS client wants to use depracated protocol version, unsupported");
   }
   else
   {
      ErrLog(<<"No SecWebSocketKey header");
   }
   return responsePtr;
}

/* Returns true if handshake complete, false if more bytes needed Sets dropConnection = true if an error occurs */
bool ConnectionBase::wsProcessHandshake(int bytesRead, bool &dropConnection)
{
   mConnState = WebSocket;
   dropConnection = false;

   if(mBufferPos + bytesRead > messageSizeMax)
   {
      WarningLog(<<"Too many bytes received during WS handshake, dropping connection.  Max message size = " << messageSizeMax);
      dropConnection = true;
      return false;
   }

   resip_assert(mTransport);
   mMessage = new SipMessage(&mTransport->getTuple());
   resip_assert(mMessage);

   mMessage->setSource(mWho);   
   mMessage->setTlsDomain(mTransport->tlsDomain());

   if (!scanMsgHeader(bytesRead)) 
   {
      return false;
   }

   try
   {
      WsConnectionBase* wsConnectionBase = dynamic_cast<WsConnectionBase*>(this);
      CookieList cookieList;
      if(wsConnectionBase)
      {
         SharedPtr<WsCookieContext> wsCookieContext((WsCookieContext*)0);
         if (mMessage->exists(h_Cookies))
         {
            WsBaseTransport* wst = dynamic_cast<WsBaseTransport*>(mTransport);
            resip_assert(wst);
            try
            {
               wsParseCookies(cookieList, mMessage);
               wsConnectionBase->setCookies(cookieList);
               // Use of resip WsCookieContext capabilities is not mandatory,
               // only try to use it if cookieContextFactory is available
               if(wst->cookieContextFactory().get())
               {
                  Uri& requestUri = mMessage->header(h_RequestLine).uri();
                  wsCookieContext = wst->cookieContextFactory()->makeCookieContext(cookieList, requestUri);
                  wsConnectionBase->setWsCookieContext(wsCookieContext);
               }
            }
            catch(ParseException& ex)
            {
               WarningLog(<<"Failed to parse cookies into WsCookieContext: " << ex);
            }
         }
         SharedPtr<WsConnectionValidator> wsConnectionValidator = wsConnectionBase->connectionValidator();
         if(wsConnectionValidator &&
            (!wsCookieContext.get() || !wsConnectionValidator->validateConnection(*wsCookieContext)))
         {
            ErrLog(<<"WebSocket cookie validation failed, dropping connection");
            // FIXME: should send back a HTTP error code:
            //   400 if the cookie was not in the right syntax
            //   403 if the cookie was well formed but rejected
            //       due to expiry or a bad HMAC
            delete mMessage;
            mMessage = 0;
            mBufferPos = 0;
            dropConnection = true;
            return false;
         }
      }

      std::auto_ptr<Data> wsResponsePtr = makeWsHandshakeResponse();

      if (wsResponsePtr.get())
      {
         DebugLog (<< "WebSocket upgrade accepted, cookie count = " << cookieList.size());

         mOutstandingSends.push_back(new SendData(
                  who(),
                  *wsResponsePtr.get(),
                  Data::Empty,
                  Data::Empty,
                  true));
      }
      else
      {
         ErrLog(<<"Failed to parse WebSocket initialization request");
         delete mMessage;
         mMessage = 0;
         mBufferPos = 0;
         dropConnection = true;
         return false;
      }
   }
   catch(resip::ParseException& e)
   {
      ErrLog(<<"Cannot auth request is missing " << e);
      delete mMessage;
      mMessage = 0;
      mBufferPos = 0;
      dropConnection = true;
      return false;
   }

   delete mMessage;
   mMessage=0;
   mBufferPos = 0;

   return true;
}

wsProcessHandshake() calls the scanMsgHeader() function, excerpted here:

bool ConnectionBase::scanMsgHeader(int bytesRead)
{
   mMsgHeaderScanner.prepareForMessage(mMessage);
   char *unprocessedCharPtr;
   MsgHeaderScanner::ScanChunkResult scanResult = mMsgHeaderScanner.scanChunk(mBuffer, mBufferPos + bytesRead, &unprocessedCharPtr);
   if (scanResult != MsgHeaderScanner::scrEnd)
   {
      if(scanResult != MsgHeaderScanner::scrNextChunk)
      {
         StackLog(<<"Failed to parse message, more bytes needed");
         StackLog(<< Data(mBuffer, bytesRead));
      }
      delete mMessage;
      mMessage=0;
      mBufferPos += bytesRead;
      return false;
   }
   return true;
}

We won't dig into how MsgHeaderScanner parses a SIP header for now; it is enough to know that its behavior is wrapped inside scanMsgHeader(), and callers only need to invoke that function.

To be continued...
