0
0
mirror of https://github.com/OpenVPN/openvpn3.git synced 2024-09-20 12:12:15 +02:00

OMICore: support asynchronous command response

To support asynchronous command response, the virtual
method omi_command_in() should now return a boolean.

When omi_command_in() returns false, synchronous mode
is chosen (the previous default).  This means that
omi_command_in() must emit "SUCCESS: ..." or
"ERROR: ..." before it returns.

When omi_command_in() returns true, the new asynchronous
mode is chosen, and omi_command_in() may return before
emitting SUCCESS or ERROR.  In this mode, OMICore will
pause the incoming command pipeline and not make any
further calls to omi_command_in() until
OMICore::async_done() is called.

Signed-off-by: James Yonan <james@openvpn.net>
This commit is contained in:
James Yonan 2018-07-07 12:27:24 -06:00 committed by Lev Stipakov
parent fd1ed92b68
commit ac158fd0bf
No known key found for this signature in database
GPG Key ID: 88670BE258B9C258

View File

@ -28,6 +28,7 @@
#include <openvpn/buffer/bufstr.hpp>
#include <openvpn/time/timestr.hpp>
#include <openvpn/time/asiotimer.hpp>
#include <openvpn/asio/asiowork.hpp>
// include acceptors for different protocols
#include <openvpn/acceptor/base.hpp>
@ -72,6 +73,8 @@ namespace openvpn {
return;
stop_called = true;
asio_work.reset();
// close acceptor
if (acceptor)
acceptor->close();
@ -298,6 +301,9 @@ namespace openvpn {
listen_tcp(addr, port);
}
}
// don't exit Asio event loop until AsioWork object is deleted
asio_work.reset(new AsioWork(io_context));
}
void start_connection_if_not_hold()
@ -331,6 +337,11 @@ namespace openvpn {
return true;
}
void async_done()
{
process_recv();
}
void log_full(const std::string& text) // logs to OMI buffer and log file
{
const time_t now = ::time(NULL);
@ -384,7 +395,7 @@ namespace openvpn {
}
virtual bool omi_command_is_multiline(const std::string& arg0, const Option& option) = 0;
virtual void omi_command_in(const std::string& arg0, const Command& cmd) = 0;
virtual bool omi_command_in(const std::string& arg0, const Command& cmd) = 0;
virtual void omi_start_connection() = 0;
virtual void omi_done(const bool eof) = 0;
virtual void omi_sigterm() = 0;
@ -441,12 +452,12 @@ namespace openvpn {
volatile bool ready_ = true;
};
void command_in(std::unique_ptr<Command> cmd)
bool command_in(std::unique_ptr<Command> cmd)
{
try {
const std::string arg0 = cmd->option.get_optional(0, 64);
if (arg0.empty())
return;
return false;
if (!cmd->valid_utf8)
throw Exception("invalid UTF8");
switch (arg0[0])
@ -456,7 +467,7 @@ namespace openvpn {
if (arg0 == "bytecount")
{
process_bytecount_cmd(cmd->option);
return;
return false;
}
break;
}
@ -465,12 +476,12 @@ namespace openvpn {
if (hist_echo.is_cmd(cmd->option))
{
send(hist_echo.process_cmd(cmd->option));
return;
return false;
}
if (arg0 == "exit")
{
conditional_stop(true);
return;
return false;
}
break;
}
@ -482,7 +493,7 @@ namespace openvpn {
send(hold_cmd(cmd->option, release));
if (release)
hold_release();
return;
return false;
}
break;
}
@ -491,7 +502,7 @@ namespace openvpn {
if (hist_log.is_cmd(cmd->option))
{
send(hist_log.process_cmd(cmd->option));
return;
return false;
}
break;
}
@ -500,7 +511,7 @@ namespace openvpn {
if (arg0 == "quit")
{
conditional_stop(true);
return;
return false;
}
break;
}
@ -509,17 +520,17 @@ namespace openvpn {
if (hist_state.is_cmd(cmd->option))
{
send(hist_state.process_cmd(cmd->option));
return;
return false;
}
if (arg0 == "signal")
{
process_signal_cmd(cmd->option);
return;
return false;
}
break;
}
}
omi_command_in(arg0, *cmd);
return omi_command_in(arg0, *cmd);
}
catch (const std::exception& e)
{
@ -528,6 +539,7 @@ namespace openvpn {
err_ref = cmd->option.err_ref();
send("ERROR: error processing " + err_ref + " : " + e.what() + "\r\n");
}
return false;
}
bool is_hold_cmd(const Option& o) const
@ -659,8 +671,9 @@ namespace openvpn {
send(">INFO:OpenVPN Management Interface Version 1 -- type 'help' for more info\r\n");
}
void process_in_line() // process incoming line in in_partial
bool process_in_line() // process incoming line in in_partial
{
bool ret = false;
const bool utf8 = Unicode::is_valid_utf8(in_partial);
string::trim_crlf(in_partial);
if (multiline)
@ -669,7 +682,7 @@ namespace openvpn {
throw omi_error("process_in_line: internal error");
if (in_partial == "END")
{
command_in(std::move(command));
ret = command_in(std::move(command));
command.reset();
multiline = false;
}
@ -688,10 +701,11 @@ namespace openvpn {
multiline = command_is_multiline(command->option);
if (!multiline)
{
command_in(std::move(command));
ret = command_in(std::move(command));
command.reset();
}
}
return ret;
}
static std::string read_config(const std::string& fn)
@ -846,19 +860,21 @@ namespace openvpn {
void queue_recv()
{
if (!is_sock_open())
if (!is_sock_open() || recv_queued)
return;
BufferPtr buf(new BufferAllocated(256, 0));
socket->async_receive(buf->mutable_buffer_clamp(),
[self=Ptr(this), sock=socket, buf](const openvpn_io::error_code& error, const size_t bytes_recvd)
{
self->handle_recv(error, bytes_recvd, *buf, sock.get());
self->handle_recv(error, bytes_recvd, std::move(buf), sock.get());
});
recv_queued = true;
}
void handle_recv(const openvpn_io::error_code& error, const size_t bytes_recvd,
Buffer& buf, const AsioPolySock::Base* queued_socket)
BufferPtr buf, const AsioPolySock::Base* queued_socket)
{
recv_queued = false;
if (!is_sock_open() || socket.get() != queued_socket)
return;
if (error)
@ -869,22 +885,30 @@ namespace openvpn {
conditional_stop(eof);
return;
}
buf.set_size(bytes_recvd);
buf->set_size(bytes_recvd);
in_buf = std::move(buf);
process_recv();
}
while (buf.size())
void process_recv()
{
while (in_buf->size())
{
const char c = (char)buf.pop_front();
const char c = (char)in_buf->pop_front();
in_partial += c;
if (c == '\n')
{
bool defer = false;
try {
process_in_line();
defer = process_in_line();
}
catch (const std::exception& e)
{
send("ERROR: in OMI command: " + std::string(e.what()) + "\r\n");
}
in_partial.clear();
if (defer)
return;
}
}
@ -941,12 +965,15 @@ namespace openvpn {
// I/O
Acceptor::Base::Ptr acceptor;
AsioPolySock::Base::Ptr socket;
std::unique_ptr<AsioWork> asio_work;
std::deque<BufferPtr> content_out;
std::string in_partial;
std::unique_ptr<Command> command;
BufferPtr in_buf;
bool management_client_root = false;
bool multiline = false;
bool errors_to_stderr = false;
bool recv_queued = false;
// stopping
volatile bool stop_called = false;