#include "LibLsp/JsonRpc/MessageIssue.h" #include "LibLsp/JsonRpc/WebSocketServer.h" #include #include #include #include "LibLsp/JsonRpc/stream.h" #include #include #include // namespace beast = boost::beast; // from // namespace http = beast::http; // from // namespace websocket = beast::websocket; // from namespace net = asio; // from using tcp = asio::ip::tcp; // from namespace lsp { //------------------------------------------------------------------------------ struct WebSocketServer::Data { ix::WebSocketServer server; std::shared_ptr handler; std::shared_ptr endpoint; RemoteEndPoint point; lsp::Log& log; Data(const std::string& ua, const std::string& addr, int port, std::shared_ptr h, std::shared_ptr ep, lsp::Log& lg, uint32_t max_workers) : server(port) , handler(std::move(h)) , endpoint(std::move(ep)) , point(handler, endpoint, lg, lsp::JSONStreamStyle::Standard, static_cast(max_workers)) , log(lg) { server.setOnConnectionCallback( [this, ua](std::weak_ptr wp, std::shared_ptr) { // wrap and start processing auto ws = wp.lock(); if(!ws) return; auto wrapper = std::make_shared(ws); point.startProcessingMessages(wrapper, wrapper); } ); } }; websocket_stream_wrapper::websocket_stream_wrapper(std::shared_ptr ws) : ws_(std::move(ws)), request_waiter(new MultiQueueWaiter()), on_request(request_waiter) { // incoming messages → queue ws_->setOnMessageCallback( [this](const ix::WebSocketMessagePtr& msg) { if (msg->type == ix::WebSocketMessageType::Message) { const auto& s = msg->str; on_request.EnqueueAll(std::vector(s.begin(), s.end()), false); } else if (msg->type == ix::WebSocketMessageType::Error) { error_message = msg->str; } } ); } bool websocket_stream_wrapper::fail() { return bad(); } bool websocket_stream_wrapper::eof() { return bad(); } bool websocket_stream_wrapper::good() { return !bad(); } websocket_stream_wrapper& websocket_stream_wrapper::read(char* str, std::streamsize count) { auto some = on_request.TryDequeueSome(static_cast(count)); memcpy(str, some.data(), some.size()); for (std::streamsize i = some.size(); i < count; ++i) { str[i] = static_cast(get()); } return *this; } int websocket_stream_wrapper::get() { return on_request.Dequeue(); } bool websocket_stream_wrapper::bad() { return ws_->getReadyState() != ix::ReadyState::Open; } websocket_stream_wrapper& websocket_stream_wrapper::write(std::string const& c) { ws_->send(c); return *this; } websocket_stream_wrapper& websocket_stream_wrapper::write(std::streamsize _s) { std::ostringstream temp; temp << _s; ws_->send(temp.str()); return *this; } websocket_stream_wrapper& websocket_stream_wrapper::flush() { return *this; } void websocket_stream_wrapper::clear() { } std::string websocket_stream_wrapper::what() { if (!error_message.empty()) { return error_message; } if (ws_->getReadyState() != ix::ReadyState::Open) { return "Socket is not open."; } return {}; } WebSocketServer::~WebSocketServer() { delete d_ptr; } WebSocketServer::WebSocketServer( std::string const& user_agent, std::string const& address, std::string const& port, std::shared_ptr json_handler, std::shared_ptr localEndPoint, lsp::Log& log, uint32_t _max_workers ) : point(json_handler, localEndPoint, log, lsp::JSONStreamStyle::Standard, static_cast(_max_workers)), d_ptr(new Data(user_agent, address, std::stoi(port), json_handler, localEndPoint, log, _max_workers)) { } void WebSocketServer::run() { ix::initNetSystem(); d_ptr->server.listen(); d_ptr->server.start(); } void WebSocketServer::stop() { d_ptr->server.stop(); } } // namespace lsp