1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
| #include "msgbuffer.h" #include <boost/asio.hpp> #include <boost/system/error_code.hpp> #include <cstddef> #include <memory> #include <queue> #include <iostream>
using namespace boost::asio; using namespace boost::asio::ip; using namespace std;
class Session : public std::enable_shared_from_this<Session>{ public: Session(tcp::socket socket) : socket_(std::move(socket)), closed_(false), isWriteAsync_(false) {
} ~Session(){ do_close(); } void Start(){ std::cout << "accept a connection: " << socket_.remote_endpoint().address().to_string() <<std::endl; do_read(); } void Send(const uint8_t* msg, std::size_t len){ MessageBuffer buffer(len + 4); buffer.Write("S:", 2); buffer.Write(msg, len); buffer.Write("\r\n", 2); writeQueue_.push(std::move(buffer)); do_write(); } private: void do_close(){ if(!closed_){ closed_ = true; std::cout << "closed a connections: " << socket_.remote_endpoint().address().to_string() <<std::endl; boost::system::error_code err; socket_.close(err); } } void do_read(){ readBuffer_.Normalize(); readBuffer_.EnsureFreeSpace(); auto self(shared_from_this()); socket_.async_read_some(buffer(readBuffer_.GetWritePointer(), readBuffer_.GetRemainingSpace()),[this, self](boost::system::error_code err, std::size_t length){ if(err){ do_close(); return; } readBuffer_.WriteCompleted(length); std::cout << "recv length: " << length << std::endl; uint8_t* base = readBuffer_.GetReadPointer(); readBuffer_.ReadCompleted(length); Send(base, length); do_read(); }); } void do_write(){ if(isWriteAsync_) return; isWriteAsync_ = true; MessageBuffer &buf = writeQueue_.front(); auto self(shared_from_this()); socket_.async_write_some(buffer(buf.GetReadPointer(), buf.GetActiveSize()), [this, self](boost::system::error_code err, std::size_t length){ if(err){ do_close(); return; } isWriteAsync_ = false; writeQueue_.front().ReadCompleted(length); if(!writeQueue_.front().GetActiveSize()) writeQueue_.pop(); if(!writeQueue_.empty()) do_write(); }); } tcp::socket socket_; MessageBuffer readBuffer_; std::queue<MessageBuffer> writeQueue_; bool closed_; bool isWriteAsync_; };
class Server{ public: Server(io_context &io_ctx, short port) : acceptor_(io_ctx, tcp::endpoint(tcp::v4(), port)) { do_accept(); } private: void do_accept(){ acceptor_.async_accept([this](boost::system::error_code err, tcp::socket socket){ if(!err){ std::make_shared<Session>(std::move(socket))->Start(); } do_accept(); }); } private: tcp::acceptor acceptor_; };
int main() { io_context io_ctx; Server s(io_ctx, 8989); io_ctx.run(); return 0; }
|