331 lines
13 KiB
C++
331 lines
13 KiB
C++
#include "tcp_sponge_socket.hh"
|
|
|
|
#include "network_interface.hh"
|
|
#include "parser.hh"
|
|
#include "tun.hh"
|
|
#include "util.hh"
|
|
|
|
#include <cstddef>
|
|
#include <exception>
|
|
#include <iostream>
|
|
#include <stdexcept>
|
|
#include <string>
|
|
#include <sys/socket.h>
|
|
#include <sys/syscall.h>
|
|
#include <sys/types.h>
|
|
#include <unistd.h>
|
|
#include <utility>
|
|
|
|
using namespace std;
|
|
|
|
static constexpr size_t TCP_TICK_MS = 10;
|
|
|
|
//! \param[in] condition is a function returning true if loop should continue
|
|
template <typename AdaptT>
|
|
void TCPSpongeSocket<AdaptT>::_tcp_loop(const function<bool()> &condition) {
|
|
auto base_time = timestamp_ms();
|
|
while (condition()) {
|
|
auto ret = _eventloop.wait_next_event(TCP_TICK_MS);
|
|
if (ret == EventLoop::Result::Exit or _abort) {
|
|
break;
|
|
}
|
|
|
|
if (_tcp.value().active()) {
|
|
const auto next_time = timestamp_ms();
|
|
_tcp.value().tick(next_time - base_time);
|
|
_datagram_adapter.tick(next_time - base_time);
|
|
base_time = next_time;
|
|
}
|
|
}
|
|
}
|
|
|
|
//! \param[in] data_socket_pair is a pair of connected AF_UNIX SOCK_STREAM sockets
|
|
//! \param[in] datagram_interface is the interface for reading and writing datagrams
|
|
template <typename AdaptT>
|
|
TCPSpongeSocket<AdaptT>::TCPSpongeSocket(pair<FileDescriptor, FileDescriptor> data_socket_pair,
|
|
AdaptT &&datagram_interface)
|
|
: LocalStreamSocket(move(data_socket_pair.first))
|
|
, _thread_data(move(data_socket_pair.second))
|
|
, _datagram_adapter(move(datagram_interface)) {
|
|
_thread_data.set_blocking(false);
|
|
}
|
|
|
|
template <typename AdaptT>
|
|
void TCPSpongeSocket<AdaptT>::_initialize_TCP(const TCPConfig &config) {
|
|
_tcp.emplace(config);
|
|
|
|
// Set up the event loop
|
|
|
|
// There are four possible events to handle:
|
|
//
|
|
// 1) Incoming datagram received (needs to be given to
|
|
// TCPConnection::segment_received method)
|
|
//
|
|
// 2) Outbound bytes received from local application via a write()
|
|
// call (needs to be read from the local stream socket and
|
|
// given to TCPConnection::data_written method)
|
|
//
|
|
// 3) Incoming bytes reassembled by the TCPConnection
|
|
// (needs to be read from the inbound_stream and written
|
|
// to the local stream socket back to the application)
|
|
//
|
|
// 4) Outbound segment generated by TCP (needs to be
|
|
// given to underlying datagram socket)
|
|
|
|
// rule 1: read from filtered packet stream and dump into TCPConnection
|
|
_eventloop.add_rule(_datagram_adapter,
|
|
Direction::In,
|
|
[&] {
|
|
auto seg = _datagram_adapter.read();
|
|
if (seg) {
|
|
_tcp->segment_received(move(seg.value()));
|
|
}
|
|
|
|
// debugging output:
|
|
if (_thread_data.eof() and _tcp.value().bytes_in_flight() == 0 and not _fully_acked) {
|
|
cerr << "DEBUG: Outbound stream to "
|
|
<< _datagram_adapter.config().destination.to_string()
|
|
<< " has been fully acknowledged.\n";
|
|
_fully_acked = true;
|
|
}
|
|
},
|
|
[&] { return _tcp->active(); });
|
|
|
|
// rule 2: read from pipe into outbound buffer
|
|
_eventloop.add_rule(
|
|
_thread_data,
|
|
Direction::In,
|
|
[&] {
|
|
const auto data = _thread_data.read(_tcp->remaining_outbound_capacity());
|
|
const auto len = data.size();
|
|
const auto amount_written = _tcp->write(move(data));
|
|
if (amount_written != len) {
|
|
throw runtime_error("TCPConnection::write() accepted less than advertised length");
|
|
}
|
|
|
|
if (_thread_data.eof()) {
|
|
_tcp->end_input_stream();
|
|
_outbound_shutdown = true;
|
|
|
|
// debugging output:
|
|
cerr << "DEBUG: Outbound stream to " << _datagram_adapter.config().destination.to_string()
|
|
<< " finished (" << _tcp.value().bytes_in_flight() << " byte"
|
|
<< (_tcp.value().bytes_in_flight() == 1 ? "" : "s") << " still in flight).\n";
|
|
}
|
|
},
|
|
[&] { return (_tcp->active()) and (not _outbound_shutdown) and (_tcp->remaining_outbound_capacity() > 0); },
|
|
[&] {
|
|
_tcp->end_input_stream();
|
|
_outbound_shutdown = true;
|
|
});
|
|
|
|
// rule 3: read from inbound buffer into pipe
|
|
_eventloop.add_rule(
|
|
_thread_data,
|
|
Direction::Out,
|
|
[&] {
|
|
ByteStream &inbound = _tcp->inbound_stream();
|
|
// Write from the inbound_stream into
|
|
// the pipe, handling the possibility of a partial
|
|
// write (i.e., only pop what was actually written).
|
|
const size_t amount_to_write = min(size_t(65536), inbound.buffer_size());
|
|
const std::string buffer = inbound.peek_output(amount_to_write);
|
|
const auto bytes_written = _thread_data.write(move(buffer), false);
|
|
inbound.pop_output(bytes_written);
|
|
|
|
if (inbound.eof() or inbound.error()) {
|
|
_thread_data.shutdown(SHUT_WR);
|
|
_inbound_shutdown = true;
|
|
|
|
// debugging output:
|
|
cerr << "DEBUG: Inbound stream from " << _datagram_adapter.config().destination.to_string()
|
|
<< " finished " << (inbound.error() ? "with an error/reset.\n" : "cleanly.\n");
|
|
if (_tcp.value().state() == TCPState::State::TIME_WAIT) {
|
|
cerr << "DEBUG: Waiting for lingering segments (e.g. retransmissions of FIN) from peer...\n";
|
|
}
|
|
}
|
|
},
|
|
[&] {
|
|
return (not _tcp->inbound_stream().buffer_empty()) or
|
|
((_tcp->inbound_stream().eof() or _tcp->inbound_stream().error()) and not _inbound_shutdown);
|
|
});
|
|
|
|
// rule 4: read outbound segments from TCPConnection and send as datagrams
|
|
_eventloop.add_rule(_datagram_adapter,
|
|
Direction::Out,
|
|
[&] {
|
|
while (not _tcp->segments_out().empty()) {
|
|
_datagram_adapter.write(_tcp->segments_out().front());
|
|
_tcp->segments_out().pop();
|
|
}
|
|
},
|
|
[&] { return not _tcp->segments_out().empty(); });
|
|
}
|
|
|
|
//! \brief Call [socketpair](\ref man2::socketpair) and return connected Unix-domain sockets of specified type
|
|
//! \param[in] type is the type of AF_UNIX sockets to create (e.g., SOCK_SEQPACKET)
|
|
//! \returns a std::pair of connected sockets
|
|
static inline pair<FileDescriptor, FileDescriptor> socket_pair_helper(const int type) {
|
|
int fds[2];
|
|
SystemCall("socketpair", ::socketpair(AF_UNIX, type, 0, static_cast<int *>(fds)));
|
|
return {FileDescriptor(fds[0]), FileDescriptor(fds[1])};
|
|
}
|
|
|
|
//! \param[in] datagram_interface is the underlying interface (e.g. to UDP, IP, or Ethernet)
|
|
template <typename AdaptT>
|
|
TCPSpongeSocket<AdaptT>::TCPSpongeSocket(AdaptT &&datagram_interface)
|
|
: TCPSpongeSocket(socket_pair_helper(SOCK_STREAM), move(datagram_interface)) {}
|
|
|
|
template <typename AdaptT>
|
|
TCPSpongeSocket<AdaptT>::~TCPSpongeSocket() {
|
|
try {
|
|
if (_tcp_thread.joinable()) {
|
|
cerr << "Warning: unclean shutdown of TCPSpongeSocket\n";
|
|
// force the other side to exit
|
|
_abort.store(true);
|
|
_tcp_thread.join();
|
|
}
|
|
} catch (const exception &e) {
|
|
cerr << "Exception destructing TCPSpongeSocket: " << e.what() << endl;
|
|
}
|
|
}
|
|
|
|
template <typename AdaptT>
|
|
void TCPSpongeSocket<AdaptT>::wait_until_closed() {
|
|
shutdown(SHUT_RDWR);
|
|
if (_tcp_thread.joinable()) {
|
|
cerr << "DEBUG: Waiting for clean shutdown... ";
|
|
_tcp_thread.join();
|
|
cerr << "done.\n";
|
|
}
|
|
}
|
|
|
|
//! \param[in] c_tcp is the TCPConfig for the TCPConnection
|
|
//! \param[in] c_ad is the FdAdapterConfig for the FdAdapter
|
|
template <typename AdaptT>
|
|
void TCPSpongeSocket<AdaptT>::connect(const TCPConfig &c_tcp, const FdAdapterConfig &c_ad) {
|
|
if (_tcp) {
|
|
throw runtime_error("connect() with TCPConnection already initialized");
|
|
}
|
|
|
|
_initialize_TCP(c_tcp);
|
|
|
|
_datagram_adapter.config_mut() = c_ad;
|
|
|
|
cerr << "DEBUG: Connecting to " << c_ad.destination.to_string() << "...\n";
|
|
_tcp->connect();
|
|
|
|
const TCPState expected_state = TCPState::State::SYN_SENT;
|
|
|
|
if (_tcp->state() != expected_state) {
|
|
throw runtime_error("After TCPConnection::connect(), state was " + _tcp->state().name() + " but expected " +
|
|
expected_state.name());
|
|
}
|
|
|
|
_tcp_loop([&] { return _tcp->state() == TCPState::State::SYN_SENT; });
|
|
cerr << "Successfully connected to " << c_ad.destination.to_string() << ".\n";
|
|
|
|
_tcp_thread = thread(&TCPSpongeSocket::_tcp_main, this);
|
|
}
|
|
|
|
//! \param[in] c_tcp is the TCPConfig for the TCPConnection
|
|
//! \param[in] c_ad is the FdAdapterConfig for the FdAdapter
|
|
template <typename AdaptT>
|
|
void TCPSpongeSocket<AdaptT>::listen_and_accept(const TCPConfig &c_tcp, const FdAdapterConfig &c_ad) {
|
|
if (_tcp) {
|
|
throw runtime_error("listen_and_accept() with TCPConnection already initialized");
|
|
}
|
|
|
|
_initialize_TCP(c_tcp);
|
|
|
|
_datagram_adapter.config_mut() = c_ad;
|
|
_datagram_adapter.set_listening(true);
|
|
|
|
cerr << "DEBUG: Listening for incoming connection...\n";
|
|
_tcp_loop([&] {
|
|
const auto s = _tcp->state();
|
|
return (s == TCPState::State::LISTEN or s == TCPState::State::SYN_RCVD or s == TCPState::State::SYN_SENT);
|
|
});
|
|
cerr << "New connection from " << _datagram_adapter.config().destination.to_string() << ".\n";
|
|
|
|
_tcp_thread = thread(&TCPSpongeSocket::_tcp_main, this);
|
|
}
|
|
|
|
template <typename AdaptT>
|
|
void TCPSpongeSocket<AdaptT>::_tcp_main() {
|
|
try {
|
|
if (not _tcp.has_value()) {
|
|
throw runtime_error("no TCP");
|
|
}
|
|
_tcp_loop([] { return true; });
|
|
shutdown(SHUT_RDWR);
|
|
if (not _tcp.value().active()) {
|
|
cerr << "DEBUG: TCP connection finished "
|
|
<< (_tcp.value().state() == TCPState::State::RESET ? "uncleanly" : "cleanly.\n");
|
|
}
|
|
_tcp.reset();
|
|
} catch (const exception &e) {
|
|
cerr << "Exception in TCPConnection runner thread: " << e.what() << "\n";
|
|
throw e;
|
|
}
|
|
}
|
|
|
|
//! Specialization of TCPSpongeSocket for TCPOverUDPSocketAdapter
|
|
template class TCPSpongeSocket<TCPOverUDPSocketAdapter>;
|
|
|
|
//! Specialization of TCPSpongeSocket for TCPOverIPv4OverTunFdAdapter
|
|
template class TCPSpongeSocket<TCPOverIPv4OverTunFdAdapter>;
|
|
|
|
//! Specialization of TCPSpongeSocket for TCPOverIPv4OverEthernetAdapter
|
|
template class TCPSpongeSocket<TCPOverIPv4OverEthernetAdapter>;
|
|
|
|
//! Specialization of TCPSpongeSocket for LossyTCPOverUDPSocketAdapter
|
|
template class TCPSpongeSocket<LossyTCPOverUDPSocketAdapter>;
|
|
|
|
//! Specialization of TCPSpongeSocket for LossyTCPOverIPv4OverTunFdAdapter
|
|
template class TCPSpongeSocket<LossyTCPOverIPv4OverTunFdAdapter>;
|
|
|
|
CS144TCPSocket::CS144TCPSocket() : TCPOverIPv4SpongeSocket(TCPOverIPv4OverTunFdAdapter(TunFD("tun144"))) {}
|
|
|
|
void CS144TCPSocket::connect(const Address &address) {
|
|
TCPConfig tcp_config;
|
|
tcp_config.rt_timeout = 100;
|
|
|
|
FdAdapterConfig multiplexer_config;
|
|
multiplexer_config.source = {"169.254.144.9", to_string(uint16_t(random_device()()))};
|
|
multiplexer_config.destination = address;
|
|
|
|
TCPOverIPv4SpongeSocket::connect(tcp_config, multiplexer_config);
|
|
}
|
|
|
|
static const string LOCAL_TAP_IP_ADDRESS = "169.254.10.9";
|
|
static const string LOCAL_TAP_NEXT_HOP_ADDRESS = "169.254.10.1";
|
|
|
|
EthernetAddress random_private_ethernet_address() {
|
|
EthernetAddress addr;
|
|
for (auto &byte : addr) {
|
|
byte = random_device()(); // use a random local Ethernet address
|
|
}
|
|
addr.at(0) |= 0x02; // "10" in last two binary digits marks a private Ethernet address
|
|
addr.at(0) &= 0xfe;
|
|
|
|
return addr;
|
|
}
|
|
|
|
FullStackSocket::FullStackSocket()
|
|
: TCPOverIPv4OverEthernetSpongeSocket(TCPOverIPv4OverEthernetAdapter(TapFD("tap10"),
|
|
random_private_ethernet_address(),
|
|
Address(LOCAL_TAP_IP_ADDRESS, "0"),
|
|
Address(LOCAL_TAP_NEXT_HOP_ADDRESS, "0"))) {}
|
|
|
|
void FullStackSocket::connect(const Address &address) {
|
|
TCPConfig tcp_config;
|
|
tcp_config.rt_timeout = 100;
|
|
|
|
FdAdapterConfig multiplexer_config;
|
|
multiplexer_config.source = {LOCAL_TAP_IP_ADDRESS, to_string(uint16_t(random_device()()))};
|
|
multiplexer_config.destination = address;
|
|
|
|
TCPOverIPv4OverEthernetSpongeSocket::connect(tcp_config, multiplexer_config);
|
|
}
|