I wrote a very simple chat server using boost::asio. Everything works as it should. As I just redid the TCPSession class to use "strand", there was an error in the functionality of the class: The server accepts a connection and immediately drops it, because I get the error "Bad file descriptor" in the function "async_read". Can't understand why.
The problem is in class TCPSession as class TCPServer and function main remain unchanged for both cases.
Can anybody help me to resolve this problem?
Thank you in advance for any help or indication of where I made an error.
What I have tried:
#include <boost/asio.hpp>
#include <queue>
#include <unordered_set>
#include <iostream>
namespace io = boost::asio;
using tcp = io::ip::tcp;
using error_code = boost::system::error_code;
using namespace std::placeholders;
using message_handler = std::function<void (std::string)>;
using error_handler = std::function<void ()>;
class TCPSession : public std::enable_shared_from_this<TCPSession>
{
public:
TCPSession(tcp::socket&& socket)
: socket_(std::move(socket))
{
}
void start(message_handler&& on_message, error_handler&& on_error)
{
this->on_message_ = std::move(on_message);
this->on_error_ = std::move(on_error);
async_read();
}
void post(std::string const& message)
{
bool idle = outgoing_.empty();
outgoing_.push(message);
if(idle)
{
async_write();
}
}
private:
void async_read()
{
io::async_read_until(socket_, streambuf_, "\n", std::bind(&TCPSession::on_read, shared_from_this(), _1, _2));
}
void on_read(error_code error, std::size_t bytes_transferred)
{
if(!error)
{
std::stringstream message;
message << socket_.remote_endpoint(error) << ": " << std::istream(&streambuf_).rdbuf();
std::string strr = message.str();
std::cout << "Received message from client: " << strr << std::endl;
streambuf_.consume(bytes_transferred);
on_message_(message.str());
async_read();
}
else
{
socket_.close(error);
on_error_();
}
}
void async_write()
{
io::async_write(socket_, io::buffer(outgoing_.front()), std::bind(&TCPSession::on_write, shared_from_this(), _1, _2));
}
void on_write(error_code error, std::size_t bytes_transferred)
{
if(!error)
{
outgoing_.pop();
if(!outgoing_.empty())
{
async_write();
}
}
else
{
socket_.close(error);
on_error_();
}
}
tcp::socket socket_;
io::streambuf streambuf_;
std::queue<std::string> outgoing_;
message_handler on_message_;
error_handler on_error_;
};
class TCPSession : public std::enable_shared_from_this<TCPSession>
{
public:
TCPSession(io::io_context& io_context)
: socket_(io_context)
, read (io_context)
, write (io_context)
{
}
void start(message_handler&& on_message, error_handler&& on_error)
{
this->on_message_ = std::move(on_message);
this->on_error_ = std::move(on_error);
async_read();
}
void post(std::string const& message)
{
bool idle = outgoing_.empty();
outgoing_.push(message);
if(idle)
{
async_write();
}
}
private:
void async_read()
{
io::async_read(socket_, streambuf_, io::bind_executor(read,
[&](error_code error, std::size_t bytes_transferred)
{
if (!error)
{
std::istream is(&streambuf_);
std::string message(std::istreambuf_iterator<char>(is), {});
std::cout << "Received message from client: " << message << std::endl;
on_message_(message);
streambuf_.consume(bytes_transferred);
async_read();
}
else
{
socket_.close(error);
on_error_();
}
}));
}
void async_write()
{
auto self = shared_from_this();
io::async_write(socket_, io::buffer(outgoing_.front()), io::bind_executor(write,
[self, this](error_code error, std::size_t )
{
if(!error)
{
outgoing_.pop();
if(!outgoing_.empty())
{
async_write();
}
}
else
{
socket_.close(error);
on_error_();
}
}));
}
tcp::socket socket_;
io::streambuf streambuf_;
std::queue<std::string> outgoing_;
message_handler on_message_;
error_handler on_error_;
io::io_context::strand read;
io::io_context::strand write;
};
class TCPServer
{
public:
TCPServer(io::io_context& io_context, std::uint16_t port)
: io_context_(io_context)
, acceptor_ (io_context, tcp::endpoint(tcp::v4(), port))
{
}
void async_accept()
{
socket_.emplace(io_context_);
acceptor_.async_accept(*socket_, [&] (error_code error)
{
auto client = std::make_shared<TCPSession>(io_context_);
clients_.insert(client);
std::cout << "One client came in\n";
client->start( std::bind(&TCPServer::post, this, _1),
[&, weak = std::weak_ptr(client)]
{
if(auto shared = weak.lock(); shared && clients_.erase(shared))
{
std::cout << "One client went out\n";
}
}
);
async_accept();
});
}
void post(std::string const& message)
{
for(auto& client : clients_)
{
client->post(message);
}
}
private:
io::io_context& io_context_;
tcp::acceptor acceptor_;
std::optional<tcp::socket> socket_;
std::unordered_set<std::shared_ptr<TCPSession>> clients_;
};
int main()
{
io::io_context io_context;
TCPServer srv(io_context, 15001);
srv.async_accept();
io_context.run();
return 0;
}