diff --git a/libsponge/tcp_connection.cc b/libsponge/tcp_connection.cc index aab4055..3a54672 100644 --- a/libsponge/tcp_connection.cc +++ b/libsponge/tcp_connection.cc @@ -8,39 +8,124 @@ // automated checks run by `make check`. template -void DUMMY_CODE(Targs &&... /* unused */) {} +void DUMMY_CODE(Targs &&.../* unused */) {} using namespace std; -size_t TCPConnection::remaining_outbound_capacity() const { return {}; } +size_t TCPConnection::remaining_outbound_capacity() const { return _sender.stream_in().remaining_capacity(); } -size_t TCPConnection::bytes_in_flight() const { return {}; } +size_t TCPConnection::bytes_in_flight() const { return _sender.bytes_in_flight(); } -size_t TCPConnection::unassembled_bytes() const { return {}; } +size_t TCPConnection::unassembled_bytes() const { return _receiver.unassembled_bytes(); } -size_t TCPConnection::time_since_last_segment_received() const { return {}; } +size_t TCPConnection::time_since_last_segment_received() const { return _time_last_segment_received; } -void TCPConnection::segment_received(const TCPSegment &seg) { DUMMY_CODE(seg); } +void TCPConnection::segment_received(const TCPSegment &seg) { + if (!_active) + return; + _time_last_segment_received = 0; + if (seg.header().rst) { + _sender.stream_in().set_error(); + _receiver.stream_out().set_error(); + _active = false; + return; + } + bool send_ack = seg.length_in_sequence_space(); + _receiver.segment_received(seg); + if (seg.header().ack && _receiver.ackno().has_value()) { + // filter out acks before syn + _sender.ack_received(seg.header().ackno, seg.header().win); + // _sender.fill_window(); + if (!_sender.segments_out().empty()) send_ack = false; + } + // std::cerr << seg.header().to_string(); + // std::cerr << _receiver.stream_out().bytes_written() << " " << _receiver.unassembled_bytes() << std::endl; + // std::cerr << seg.payload().size() << std::endl; -bool TCPConnection::active() const { return {}; } + if (send_ack) { + _sender.fill_window(); + if (_sender.segments_out().empty()) { + _sender.send_empty_segment(); + } + } + _send_all(); + // TODO: Keep-alive but it seems to be use + if (_receiver.stream_out().input_ended() && !_sender.stream_in().eof()) { + _linger_after_streams_finish = false; + } +} + +bool TCPConnection::active() const { return _active; } size_t TCPConnection::write(const string &data) { - DUMMY_CODE(data); - return {}; + auto &stream_in = _sender.stream_in(); + auto wr_sz = stream_in.write(data); + _sender.fill_window(); + _send_all(); + return wr_sz; } //! \param[in] ms_since_last_tick number of milliseconds since the last call to this method -void TCPConnection::tick(const size_t ms_since_last_tick) { DUMMY_CODE(ms_since_last_tick); } +void TCPConnection::tick(const size_t ms_since_last_tick) { + _time_last_segment_received += ms_since_last_tick; + _sender.tick(ms_since_last_tick); + if (_sender.consecutive_retransmissions() > TCPConfig::MAX_RETX_ATTEMPTS) { + _sender.send_empty_segment(); + auto &seg_queue = _sender.segments_out(); + auto seg = seg_queue.front(); + seg_queue.pop(); + if (_receiver.ackno().has_value()) { + seg.header().ack = true; + seg.header().ackno = _receiver.ackno().value(); + } + seg.header().win = _receiver.window_size(); + seg.header().rst = true; + _segments_out.push(seg); + _sender.stream_in().set_error(); + _receiver.stream_out().set_error(); + _active = false; + return; + } + _send_all(); + if (!_sender.bytes_in_flight() && _sender.stream_in().eof() && _receiver.stream_out().input_ended() && + !_receiver.unassembled_bytes()) { + if (!_linger_after_streams_finish) { + _active = false; + } else if (_time_last_segment_received >= 10 * _cfg.rt_timeout) { + _active = false; + } + } +} -void TCPConnection::end_input_stream() {} +void TCPConnection::end_input_stream() { + _sender.stream_in().end_input(); + _sender.fill_window(); + _send_all(); +} -void TCPConnection::connect() {} +void TCPConnection::connect() { + _sender.fill_window(); + _send_all(); +} TCPConnection::~TCPConnection() { try { if (active()) { cerr << "Warning: Unclean shutdown of TCPConnection\n"; - + _sender.send_empty_segment(); + auto &seg_queue = _sender.segments_out(); + auto seg = seg_queue.front(); + seg_queue.pop(); + if (_receiver.ackno().has_value()) { + seg.header().ack = true; + seg.header().ackno = _receiver.ackno().value(); + } + seg.header().win = _receiver.window_size(); + seg.header().rst = true; + _segments_out.push(seg); + _sender.stream_in().set_error(); + _receiver.stream_out().set_error(); + _active = false; // Your code here: need to send a RST segment to the peer } } catch (const exception &e) { diff --git a/libsponge/tcp_connection.hh b/libsponge/tcp_connection.hh index b8a907d..bee8aba 100644 --- a/libsponge/tcp_connection.hh +++ b/libsponge/tcp_connection.hh @@ -5,6 +5,8 @@ #include "tcp_receiver.hh" #include "tcp_sender.hh" #include "tcp_state.hh" +#include +#include //! \brief A complete endpoint of a TCP connection class TCPConnection { @@ -20,7 +22,21 @@ class TCPConnection { //! for 10 * _cfg.rt_timeout milliseconds after both streams have ended, //! in case the remote TCPConnection doesn't know we've received its whole stream? bool _linger_after_streams_finish{true}; - + size_t _time_last_segment_received{0}; + bool _active{true}; + void _send_all() { + auto &seg_queue = _sender.segments_out(); + while (!seg_queue.empty()) { + auto seg = seg_queue.front(); + seg_queue.pop(); + if (_receiver.ackno().has_value()) { + seg.header().ack = true; + seg.header().ackno = _receiver.ackno().value(); + } + seg.header().win = std::min(_receiver.window_size(), static_cast(std::numeric_limits::max())); + _segments_out.push(seg); + } + } public: //! \name "Input" interface for the writer //!@{ diff --git a/libsponge/tcp_helpers/tcp_header.cc b/libsponge/tcp_helpers/tcp_header.cc index 77209fc..45424c3 100644 --- a/libsponge/tcp_helpers/tcp_header.cc +++ b/libsponge/tcp_helpers/tcp_header.cc @@ -80,7 +80,7 @@ string TCPHeader::serialize() const { //! \returns A string with the header's contents string TCPHeader::to_string() const { stringstream ss{}; - ss << hex << boolalpha << "TCP source port: " << +sport << '\n' + ss << boolalpha << "TCP source port: " << +sport << '\n' << "TCP dest port: " << +dport << '\n' << "TCP seqno: " << seqno << '\n' << "TCP ackno: " << ackno << '\n' diff --git a/libsponge/tcp_sender.cc b/libsponge/tcp_sender.cc index d87978a..cc58866 100644 --- a/libsponge/tcp_sender.cc +++ b/libsponge/tcp_sender.cc @@ -92,6 +92,7 @@ void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_si } else { _timer.start(); } + fill_window(); } //! \param[in] ms_since_last_tick the number of milliseconds since the last call to this method @@ -112,7 +113,7 @@ unsigned int TCPSender::consecutive_retransmissions() const { return _timer.cons void TCPSender::send_empty_segment() { TCPHeader hdr; - hdr.seqno = wrap(_next_seqno - (_next_seqno ? 1 : 0), _isn); + hdr.seqno = next_seqno(); TCPSegment seg; seg.header() = hdr; _segments_out.push(seg);