0
0
mirror of https://github.com/OpenVPN/openvpn3.git synced 2024-09-20 12:12:15 +02:00

runcontext.hpp changes:

* added ServerThreadWeakBase, an alternative version of
  ServerThreadBase that supports weak pointers,

* added set_exit_socket() method for triggering a mutual
  exit between two partner processes if either process
  closes their end of the socket,

* added a prefix string to distinguish between multiple
  RunContext objects, and

* refactored cancel() method to better leverage on asio::post.
This commit is contained in:
James Yonan 2015-07-04 16:36:15 -06:00
parent d014f2ebb6
commit 3a0bd15039

View File

@ -34,6 +34,7 @@
#include <type_traits> // for std::is_nothrow_move_constructible
#include <thread>
#include <mutex>
#include <memory>
#include <openvpn/common/exception.hpp>
#include <openvpn/common/size.hpp>
@ -43,6 +44,10 @@
#include <openvpn/time/asiotimer.hpp>
#include <openvpn/time/timestr.hpp>
#ifdef ASIO_HAS_LOCAL_SOCKETS
#include <openvpn/common/scoped_fd.hpp>
#endif
namespace openvpn {
struct ServerThreadBase : public RC<thread_safe_refcount>
@ -52,6 +57,14 @@ namespace openvpn {
virtual void thread_safe_stop() = 0;
};
struct ServerThreadWeakBase : public RCWeak<thread_safe_refcount>
{
typedef RCPtr<ServerThreadWeakBase> Ptr;
typedef RCWeakPtr<ServerThreadWeakBase> WPtr;
virtual void thread_safe_stop() = 0;
};
template <typename ServerThread, typename Stats>
class RunContext : public LogBase
{
@ -126,6 +139,23 @@ namespace openvpn {
servlist[unit] = nullptr;
}
#ifdef ASIO_HAS_LOCAL_SOCKETS
void set_exit_socket(ScopedFD& fd)
{
exit_sock.reset(new asio::posix::stream_descriptor(io_context, fd.release()));
exit_sock->async_read_some(asio::null_buffers(),
[self=Ptr(this)](const asio::error_code& error, const size_t bytes_recvd)
{
self->cancel();
});
}
#endif
void set_prefix(const std::string& pre)
{
prefix = pre + ": ";
}
void run()
{
if (!halt)
@ -174,50 +204,53 @@ namespace openvpn {
// called from main or worker thread
void remove_thread()
{
std::lock_guard<std::mutex> lock(mutex);
if (--thread_count <= 0)
do_cancel();
}
void cancel()
{
std::lock_guard<std::mutex> lock(mutex);
do_cancel();
bool last = false;
{
std::lock_guard<std::mutex> lock(mutex);
last = (--thread_count <= 0);
}
if (last)
cancel();
}
// called from main or worker thread
void do_cancel()
void cancel()
{
if (!halt)
{
halt = true;
if (halt)
return;
asio::post(io_context, [self=Ptr(this)]()
{
std::lock_guard<std::mutex> lock(self->mutex);
if (self->halt)
return;
self->halt = true;
exit_timer.cancel();
self->exit_timer.cancel();
self->exit_sock.reset();
if (self->signals)
self->signals->cancel();
if (signals)
asio::post(io_context, [sig=signals]()
{
sig->cancel();
});
unsigned int stopped = 0;
for (size_t i = 0; i < servlist.size(); ++i)
{
ServerThread* serv = servlist[i];
if (serv)
{
serv->thread_safe_stop();
++stopped;
}
servlist[i] = nullptr;
}
OPENVPN_LOG("Stopping " << stopped << '/' << servlist.size() << " thread(s)");
}
// stop threads
{
unsigned int stopped = 0;
for (size_t i = 0; i < self->servlist.size(); ++i)
{
ServerThread* serv = self->servlist[i];
if (serv)
{
serv->thread_safe_stop();
++stopped;
}
self->servlist[i] = nullptr;
}
OPENVPN_LOG(self->prefix << "Stopping " << stopped << '/' << self->servlist.size() << " thread(s)");
}
});
}
void exit_timer_callback(const asio::error_code& e)
{
if (!e && !halt)
if (!e)
cancel();
}
@ -255,7 +288,11 @@ namespace openvpn {
typename Stats::Ptr stats;
ASIOSignals::Ptr signals;
AsioTimer exit_timer;
std::string prefix;
std::vector<std::thread*> threadlist;
#ifdef ASIO_HAS_LOCAL_SOCKETS
std::unique_ptr<asio::posix::stream_descriptor> exit_sock;
#endif
// servlist and related vars protected by mutex
std::mutex mutex;