cs144 lab3: the TCP sender

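In the previous labs we already worked with seqno, ackno, the SYN and FIN flags, checksums, and so on. This lab adds another key technique: timeout-based retransmission, one of the main tools for coping with packet loss on an unreliable network.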

How do we transfer data reliably over a network that may lose packets? With an ARQ (Automatic Repeat reQuest) protocol.

Stop-and-wait ARQ

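Stop-and-wait ARQ works as follows:

  1. The sender sends a packet to the receiver, starts a timer, and waits for the receiver's ACK.
  2. While waiting, the sender does not send any new packets.
  3. If the packet does not reach the receiver, the receiver sends no ACK, so after waiting for a set time the sender retransmits the packet.
  4. These steps repeat until the sender receives the receiver's ACK.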
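To make the four steps concrete, here is a toy simulation in C++ (the language of the lab). It is a sketch under stated assumptions, not part of the lab: the link is modelled by a hypothetical callback `deliver(attempt)` that returns true when both the packet and its ACK get through, and a timeout is represented simply by that callback returning false.

```cpp
#include <cstddef>
#include <functional>
#include <vector>

// Toy stop-and-wait sender (hypothetical, for illustration only).
// `deliver(attempt)` models the unreliable link: true means the packet and
// its ACK both arrived on that attempt; false means one of them was lost,
// so the sender's timer expires and the same packet is sent again.
// Returns how many transmissions each packet needed.
std::vector<std::size_t> stop_and_wait(std::size_t num_packets,
                                       const std::function<bool(std::size_t)>& deliver) {
    std::vector<std::size_t> attempts_per_packet;
    std::size_t attempt = 0;  // global attempt counter fed to the link model
    for (std::size_t pkt = 0; pkt < num_packets; ++pkt) {
        std::size_t tries = 0;
        // Send, start the timer, wait. Nothing new is sent while waiting.
        while (true) {
            ++tries;
            if (deliver(attempt++)) break;  // ACK arrived before the timeout
            // Timeout: no ACK came back, resend the same packet.
        }
        attempts_per_packet.push_back(tries);
    }
    return attempts_per_packet;
}
```

For example, a link that loses attempts 1 and 2 makes the second packet need three transmissions while the first and third need one each.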

Continuous ARQ

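Continuous ARQ overcomes the main drawback of stop-and-wait, the long idle wait for every ACK: the sender transmits a whole group of packets back to back and then waits for their ACKs.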

This lab implements continuous ARQ.
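A minimal sketch of the sender side of continuous ARQ, assuming packets numbered 0, 1, 2, ... and cumulative ACKs (an ACK `ackno` covers every packet below it, which is also how TCP's ackno behaves). `SlidingWindowSender` and its methods are hypothetical names for illustration; timers are left out so the focus stays on sending several packets before any ACK arrives.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <vector>

// Toy sliding-window sender with cumulative ACKs (hypothetical).
class SlidingWindowSender {
  public:
    SlidingWindowSender(uint64_t total, uint64_t window) : _total(total), _window(window) {}

    // Send as many new packets as the window allows; returns their numbers.
    std::vector<uint64_t> fill() {
        std::vector<uint64_t> sent;
        while (_next < _total && _next - _base < _window) {
            _outstanding.push_back(_next);
            sent.push_back(_next++);
        }
        return sent;
    }

    // Cumulative ACK: every outstanding packet below `ackno` is done.
    void ack(uint64_t ackno) {
        if (ackno > _next) return;  // acknowledges something never sent: ignore
        while (!_outstanding.empty() && _outstanding.front() < ackno) _outstanding.pop_front();
        if (ackno > _base) _base = ackno;
    }

    std::size_t in_flight() const { return _outstanding.size(); }

  private:
    uint64_t _total, _window;
    uint64_t _base{0};  // oldest unacknowledged packet
    uint64_t _next{0};  // next packet to send
    std::deque<uint64_t> _outstanding{};
};
```

With a window of 3, the sender fires off packets 0, 1 and 2 at once; an ACK of 2 frees two slots, so packets 3 and 4 can follow immediately.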

Things to watch out for:

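  • SYN and FIN also take up window space (each consumes one sequence number)
  • Set the sender's window from the segments the receiver sends back; before sending the first SYN segment, set the sender's window to 1
  • When the peer's window size is 0, set the send window to 1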
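The three rules can be written as two tiny helpers. Both are hypothetical stand-ins written for this note: `length_in_sequence_space` mirrors what `TCPSegment::length_in_sequence_space()` computes, and `effective_window` models the window the sender acts on, with `std::nullopt` meaning no ACK has arrived yet.

```cpp
#include <cstddef>
#include <cstdint>
#include <optional>

// SYN and FIN each consume one sequence number, just like one payload byte,
// so they count against the window too. (Hypothetical helper.)
uint64_t length_in_sequence_space(std::size_t payload_len, bool syn, bool fin) {
    return payload_len + (syn ? 1 : 0) + (fin ? 1 : 0);
}

// The window size the sender should act on. (Hypothetical helper.)
// - No ACK yet (nullopt): assume 1, which is exactly enough for the SYN.
// - Peer advertises 0: pretend it is 1 so a one-byte probe can still go out.
// - Otherwise: use the advertised value.
uint16_t effective_window(std::optional<uint16_t> advertised) {
    if (!advertised.has_value()) return 1;
    return *advertised == 0 ? 1 : *advertised;
}
```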

Some points that confused me:

The receiver sends back an ackno and a window_size. The ackno means that every sequence number before it has been received and reassembled. In absolute sequence numbers (0 is the SYN, so byte i of the ByteStream has absolute seqno i + 1), the receiver has therefore already reassembled [0, unwrap(ackno, isn, checkpoint)), and it can still accept [unwrap(ackno, isn, checkpoint), unwrap(ackno, isn, checkpoint) + window_size). For the sender, the range it is still allowed to send is [_next_seqno, unwrap(ackno, isn, checkpoint) + window_size); that is, the send window is unwrap(ackno, isn, checkpoint) + window_size - _next_seqno, or 0 if the right edge is not beyond _next_seqno.
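The window arithmetic above can be sketched as follows. `unwrap` here is a stand-alone re-implementation of lab 2's function on plain `uint32_t` values (not the lab's `WrappingInt32` type), and `remaining_window` is a hypothetical helper. Note the clamp to 0: a receiver can advertise a right edge that lies behind `_next_seqno`, and an unsigned subtraction would then wrap around to a huge number.

```cpp
#include <cstdint>

// Stand-alone sketch of lab 2's unwrap(): map a 32-bit seqno to the 64-bit
// absolute seqno closest to `checkpoint`.
uint64_t unwrap(uint32_t n, uint32_t isn, uint64_t checkpoint) {
    const uint64_t period = 1ULL << 32;
    const uint32_t offset = n - isn;  // offset within one 2^32 period (wraps)
    // Candidate in the same 2^32 "era" as the checkpoint.
    uint64_t candidate = (checkpoint & ~(period - 1)) + offset;
    if (candidate + period / 2 < checkpoint) {
        candidate += period;  // more than half a period below: next era is closer
    } else if (candidate >= period && candidate > checkpoint + period / 2) {
        candidate -= period;  // more than half a period above: previous era is closer
    }
    return candidate;
}

// The receiver accepts [abs_ackno, abs_ackno + window_size), so the sender may
// still send [next_seqno, abs_ackno + window_size). Hypothetical helper.
uint64_t remaining_window(uint64_t abs_ackno, uint16_t window_size, uint64_t next_seqno) {
    const uint64_t right_edge = abs_ackno + window_size;
    return right_edge > next_seqno ? right_edge - next_seqno : 0;  // never "negative"
}
```

For instance, with ackno 1, window_size 10 and `_next_seqno` 4, absolute seqnos 4 through 10 may still be sent, so the remaining window is 7.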

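This lab handles timeouts as follows: there is only one timer (not one timer per segment as in Selective Repeat). When it expires, the earliest outstanding segment (the one with the lowest seqno) is retransmitted and the RTO is doubled, except in the zero-window case discussed below.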
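A minimal sketch of the single-timer rule, assuming a millisecond clock passed in as `now`: outstanding segments wait in a FIFO ordered by sequence number, and when the one timer fires only the front segment is resent and the RTO doubles. `SingleTimerRetx` and `Outstanding` are hypothetical names; the sketch deliberately leaves out ACK handling and the zero-window exception.

```cpp
#include <cstdint>
#include <deque>
#include <optional>

struct Outstanding {
    uint64_t abs_seqno;  // absolute seqno of the segment's first sequence number
    uint64_t length;     // length in sequence space
};

// One timer for the whole sender (hypothetical sketch).
class SingleTimerRetx {
  public:
    explicit SingleTimerRetx(uint64_t rto) : _rto(rto) {}

    void on_send(Outstanding seg, uint64_t now) {
        _queue.push_back(seg);
        if (!_deadline) _deadline = now + _rto;  // start only if not already running
    }

    // If the timer has expired, returns the segment to retransmit.
    std::optional<Outstanding> on_tick(uint64_t now) {
        if (!_deadline || now < *_deadline || _queue.empty()) return std::nullopt;
        _rto *= 2;               // back off
        _deadline = now + _rto;  // restart the one and only timer
        return _queue.front();   // oldest outstanding segment, not all of them
    }

    uint64_t rto() const { return _rto; }

  private:
    uint64_t _rto;
    std::optional<uint64_t> _deadline{};
    std::deque<Outstanding> _queue{};
};
```

Sending a second segment while the timer runs does not move the deadline, and on expiry only the first segment comes back for retransmission.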

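What does the sender send to the peer?

The payload, of course, plus a few header fields: SYN, FIN, and seqno.

The sender does not need to send its own window size: it is the receiver that controls how much the sender may send, and the sender has no say over the receiver.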
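As a rough illustration of those fields, here is a hypothetical, simplified segment type plus the `wrap` step that turns the sender's 64-bit absolute `_next_seqno` into the 32-bit seqno in the header. The lab's real types are `TCPSegment` and `WrappingInt32`; the names below are assumptions made so the example compiles on its own.

```cpp
#include <cstdint>
#include <string>
#include <utility>

// Simplified view of what the sender fills in (hypothetical type).
// There is no window field: flow control is driven by the receiver.
struct SenderSegment {
    uint32_t seqno{};  // wrapped 32-bit sequence number
    bool syn{false};
    bool fin{false};
    std::string payload{};
};

// wrap(): absolute seqno -> 32-bit seqno, i.e. (isn + absolute) mod 2^32.
uint32_t wrap(uint64_t absolute_seqno, uint32_t isn) {
    return static_cast<uint32_t>(isn + absolute_seqno);  // truncation performs the mod 2^32
}

// Hypothetical helper: stamp a segment with the current seqno and flags.
SenderSegment make_segment(uint64_t next_seqno, uint32_t isn, std::string payload, bool syn, bool fin) {
    SenderSegment seg;
    seg.seqno = wrap(next_seqno, isn);
    seg.syn = syn;
    seg.fin = fin;
    seg.payload = std::move(payload);
    return seg;
}
```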

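When should the timer be started?

  • Whenever a segment carrying payload, or the SYN or FIN flag, is sent, start the timer if it is not already running. These segments occupy absolute sequence numbers, and they must be delivered reliably to the other end!
  • When a timeout triggers a retransmission, restart the timer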
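The timer's lifecycle, sketched with an absolute deadline (as the author's `RTimer` does) and `std::nullopt` for "stopped". Besides the two rules above, the sketch includes what the author's `ack_received` does: restart the timer when new data is acknowledged, and stop it once nothing is outstanding. `RetxTimer` and its method names are hypothetical.

```cpp
#include <cstdint>
#include <optional>

// Retransmission timer lifecycle (hypothetical sketch).
class RetxTimer {
  public:
    // Sent a segment occupying sequence numbers (payload, SYN or FIN):
    // start the timer, but only if it is not already running.
    void on_send_seq_occupying(uint64_t now, uint64_t rto) {
        if (!_deadline) _deadline = now + rto;
    }
    // Timer expired and the oldest segment was retransmitted: restart it.
    void on_retransmit(uint64_t now, uint64_t rto) { _deadline = now + rto; }
    // An ACK acknowledged new data: restart if data is still outstanding,
    // stop once everything has been acknowledged.
    void on_new_ack(uint64_t now, uint64_t rto, bool still_outstanding) {
        if (still_outstanding) {
            _deadline = now + rto;
        } else {
            _deadline.reset();
        }
    }
    bool running() const { return _deadline.has_value(); }
    bool expired(uint64_t now) const { return _deadline && now >= *_deadline; }

  private:
    std::optional<uint64_t> _deadline{};  // absolute deadline in ms
};
```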

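Why should the RTO double on a timeout, or sometimes stay the same?

My understanding: if the timeout happened because packets were lost in a poor network, we should retransmit as rarely as possible, otherwise the network becomes even more overloaded. Doubling the RTO lowers the retransmission rate.

If instead the network is fine but the receiver is processing data too slowly (the data in its ByteStream is not being read out), its window can drop to 0. If the sender naively set its send window to 0 as well, and the receiver sent nothing further, the sender would never learn when the receiver's window opened up again, and communication would stall because the sender's window stays at 0 (can this actually happen?). So when the receiver's window is 0 but the sender still has something to send (anything that occupies absolute sequence numbers), the sender uses a window of 1. It then keeps sending a single byte to the receiver; with a zero window the receiver cannot accept it (the segment is effectively ignored), and the sender times out. Since this timeout is not caused by the network, leaving the RTO unchanged is reasonable!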
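The backoff decision on a timeout reduces to one branch on the last advertised window. A minimal sketch with hypothetical names (`RetxState`, `on_timeout`); as in the author's `tick`, both the RTO and the consecutive-retransmission counter are left untouched when the window was zero.

```cpp
#include <cstdint>

// Timeout bookkeeping (hypothetical sketch).
struct RetxState {
    uint64_t rto;                   // current retransmission timeout in ms
    unsigned consecutive_retx{0};   // retransmissions since the last new ACK
};

void on_timeout(RetxState& s, uint16_t last_window_size) {
    if (last_window_size != 0) {
        // Treated as loss or congestion: retransmit less often.
        s.consecutive_retx += 1;
        s.rto *= 2;
    }
    // Zero window: per the reasoning above, the timeout is not a sign of
    // congestion, so the RTO and the counter stay as they are.
}
```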

Lab code

After about ten runs of make check_lab3, all test cases finally passed:

```
Test project /home/cs144/sponge/build
Start 1: t_wrapping_ints_cmp
1/33 Test #1: t_wrapping_ints_cmp .............. Passed 0.00 sec
Start 2: t_wrapping_ints_unwrap
2/33 Test #2: t_wrapping_ints_unwrap ........... Passed 0.00 sec
Start 3: t_wrapping_ints_wrap
3/33 Test #3: t_wrapping_ints_wrap ............. Passed 0.00 sec
Start 4: t_wrapping_ints_roundtrip
4/33 Test #4: t_wrapping_ints_roundtrip ........ Passed 0.12 sec
Start 5: t_recv_connect
5/33 Test #5: t_recv_connect ................... Passed 0.00 sec
Start 6: t_recv_transmit
6/33 Test #6: t_recv_transmit .................. Passed 0.03 sec
Start 7: t_recv_window
7/33 Test #7: t_recv_window .................... Passed 0.00 sec
Start 8: t_recv_reorder
8/33 Test #8: t_recv_reorder ................... Passed 0.00 sec
Start 9: t_recv_close
9/33 Test #9: t_recv_close ..................... Passed 0.00 sec
Start 10: t_recv_special
10/33 Test #10: t_recv_special ................... Passed 0.00 sec
Start 11: t_send_connect
11/33 Test #11: t_send_connect ................... Passed 0.00 sec
Start 12: t_send_transmit
12/33 Test #12: t_send_transmit .................. Passed 0.03 sec
Start 13: t_send_retx
13/33 Test #13: t_send_retx ...................... Passed 0.00 sec
Start 14: t_send_window
14/33 Test #14: t_send_window .................... Passed 0.01 sec
Start 15: t_send_ack
15/33 Test #15: t_send_ack ....................... Passed 0.00 sec
Start 16: t_send_close
16/33 Test #16: t_send_close ..................... Passed 0.00 sec
Start 17: t_send_extra
17/33 Test #17: t_send_extra ..................... Passed 0.00 sec
Start 18: t_strm_reassem_single
18/33 Test #18: t_strm_reassem_single ............ Passed 0.00 sec
Start 19: t_strm_reassem_seq
19/33 Test #19: t_strm_reassem_seq ............... Passed 0.00 sec
Start 20: t_strm_reassem_dup
20/33 Test #20: t_strm_reassem_dup ............... Passed 0.00 sec
Start 21: t_strm_reassem_holes
21/33 Test #21: t_strm_reassem_holes ............. Passed 0.00 sec
Start 22: t_strm_reassem_many
22/33 Test #22: t_strm_reassem_many .............. Passed 0.04 sec
Start 23: t_strm_reassem_overlapping
23/33 Test #23: t_strm_reassem_overlapping ....... Passed 0.00 sec
Start 24: t_strm_reassem_win
24/33 Test #24: t_strm_reassem_win ............... Passed 0.03 sec
Start 25: t_strm_reassem_cap
25/33 Test #25: t_strm_reassem_cap ............... Passed 0.05 sec
Start 26: t_byte_stream_construction
26/33 Test #26: t_byte_stream_construction ....... Passed 0.00 sec
Start 27: t_byte_stream_one_write
27/33 Test #27: t_byte_stream_one_write .......... Passed 0.00 sec
Start 28: t_byte_stream_two_writes
28/33 Test #28: t_byte_stream_two_writes ......... Passed 0.00 sec
Start 29: t_byte_stream_capacity
29/33 Test #29: t_byte_stream_capacity ........... Passed 0.28 sec
Start 30: t_byte_stream_many_writes
30/33 Test #30: t_byte_stream_many_writes ........ Passed 0.00 sec
Start 53: t_address_dt
31/33 Test #53: t_address_dt ..................... Passed 5.03 sec
Start 54: t_parser_dt
32/33 Test #54: t_parser_dt ...................... Passed 0.00 sec
Start 55: t_socket_dt
33/33 Test #55: t_socket_dt ...................... Passed 0.00 sec
```

The code:

Retransmission timer

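```cpp
#include <cstddef>
#include <cstdint>

// Retransmission timer. It stores an absolute deadline on the sender's own
// millisecond clock (_ms_since_alive) rather than a countdown. The deadline is
// 64-bit so it cannot be truncated when compared with a size_t clock.
class RTimer {
  public:
    RTimer() = default;
    // Arm (or re-arm) the timer with the absolute deadline `to`.
    void start(uint64_t to) {
        _time_out = to;
        _started = true;
    }
    void stop() { _started = false; }
    uint64_t time_out() const { return _time_out; }

    // True if the timer is running and the clock `t` has reached the deadline.
    bool is_time_out(size_t t) const { return _started && t >= _time_out; }
    bool started() const { return _started; }

  private:
    bool _started{};
    uint64_t _time_out{};  // absolute deadline in ms
};
```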

The private members and methods I added to the TCPSender class:

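```cpp
#include <queue>

class TCPSender {
  private:
    ...
    // my code:
    RTimer _timer;

    // Segments sent but not yet fully acknowledged, oldest (lowest seqno) first.
    std::queue<TCPSegment> _outstandings;

    // The sender's own clock in ms, used for the timer's absolute deadlines.
    size_t _ms_since_alive{};

    unsigned int _rtx{};  // consecutive retransmissions

    uint64_t _checkpoint{};  // checkpoint passed to unwrap()

    uint16_t _win_size{1};  // remaining send window; 1 until the first ACK arrives

    unsigned int _rto;  // current retransmission timeout

    bool _peer_busy{};  // the peer's last advertised window was 0

    uint64_t _bytes_in_flight{};

    bool _fin_sended{};  // FIN has already been sent

  private:
    TCPSegment make_segment(std::string&& payload, bool syn, bool fin);
    WrappingInt32 get_seqno();
    uint64_t get_absolute_seqno(WrappingInt32, WrappingInt32, uint64_t) const;
    bool space_available() const { return _win_size > 0; }
    bool is_peer_busy() const { return _peer_busy; }

    // Push a segment to _segments_out and keep a copy in _outstandings.
    void send_and_store(std::string&& payload, bool syn, bool fin);

  public:
    ...
};
```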

Implementation of the TCPSender interface

```cpp
#include "tcp_sender.hh"

#include "tcp_config.hh"

#include <algorithm>
#include <random>
#include <string>
#include <utility>

using namespace std;

TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn)
    : _isn(fixed_isn.value_or(WrappingInt32{random_device()()}))
    , _initial_retransmission_timeout{retx_timeout}
    , _stream(capacity)
    , _timer()
    , _outstandings()
    , _rto(retx_timeout) {}

uint64_t TCPSender::bytes_in_flight() const { return _bytes_in_flight; }

void TCPSender::fill_window() {
    // Keep sending until the window is full or FIN has gone out.
    while (space_available() && !_fin_sended) {
        const bool syn = (_next_seqno == 0);  // first segment: opening handshake
        // SYN uses one sequence number, leaving one less for payload.
        const size_t room = static_cast<size_t>(_win_size) - (syn ? 1 : 0);
        // Never exceed MAX_PAYLOAD_SIZE, even when the window is larger.
        std::string payload = _stream.read(std::min(TCPConfig::MAX_PAYLOAD_SIZE, room));
        // Piggyback FIN once the stream has ended, all of it has been read,
        // and one more sequence number still fits in the window.
        const bool fin = _stream.eof() && payload.size() < room;
        if (!syn && !fin && payload.empty()) break;  // nothing left to send
        send_and_store(std::move(payload), syn, fin);
        if (fin) _fin_sended = true;
    }
    // Start the timer if something is outstanding and it is not already running.
    if (!_timer.started() && !_outstandings.empty())
        _timer.start(_ms_since_alive + _rto);
}

//! \param ackno The remote receiver's ackno (acknowledgment number)
//! \param window_size The remote receiver's advertised window size
void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
    const uint64_t left = get_absolute_seqno(ackno, _isn, _checkpoint);
    if (left > _next_seqno) return;  // acknowledges something never sent: ignore

    // A zero window is treated as a window of 1 so that a probe can still be sent.
    _peer_busy = (window_size == 0);
    const uint64_t right = left + (_peer_busy ? 1 : window_size);
    // Clamp to 0: the right edge may lie behind _next_seqno, and the unsigned
    // subtraction would otherwise wrap around.
    _win_size = right > _next_seqno ? right - _next_seqno : 0;

    bool has_new_data = false;
    while (!_outstandings.empty()) {
        const TCPSegment& segment = _outstandings.front();
        const uint64_t ab_seqno = get_absolute_seqno(segment.header().seqno, _isn, _checkpoint);
        const size_t length = segment.length_in_sequence_space();
        if (ab_seqno + length > left)
            break;  // this segment is not fully acknowledged yet

        has_new_data = true;
        _bytes_in_flight -= length;
        _outstandings.pop();  // `segment` must not be used after this line

        // New data was acknowledged: reset the RTO and the retransmission counter.
        _rto = _initial_retransmission_timeout;
        _rtx = 0;
    }
    if (_outstandings.empty())
        _timer.stop();
    else if (has_new_data)
        _timer.start(_ms_since_alive + _rto);
}

//! \param[in] ms_since_last_tick the number of milliseconds since the last call to this method
void TCPSender::tick(const size_t ms_since_last_tick) {
    if (_outstandings.empty()) return;  // nothing to retransmit
    _ms_since_alive += ms_since_last_tick;
    if (_timer.is_time_out(_ms_since_alive)) {
        if (!is_peer_busy()) {
            // Loss or delay in the network: back off.
            _rtx++;
            _rto *= 2;
        }
        // Peer busy (sender still has data but the receiver's window is 0):
        // keep the RTO and the counter unchanged.
        // Retransmit the earliest segment and restart the timer.
        _segments_out.push(_outstandings.front());
        _timer.start(_ms_since_alive + _rto);
    }
}

unsigned int TCPSender::consecutive_retransmissions() const { return _rtx; }

void TCPSender::send_empty_segment() {
    // Occupies no sequence numbers, so it is neither stored nor timed.
    _segments_out.push(make_segment("", false, false));
}

TCPSegment TCPSender::make_segment(std::string&& payload, bool syn, bool fin) {
    TCPSegment segment;
    segment.payload() = std::move(payload);
    segment.header().seqno = get_seqno();
    segment.header().syn = syn;
    segment.header().fin = fin;
    return segment;
}

WrappingInt32 TCPSender::get_seqno() { return wrap(_next_seqno, _isn); }

uint64_t TCPSender::get_absolute_seqno(WrappingInt32 n, WrappingInt32 isn, uint64_t checkpoint) const {
    return unwrap(n, isn, checkpoint);
}

void TCPSender::send_and_store(std::string&& payload, bool syn, bool fin) {
    TCPSegment segment = make_segment(std::move(payload), syn, fin);

    // send (callers guarantee that length <= _win_size)
    const uint64_t length = segment.length_in_sequence_space();
    _segments_out.push(segment);
    _next_seqno += length;
    _win_size -= length;

    // store until acknowledged
    _outstandings.push(segment);
    _bytes_in_flight += length;

    // update checkpoint
    _checkpoint = _next_seqno;
}
```