@@ -3,18 +3,17 @@ use std::{
3
3
sync:: Arc ,
4
4
} ;
5
5
6
- use futures:: { channel :: mpsc , select, FutureExt , SinkExt } ;
6
+ use futures:: { select, FutureExt } ;
7
7
8
8
use async_std:: {
9
9
io:: BufReader ,
10
10
net:: { TcpListener , TcpStream , ToSocketAddrs } ,
11
11
prelude:: * ,
12
12
task,
13
+ sync:: { channel, Sender , Receiver } ,
13
14
} ;
14
15
15
16
type Result < T > = std:: result:: Result < T , Box < dyn std:: error:: Error + Send + Sync > > ;
16
- type Sender < T > = mpsc:: UnboundedSender < T > ;
17
- type Receiver < T > = mpsc:: UnboundedReceiver < T > ;
18
17
19
18
#[ derive( Debug ) ]
20
19
enum Void { }
@@ -26,7 +25,7 @@ pub(crate) fn main() -> Result<()> {
26
25
async fn accept_loop ( addr : impl ToSocketAddrs ) -> Result < ( ) > {
27
26
let listener = TcpListener :: bind ( addr) . await ?;
28
27
29
- let ( broker_sender, broker_receiver) = mpsc :: unbounded ( ) ;
28
+ let ( broker_sender, broker_receiver) = channel ( 10 ) ;
30
29
let broker = task:: spawn ( broker_loop ( broker_receiver) ) ;
31
30
let mut incoming = listener. incoming ( ) ;
32
31
while let Some ( stream) = incoming. next ( ) . await {
@@ -39,7 +38,7 @@ async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
39
38
Ok ( ( ) )
40
39
}
41
40
42
- async fn connection_loop ( mut broker : Sender < Event > , stream : TcpStream ) -> Result < ( ) > {
41
+ async fn connection_loop ( broker : Sender < Event > , stream : TcpStream ) -> Result < ( ) > {
43
42
let stream = Arc :: new ( stream) ;
44
43
let reader = BufReader :: new ( & * stream) ;
45
44
let mut lines = reader. lines ( ) ;
@@ -48,15 +47,14 @@ async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result
48
47
None => return Err ( "peer disconnected immediately" . into ( ) ) ,
49
48
Some ( line) => line?,
50
49
} ;
51
- let ( _shutdown_sender, shutdown_receiver) = mpsc :: unbounded :: < Void > ( ) ;
50
+ let ( _shutdown_sender, shutdown_receiver) = channel :: < Void > ( 0 ) ;
52
51
broker
53
52
. send ( Event :: NewPeer {
54
53
name : name. clone ( ) ,
55
54
stream : Arc :: clone ( & stream) ,
56
55
shutdown : shutdown_receiver,
57
56
} )
58
- . await
59
- . unwrap ( ) ;
57
+ . await ;
60
58
61
59
while let Some ( line) = lines. next ( ) . await {
62
60
let line = line?;
@@ -76,8 +74,7 @@ async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result
76
74
to : dest,
77
75
msg,
78
76
} )
79
- . await
80
- . unwrap ( ) ;
77
+ . await ;
81
78
}
82
79
83
80
Ok ( ( ) )
@@ -115,23 +112,23 @@ enum Event {
115
112
from : String ,
116
113
to : Vec < String > ,
117
114
msg : String ,
118
- } ,
115
+ }
119
116
}
120
117
121
118
async fn broker_loop ( mut events : Receiver < Event > ) {
122
119
let ( disconnect_sender, mut disconnect_receiver) =
123
- mpsc :: unbounded :: < ( String , Receiver < String > ) > ( ) ;
120
+ channel :: < ( String , Receiver < String > ) > ( 2 ) ;
124
121
let mut peers: HashMap < String , Sender < String > > = HashMap :: new ( ) ;
125
122
126
123
loop {
124
+
127
125
let event = select ! {
128
- event = events. next( ) . fuse ( ) => match event {
126
+ event = events. next( ) => match event {
129
127
None => break ,
130
128
Some ( event) => event,
131
129
} ,
132
- disconnect = disconnect_receiver. next( ) . fuse ( ) => {
130
+ disconnect = disconnect_receiver. next( ) => {
133
131
let ( name, _pending_messages) = disconnect. unwrap( ) ;
134
- assert!( peers. remove( & name) . is_some( ) ) ;
135
132
continue ;
136
133
} ,
137
134
} ;
@@ -140,7 +137,7 @@ async fn broker_loop(mut events: Receiver<Event>) {
140
137
for addr in to {
141
138
if let Some ( peer) = peers. get_mut ( & addr) {
142
139
let msg = format ! ( "from {}: {}\n " , from, msg) ;
143
- peer. send ( msg) . await . unwrap ( ) ;
140
+ peer. send ( msg) . await ;
144
141
}
145
142
}
146
143
}
@@ -151,20 +148,24 @@ async fn broker_loop(mut events: Receiver<Event>) {
151
148
} => match peers. entry ( name. clone ( ) ) {
152
149
Entry :: Occupied ( ..) => ( ) ,
153
150
Entry :: Vacant ( entry) => {
154
- let ( client_sender, mut client_receiver) = mpsc :: unbounded ( ) ;
151
+ let ( client_sender, mut client_receiver) = channel ( 10 ) ;
155
152
entry. insert ( client_sender) ;
156
- let mut disconnect_sender = disconnect_sender. clone ( ) ;
153
+ let disconnect_sender = disconnect_sender. clone ( ) ;
157
154
spawn_and_log_error ( async move {
158
155
let res =
159
156
connection_writer_loop ( & mut client_receiver, stream, shutdown) . await ;
160
157
disconnect_sender
161
158
. send ( ( name, client_receiver) )
162
- . await
163
- . unwrap ( ) ;
159
+ . await ;
164
160
res
165
161
} ) ;
166
162
}
167
163
} ,
164
+ Event :: Disconnect {
165
+ name
166
+ } => {
167
+ assert ! ( peers. remove( & name) . is_some( ) ) ;
168
+ }
168
169
}
169
170
}
170
171
drop ( peers) ;
0 commit comments