用流来封装SOCKET通信的一种方法

类别:编程语言 点击:0 评论:0 推荐:
1.1 需求来源 目前XXXXX软件的维护控制工具是命令行输入的,后来因为用户需求变化,还要支持提供一个远程TELNET端口,从而实现远程多用户同时维护功能。因为目前就是通过流来读取命令行上的输入,如果能够将TELNET端口的输入也通过流来读取,目前的实现基本上不做修改。另外,如果真的实现这个功能,原来实现的,将来要实现的所有工具都可以同等对待文件输入,标准输入,网络数据;可以用同样的方式输出到文件,标准输出,网络上去,应用将非常广泛。 1.2 C++流 C++流的输入输出总是要通过一个缓冲区来实现的。见下图 流与缓冲区之间的关系是通过basic_ios来维护的: template class basic_ios: public ios_base{ public: 。。。 basic_streambuf* rdbuf() const; //获取缓冲区 basic_streambuf* rdbuf(basic_streambuf* b); //设置新缓冲区,返回原来的缓冲区 }; 各种不同类型的缓冲区都是从basic_streambuf继承来的,basic_streambuf在出现不同缓冲区策略的地方提供了一些虚函数,例如处理缓冲区上溢和下溢的函数,清除缓冲区的函数。如果我们要提供新的缓冲策略和新的输入目标或输出目标,一个正确的起点就是从basic_streambuf继承实现自己的新类,重载必要的虚函数。然后这个新的streambuf就可以和任何一种已有的流,比如stringstream,strstream等一起使用了。 最简单的情况下,如果我们要求I/O操作是非缓冲的,只需要重载basic_streambuf的上溢和下溢的虚函数,如果我们有自己的缓冲区,一般还要重载一个清除缓冲区的虚函数,它在流关闭的时候或显示调用flush操作的时候将缓冲区的内容发送实际目的地并清空它。 1.3 我们的封装方式 我们需要的缓冲区应该从SOCKET上读取数据和将输入通过SOCKET发送出去。为了我们实现缓冲区的通用性,不能将我们实现的缓冲区和SOCKET的接口调用绑定在一起分开。稍微抽象一下,我们需要的缓冲区依赖一个这样的接口: struct readwriter{ int read(char* s, int len); //完成从实际目的地读取数据的功能,传入保存数据的缓冲区,缓冲区长度为len,返回实际读取的数据长度 int write(const char* s, int len); //完成发送数据到实际目的地的功能,需要发送的数据在长度为len的缓冲区中,返回发送成功的数据的长度。 } 想实现SOCKET的读写操作,做一个简单的适配处理就可以完成,这个适配处理的过程就不在本文提及了。我们的缓冲区依赖上面这个接口,当然因为标准库中的streambuf都是利用模板来实现的,因此上面的接口就作为一个模板参数输入。 另外,写入我们streambuf的数据不应该从streambuf中读出来,这一点是和标准库中的其他streambuf输入输出共用一个字符缓冲区不一样。这就意味着我们需要维护读写两个缓冲区,读缓冲区保存从readwriter获取的数据,写缓冲区保存写往readwriter的数据。这样前面说的缓冲区的上溢应该是指写缓冲区满的情况;缓冲区的下溢应该是指读缓冲区数据为空的情况;清空缓冲区(sync)的操作应该是针对写缓冲区而言。 根据前面的分析,我们可以得出下面的定义: template class streambufadaptor: public basic_stringbuf { RW& _rw; Public: typedef charT char_type; typedef traits::int_type int_type; typedef traits::pos_type pos_type; typedef traits::off_type off_type; typedef traits traits_type; streambufadaptor(RW& rw): _rw(rw){} protected: virtual int sync() { 计算缓冲区中的数据量,将这些数据写入 _rw 中 重新设置写指针到缓冲区的首部 } virtual int_type overflow(charT c) { 首先调用sync虚函数,将缓冲区中的内容清空,并重新设置写指针 保存输入的c到缓冲区 } virtual int_type underflow() { 根据读缓冲区的大小调用 _rw.read 函数读取数据 } }; 完整的实现参见附件中的定义 1.4 streambufadaptor的使用方式 前面提到过,streambuf和stream是在basic_ios中连接在一起的。因为很多流,比如strstream,stringstream等的实现都隐藏了basic_ios::rdbuf(streambuf*)的实现,我们必须将一个实际的流上溯为一个basic_ios进行替换操作。见下面的例子: stringstream strout; RW rw;//RW是自己实现的一个读写类 streambufadaptor mybuf(rw); iostream& streamref = strout; streamref.rdbuf(&mybuf); strout s; …. 上面的使用方式比较繁琐,希望有个简单易用的方式来替换它。借鉴 scopeguard 的实现和使用方式,有下面的方案: 1. 定义一个streamadaptor类,在其构造函数中完成rdbuf的调用,析构函数中恢复 2. 因为我们不直接操作streamadaptor类,利用scopeguard定义宏的技巧。 3. 因为使用人员只应该看到两个界面,一个是输入输出界面的stream,一个是底层数据的实际读写类,至于中间的streambufadaptor可以被隐藏起来。 具体的实现细节参见附件。 最后我们的调用方式如下: stringstream strout; RW rw;//RW是自己实现的一个读写类 //streambufadaptor mybuf(rw); //iostream& streamref = strout; //streamref.rdbuf(&mybuf); STREAM_ADPATOR(strout,rw); //这是我们实现流和SOCKET连接的全部语句 strout s; …. 1.5 附件(完整的实现),供参考 /************************** *** *** author: htj *** *************************/ #include namespace std{ /** Rw have the following interface: interface Rw { int write(const charT* s, streamsize size); int read(charT* s, streamsize size); }; **/ template class streambufadaptor: public basic_streambuf { Rw& _rw; charT *_out; streamsize _out_len; charT *_in; streamsize _in_len; streambufadaptor(const streambufadaptor&); streambufadaptor& operator=(const streambufadaptor&); public: typedef charT char_type; typedef traits::int_type int_type; typedef traits::pos_type pos_type; typedef traits::off_type off_type; typedef traits traits_type; streambufadaptor(Rw& rw,ios_base::openmode mode = ios_base::in | ios_base::out | ios_base::binary): _rw(rw),_out(0),_out_len(0),_in(0),_in_len(0){ setg(_in,_in,_in); setp(_out,_out + _out_len); basic_streambuf::mode_ = mode; } ~streambufadaptor() { if(_out) free_out_buf(); if(_in) free_in_buf(); } private: void init_out_buf() { _out_len = 512; _out = new charT[_out_len]; setp(_out,_out + _out_len); } void free_out_buf() { delete [] _out; _out = 0; _out_len = 0; setp(0,0); } void init_in_buf() { _in_len = 512; _in = new charT[_in_len]; setg(_in,_in,_in); } void free_in_buf() { delete [] _in; _in = 0; _in_len = 0; setg(_in,_in,_in); } protected: virtual int_type overflow(int_type c) { if((basic_streambuf::mode_ & ios_base::out) == 0) { return traits::eof(); } sync(); if(traits::eq_int_type(c,traits::eof()) ) return traits::not_eof(c); else { if(pptr() == 0) init_out_buf(); return sputc(c); } } virtual int_type underflow() { if((basic_streambuf::mode_ & ios_base::in) == 0) { return traits::eof(); } if(egptr() == 0) init_in_buf(); if(egptr() == _in + _in_len) setg(_in,_in,_in); int tmp = _rw.read(egptr(),_in + _in_len - egptr()); if(tmp < 1) return traits::eof(); setg(eback(),egptr(),egptr() + tmp); return traits::to_int_type(*gptr()); } virtual int sync() { if(pptr() > pbase()){ int tmp = pptr() - pbase(); int tmp1 = _rw.write(pbase(),tmp); if(tmp1 < 1) return -1; if(tmp1 < tmp) { memcpy(pbase(),pbase() + tmp1, tmp - tmp1); setp(pbase(),epptr()); pbump(tmp1); }else setp(pbase(),epptr()); } return 0; } }; class streamadaptor_base{}; typedef const streamadaptor_base& StreamAdaptor; template class streamadaptor: public streamadaptor_base { public: typedef typename stream::char_type char_type; typedef typename stream::traits_type traits_type; private: basic_ios& _stream ; streambufadaptor _buff; basic_streambuf* _oldbuf; public: streamadaptor(stream& s,Rw& rw):_stream(s),_buff(rw){ _oldbuf = _stream.rdbuf(&_buff); } ~streamadaptor(){ _stream.rdbuf(_oldbuf); } operator stream&(){ return _stream; } stream& get() { return _stream; } }; template streamadaptor MakeStreamAdaptor(Stream& s,Rw& rw) { return streamadaptor(s,rw); } #define CONCATENATE_DIRECT(s1, s2) s1##s2 #define CONCATENATE(s1, s2) CONCATENATE_DIRECT(s1, s2) #define ANONYMOUS_VARIABLE(str) CONCATENATE(str, __LINE__) #define STREAM_ADAPTOR StreamAdaptor ANONYMOUS_VARIABLE(streamAdaptor) = MakeStreamAdaptor }

本文地址:http://com.8s8s.com/it/it23183.htm