aboutsummaryrefslogtreecommitdiff
path: root/source/lib/control/src/connection.cpp
blob: 30fc8af1c2006208eac16de36861121fe11705ab (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#include "wanda/control/connection.hpp"

#include "wanda/proto/message.hpp"

#include <asio.hpp>

#include <limits>

namespace wanda::control
{
  // Factory for connections: builds a shared connection object that takes over
  // the given socket (the socket is moved into the new object).
  connection::pointer make_connection(connection::protocol::socket && socket)
  {
    auto instance = std::make_shared<connection>(connection::key{}, std::move(socket));
    return instance;
  }

  // Constructs a connection around an already created socket. The key is
  // handed on to the `keyed` base/member; the socket is moved into m_socket.
  connection::connection(connection::key key, connection::protocol::socket socket)
      : keyed{key},
        m_socket{std::move(socket)}
  {
    // Nothing else to do: reading only begins once start() is called.
  }

  // Registers a listener for this connection's events.
  //
  // Returns true if the listener was newly inserted, false if it was already
  // registered or if the pointer is null. Null pointers are rejected because
  // every notification loop dereferences the stored pointers unconditionally.
  bool connection::add(listener * listener)
  {
    if (listener == nullptr)
    {
      return false;
    }

    return m_listeners.insert(listener).second;
  }

  // Unregisters a listener. Returns true if it was registered (i.e. at least
  // one entry was erased), false otherwise.
  bool connection::remove(listener * listener)
  {
    auto const erased = m_listeners.erase(listener);
    return erased != 0;
  }

  // Starts processing on a connection that has not been started yet: moves the
  // state from `unknown` to `fresh` and issues the first read. Calls made in
  // any other state do nothing.
  void connection::start()
  {
    if (m_state != state::unknown)
    {
      return;
    }

    m_state = state::fresh;
    perform_read();
  }

  // Serializes `message` followed by a newline into the output buffer and
  // starts an asynchronous write of it to the socket.
  //
  // The number of bytes to transfer is message.size() + 1 (the newline).
  // NOTE(review): this assumes message.size() equals the number of characters
  // operator<< emits for the message - confirm against proto::message.
  //
  // Write failures are reported to all registered listeners via on_error().
  void connection::send(proto::message message)
  {
    m_output << message << '\n';
    asio::async_write(m_socket, m_out, asio::transfer_exactly(message.size() + 1), [that = shared_from_this(), this](auto const & error, auto const) {
      // No manual m_out.consume() here: the streambuf overload of async_write
      // already removes the transmitted bytes from the buffer before invoking
      // this handler. Consuming again would drop bytes belonging to a message
      // that was queued while this write was in flight.
      if (error)
      {
        for (auto & observer : m_listeners)
        {
          observer->on_error(shared_from_this(), error);
        }
      }
    });
  }

  void connection::close()
  {
    if (auto error = std::error_code{}; m_socket.cancel(error))
    {
      for (auto & listener : m_listeners)
      {
        listener->on_error(shared_from_this(), error);
      }
    }

    if (auto error = std::error_code{}; m_socket.close(error))
    {
      for (auto & listener : m_listeners)
      {
        listener->on_error(shared_from_this(), error);
      }
    }

    for (auto & listener : m_listeners)
    {
      listener->on_close(shared_from_this());
    }
    m_listeners.clear();
  }

  // Replaces the stored connection state with the given one.
  void connection::update(state state)
  {
    this->m_state = state;
  }

  // Reports the state the connection is currently in.
  connection::state connection::current_state() const
  {
    return this->m_state;
  }

  void connection::perform_read()
  {
    asio::async_read_until(m_socket, m_in, '\n', [that = shared_from_this(), this](auto const & error, auto const length) {
      if (error)
      {
        for (auto & listener : m_listeners)
        {
          listener->on_error(shared_from_this(), error);
        }
        close();
      }
      else
      {
        auto msg = proto::message{};
        m_input >> msg;
        if (!m_input)
        {
          m_input.ignore(std::numeric_limits<std::streamsize>::max());
          m_input.clear();
        }
        else
        {
          for (auto & listener : m_listeners)
          {
            listener->on_received(shared_from_this(), msg);
          }
        }
        perform_read();
      }
    });
  }

}  // namespace wanda::control