cs144 lab3: the TCP sender
在前面的实验中,我们已经接触到了,seqno、ackno、SYN、FIN 标志、checksum 等技术。这个实验涉及到一个重要技术:超时重传。这是解决不可靠网络出现分组丢失问题的重要技术之一。
如何在可能丢失分组的不可靠网络之上进行可靠传输?ARQ 协议!
停等 ARQ 协议
停止并等待协议的工作原理如下:
- 发送点对接收点发送数据包,然后等待接收点回复ACK并且开始计时。
- 在等待过程中,发送点停止发送新的数据包。
- 当数据包没有成功被接收点接收时候,接收点不会发送ACK.这样发送点在等待一定时间后,重新发送数据包。
- 反复以上步骤直到收到从接收点发送的ACK.
连续 ARQ 协议
为了克服停止并等待ARQ协议长时间等待ACK的缺点。这个协议会连续发送一组数据包,然后再等待这些数据包的ACK.
本实验的实现是 连续 ARQ 协议。
需要注意的点:
- SYN、FIN 也会占用窗口空间
- 根据 receiver 发来的 segment 来设置 sender 的窗口;发第一个 syn segment 前,将 sender 的窗口设置为 1
- 当 peer 的窗口大小为 0 时,把发送窗口大小设置为 1
一些疑惑的点:
receiver 会发回来一个 ackno 和 windown_size 。receiver 发回的 ackno 表示的是 ackno 之前的所有 segment 都已经被成功接收并 reassemble。这也就是说对与一个 ByteStream 而言,receiver 已经把 [0, unwrap(ackno, isn, checkpoint) ) 范围的元素(在这里就是一个字节) reassemble 了!并且 receiver 还可以接受 [unwrap(ackno, isn, checkpoint), unwrap(ackno, isn, checkpoint) + window_size) 范围的元素,那么对于 sender 来说,它能的发送 ByteStream 的范围就是 [_next_seqno, unwrap(ackno, isn, checkpoint) + window_size);也就是说发送窗口大小是 unwrap(ackno, isn, checkpoint) + window_size - _next_seqno
。
本实验对于超时的 segment 的处理是这样的:仅仅设置一个定时器(注意不是像 SR 中那样每一个 segment 都设一个 timer),如果超时了,就重传最早的那个(seqno 最靠前的)segment,并把 RTO 乘倍。
sender 要发哪些东西给 peer?
首先 payload 肯定是要发的;其次是 header 中的一些字段:SYN、FIN,seqno;
sender 的发送窗口大小就没必要发了,因为本来就是 receiver 来控制 sender 能够发多少数据,sender 控制不了 receiver!
何时开启一个 timer ?
- 当带有负载或带有 SYN、FIN 标志的 segment 被发送时,就要开启 timer,因为这些 segment 都是占用了 absolut seqno 的 segment,要保证这些 segment 可靠送达对端!
- 超时重传时要重启 timer
为什么超时的时候 RTO 要乘倍增长或保持不变?
讲一下自己的理解:因为,如果是因为网络环境差导致的分组丢失,从而导致超时,那么我们应该尽量降低重传频率,不然网络环境会更加不堪重负,这时候就要乘倍增长 RTO 让重传频率降低;如果不是因为网络环境差,而是因为 receiver 处理数据太慢(即 ByteStream 中的数据迟迟没有读走),导致窗口大小变为 0,如果此时发送方傻傻的把发送窗口变成 0,而 receiver 又不继续发响应,那么当 receiver 的接收窗口大于 0 时,发送方无法得知,由于发送方窗口一直为 0 而导致通信被阻塞了(有没有这种可能?)。。。所以当 receiver 窗口为 0 时,而发送方仍有东西要发送时(即占用 absolute sequence number),让发送窗口为 1,这样发送方会不断向 receiver 发送一个字节,而此时 receiver 因为接收窗口为 0 而无法接收(直接不理会收到的 segment)导致发送方超时,由于这时候的超时不是网络环境导致的,所以让 RTO 保持原样是合理的!
实验代码
经过大概 10 次左右的 make check_lab3
终于通过了所有测试用例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| Test project /home/cs144/sponge/build Start 1: t_wrapping_ints_cmp 1/33 Test #1: t_wrapping_ints_cmp .............. Passed 0.00 sec Start 2: t_wrapping_ints_unwrap 2/33 Test #2: t_wrapping_ints_unwrap ........... Passed 0.00 sec Start 3: t_wrapping_ints_wrap 3/33 Test #3: t_wrapping_ints_wrap ............. Passed 0.00 sec Start 4: t_wrapping_ints_roundtrip 4/33 Test #4: t_wrapping_ints_roundtrip ........ Passed 0.12 sec Start 5: t_recv_connect 5/33 Test #5: t_recv_connect ................... Passed 0.00 sec Start 6: t_recv_transmit 6/33 Test #6: t_recv_transmit .................. Passed 0.03 sec Start 7: t_recv_window 7/33 Test #7: t_recv_window .................... Passed 0.00 sec Start 8: t_recv_reorder 8/33 Test #8: t_recv_reorder ................... Passed 0.00 sec Start 9: t_recv_close 9/33 Test #9: t_recv_close ..................... Passed 0.00 sec Start 10: t_recv_special 10/33 Test #10: t_recv_special ................... Passed 0.00 sec Start 11: t_send_connect 11/33 Test #11: t_send_connect ................... Passed 0.00 sec Start 12: t_send_transmit 12/33 Test #12: t_send_transmit .................. Passed 0.03 sec Start 13: t_send_retx 13/33 Test #13: t_send_retx ...................... Passed 0.00 sec Start 14: t_send_window 14/33 Test #14: t_send_window .................... Passed 0.01 sec Start 15: t_send_ack 15/33 Test #15: t_send_ack ....................... Passed 0.00 sec Start 16: t_send_close 16/33 Test #16: t_send_close ..................... Passed 0.00 sec Start 17: t_send_extra 17/33 Test #17: t_send_extra ..................... Passed 0.00 sec Start 18: t_strm_reassem_single 18/33 Test #18: t_strm_reassem_single ............ Passed 0.00 sec Start 19: t_strm_reassem_seq 19/33 Test #19: t_strm_reassem_seq ............... Passed 0.00 sec Start 20: t_strm_reassem_dup 20/33 Test #20: t_strm_reassem_dup ............... Passed 0.00 sec Start 21: t_strm_reassem_holes 21/33 Test #21: t_strm_reassem_holes ............. Passed 0.00 sec Start 22: t_strm_reassem_many 22/33 Test #22: t_strm_reassem_many .............. Passed 0.04 sec Start 23: t_strm_reassem_overlapping 23/33 Test #23: t_strm_reassem_overlapping ....... Passed 0.00 sec Start 24: t_strm_reassem_win 24/33 Test #24: t_strm_reassem_win ............... Passed 0.03 sec Start 25: t_strm_reassem_cap 25/33 Test #25: t_strm_reassem_cap ............... Passed 0.05 sec Start 26: t_byte_stream_construction 26/33 Test #26: t_byte_stream_construction ....... Passed 0.00 sec Start 27: t_byte_stream_one_write 27/33 Test #27: t_byte_stream_one_write .......... Passed 0.00 sec Start 28: t_byte_stream_two_writes 28/33 Test #28: t_byte_stream_two_writes ......... Passed 0.00 sec Start 29: t_byte_stream_capacity 29/33 Test #29: t_byte_stream_capacity ........... Passed 0.28 sec Start 30: t_byte_stream_many_writes 30/33 Test #30: t_byte_stream_many_writes ........ Passed 0.00 sec Start 53: t_address_dt 31/33 Test #53: t_address_dt ..................... Passed 5.03 sec Start 54: t_parser_dt 32/33 Test #54: t_parser_dt ...................... Passed 0.00 sec Start 55: t_socket_dt 33/33 Test #55: t_socket_dt ...................... Passed 0.00 sec
|
代码如下:
重传定时器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| class RTimer { public: RTimer() = default; void start(uint32_t to) { _time_out = to; _started = true; } void stop() { _started = false; } uint32_t time_out() const { return _time_out; }
bool is_time_out(size_t t) const { return _started && t >= _time_out; } bool started() const { return _started; } private: bool _started{}; uint32_t _time_out{}; };
|
TCPSender 类中,自己添加的 private 变量 和 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| class TCPSender { private: ... RTimer _timer;
std::queue<TCPSegment> _outstandings;
size_t _ms_since_alive{};
unsigned int _rtx{};
uint64_t _checkpoint{};
uint16_t _win_size{1};
unsigned int _rto;
bool _peer_busy{};
uint64_t _bytes_in_flight{};
bool _fin_sended{}; private: TCPSegment make_segment(std::string&& payload, bool syn, bool fin); WrappingInt32 get_seqno(); uint64_t get_absolute_seqno(WrappingInt32, WrappingInt32, uint64_t) const; bool space_available() const { return _win_size > 0; } bool is_peer_busy() const { return _peer_busy; }
void send_and_store(std::string&& payload, bool syn, bool fin); public: ... };
|
TCPSender 接口的具体实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
| TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn) : _isn(fixed_isn.value_or(WrappingInt32{random_device()()})) , _initial_retransmission_timeout{retx_timeout} , _stream(capacity) , _timer() , _outstandings() , _rto(retx_timeout) {}
uint64_t TCPSender::bytes_in_flight() const { return _bytes_in_flight; }
void TCPSender::fill_window() { if (!space_available()) return; if (_next_seqno == 0) { size_t can_read = std::min(TCPConfig::MAX_PAYLOAD_SIZE, static_cast<const size_t&>(_win_size - 1)); send_and_store(_stream.read(can_read), true, false); } else if (_stream.input_ended() && !_fin_sended) { if (_win_size >= _stream.buffer_size() + 1) { send_and_store(_stream.read(_win_size), false, true); _fin_sended = true; } } while (!_stream.buffer_empty() && space_available()) { size_t can_read = std::min(TCPConfig::MAX_PAYLOAD_SIZE, static_cast<const size_t&>(_win_size)); send_and_store(_stream.read(can_read), 0, 0); } if (!_timer.started() && !_outstandings.empty()) _timer.start(_ms_since_alive + _rto); }
void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) { uint64_t left = get_absolute_seqno(ackno, _isn, _checkpoint); if (left > _next_seqno) return; uint64_t right = left + window_size; _win_size = right - _next_seqno;
if (window_size == 0) { _peer_busy = true; _win_size = 1; } else _peer_busy = false;
bool has_new_data = false; while (!_outstandings.empty()) { TCPSegment segment = _outstandings.front(); uint64_t ab_seqno = get_absolute_seqno(segment.header().seqno, _isn, _checkpoint); uint16_t length = segment.length_in_sequence_space(); if (ab_seqno + length > left) break;
has_new_data = true; _outstandings.pop(); _bytes_in_flight -= length;
_rto = _initial_retransmission_timeout; _rtx = 0; }; if (_outstandings.empty()) _timer.stop(); else if(has_new_data) _timer.start(_ms_since_alive + _rto); } void TCPSender::tick(const size_t ms_since_last_tick) { if (_outstandings.empty()) return; _ms_since_alive += ms_since_last_tick; if (_timer.is_time_out(_ms_since_alive)) { if (is_peer_busy()) { } else { _rtx++; _rto *= 2; } _segments_out.push(_outstandings.front()); _timer.start(_ms_since_alive + _rto); } }
unsigned int TCPSender::consecutive_retransmissions() const { return _rtx; }
void TCPSender::send_empty_segment() { _segments_out.push(make_segment("", false, false)); }
TCPSegment TCPSender::make_segment(std::string&& payload, bool syn, bool fin) { TCPSegment segment; segment.payload() = std::move(payload); segment.header().seqno = get_seqno(); segment.header().syn = syn; segment.header().fin = fin;
return segment; }
WrappingInt32 TCPSender::get_seqno() { return wrap(_next_seqno, _isn); }
uint64_t TCPSender::get_absolute_seqno(WrappingInt32 n, WrappingInt32 isn, uint64_t checkpoint) const { return unwrap(n, isn, checkpoint); }
void TCPSender::send_and_store(std::string&& payload, bool syn, bool fin) { TCPSegment segment = make_segment(std::move(payload), syn, fin);
uint64_t length = segment.length_in_sequence_space(); _segments_out.push(segment); _next_seqno += length; _win_size -= length;
_outstandings.push(segment); _bytes_in_flight += length;
_checkpoint = _next_seqno; }
|