std/sync/mpmc/list.rs
//! Unbounded channel implemented as a linked list.
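//!
//! The unbounded channels exposed through `std::sync::mpsc` are built on this
//! flavor. A sketch of that public usage (not an API of this module):
//!
//! ```text
//! let (tx, rx) = std::sync::mpsc::channel::<i32>();
//! tx.send(1).unwrap();
//! assert_eq!(rx.recv().unwrap(), 1);
//! ```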

use super::context::Context;
use super::error::*;
use super::select::{Operation, Selected, Token};
use super::utils::{Backoff, CachePadded};
use super::waker::SyncWaker;
use crate::cell::UnsafeCell;
use crate::marker::PhantomData;
use crate::mem::MaybeUninit;
use crate::ptr;
use crate::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
use crate::time::Instant;

// Bits indicating the state of a slot:
// * If a message has been written into the slot, `WRITE` is set.
// * If a message has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of messages a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Has two different purposes:
// * If set in head, indicates that the block is not the last one.
// * If set in tail, indicates that the channel is disconnected.
const MARK_BIT: usize = 1;
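
// For example, with `SHIFT == 1` and `LAP == 32`, an index `i` encodes sequence
// number `i >> SHIFT`; the slot it refers to is at offset `(i >> SHIFT) % LAP`
// within its block, and the lowest bit stays free for `MARK_BIT`. Offsets
// `0..BLOCK_CAP` are real slots, while offset `BLOCK_CAP` is the point at which
// the next block gets installed.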

/// A slot in a block.
struct Slot<T> {
    /// The message.
    msg: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot.
    state: AtomicUsize,
}

impl<T> Slot<T> {
    /// Waits until a message is written into the slot.
    fn wait_write(&self) {
        let backoff = Backoff::new();
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.spin_heavy();
        }
    }
}
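
// A receiver can win the race for a slot before the sender has actually written
// into it; `wait_write` is what `read` uses to spin until the sender's `WRITE`
// bit appears before taking the message out.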

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` messages.
struct Block<T> {
    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Slots for messages.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
    /// Creates an empty block.
    fn new() -> Box<Block<T>> {
        // SAFETY: This is safe because:
        // [1] `Block::next` (AtomicPtr) may be safely zero initialized.
        // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
        // [3] `Slot::msg` (UnsafeCell) may be safely zero initialized because it
        //     holds a MaybeUninit.
        // [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
        unsafe { Box::new_zeroed().assume_init() }
    }

    /// Waits until the next pointer is set.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.spin_heavy();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = unsafe { (*this).slots.get_unchecked(i) };

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(unsafe { Box::from_raw(this) });
    }
}
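
// Note on `Block::destroy`: teardown is cooperative. A reader marks its slot with
// `READ` once it is done with it, while `destroy` marks not-yet-read slots with
// `DESTROY`. Whichever side touches a slot second sees the other's bit in the
// `fetch_or` result and takes over the teardown, so a block is freed exactly once
// and never while a reader is still inside one of its slots.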

/// A position in a channel.
#[derive(Debug)]
struct Position<T> {
    /// The index in the channel.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}

/// The token type for the list flavor.
#[derive(Debug)]
pub(crate) struct ListToken {
    /// The block of slots.
    block: *const u8,

    /// The offset into the block.
    offset: usize,
}

impl Default for ListToken {
    #[inline]
    fn default() -> Self {
        ListToken { block: ptr::null(), offset: 0 }
    }
}
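
// Operations on this flavor are split into two phases: `start_send`/`start_recv`
// reserve a slot and record its block pointer and offset in the `ListToken`, and
// `write`/`read` then complete the operation against that slot. A null `block` in
// the token means the channel was already disconnected when the reservation was
// made.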

/// Unbounded channel implemented as a linked list.
///
/// Each message sent into the channel is assigned a sequence number, i.e. an index. Indices are
/// represented as numbers of type `usize` and wrap on overflow.
///
/// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and
/// improve cache efficiency.
pub(crate) struct Channel<T> {
    /// The head of the channel.
    head: CachePadded<Position<T>>,

    /// The tail of the channel.
    tail: CachePadded<Position<T>>,

    /// Receivers waiting while the channel is empty and not disconnected.
    receivers: SyncWaker,

    /// Indicates that dropping a `Channel<T>` may drop messages of type `T`.
    _marker: PhantomData<T>,
}

impl<T> Channel<T> {
    /// Creates a new unbounded channel.
    pub(crate) fn new() -> Self {
        Channel {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            receivers: SyncWaker::new(),
            _marker: PhantomData,
        }
    }

    /// Attempts to reserve a slot for sending a message.
    fn start_send(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Check if the channel is disconnected.
            if tail & MARK_BIT != 0 {
                token.list.block = ptr::null();
                return true;
            }

            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.spin_heavy();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Block::<T>::new());
            }

            // If this is the first message to be sent into the channel, we need to allocate the
            // first block and install it.
            if block.is_null() {
                let new = Box::into_raw(Block::<T>::new());

                if self
                    .tail
                    .block
                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
                    .is_ok()
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                },
                Err(_) => {
                    backoff.spin_light();
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                }
            }
        }
    }
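
    // The only waiting in `start_send` happens at a block boundary (offset
    // `BLOCK_CAP`), where senders spin until the one that won the tail CAS installs
    // the next block. That sender allocated the block before the CAS (see
    // `next_block` above) precisely to keep this window short.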

    /// Writes a message into the channel.
    pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
        // If there is no slot, the channel is disconnected.
        if token.list.block.is_null() {
            return Err(msg);
        }

        // Write the message into the slot.
        let block = token.list.block as *mut Block<T>;
        let offset = token.list.offset;
        unsafe {
            let slot = (*block).slots.get_unchecked(offset);
            slot.msg.get().write(MaybeUninit::new(msg));
            slot.state.fetch_or(WRITE, Ordering::Release);
        }

        // Wake a sleeping receiver.
        self.receivers.notify();
        Ok(())
    }

    /// Attempts to reserve a slot for receiving a message.
    fn start_recv(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.spin_heavy();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & MARK_BIT == 0 {
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the channel is empty.
                if head >> SHIFT == tail >> SHIFT {
                    // If the channel is disconnected...
                    if tail & MARK_BIT != 0 {
                        // ...then receive an error.
                        token.list.block = ptr::null();
                        return true;
                    } else {
                        // Otherwise, the receive operation is not ready.
                        return false;
                    }
                }

                // If head and tail are not in the same block, set `MARK_BIT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= MARK_BIT;
                }
            }

            // The block can be null here only if the first message is being sent into the channel.
            // In that case, just wait until it gets initialized.
            if block.is_null() {
                backoff.spin_heavy();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= MARK_BIT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                },
                Err(_) => {
                    backoff.spin_light();
                    head = self.head.index.load(Ordering::Acquire);
                    block = self.head.block.load(Ordering::Acquire);
                }
            }
        }
    }
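
    // Note on the `SeqCst` fence in `start_recv` above: together with the `SeqCst`
    // CAS on the tail in `start_send`, it is what lets the relaxed `tail` load be
    // trusted when deciding that the channel is empty, so an emptiness verdict is
    // not based on a stale tail.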

    /// Reads a message from the channel.
    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
        if token.list.block.is_null() {
            // The channel is disconnected.
            return Err(());
        }

        // Read the message.
        let block = token.list.block as *mut Block<T>;
        let offset = token.list.offset;
        unsafe {
            let slot = (*block).slots.get_unchecked(offset);
            slot.wait_write();
            let msg = slot.msg.get().read().assume_init();

            // Destroy the block if we've reached the end, or if another thread wanted to
            // destroy but couldn't because we were busy reading from the slot.
            if offset + 1 == BLOCK_CAP {
                Block::destroy(block, 0);
            } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                Block::destroy(block, offset + 1);
            }

            Ok(msg)
        }
    }

    /// Attempts to send a message into the channel.
    pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
        self.send(msg, None).map_err(|err| match err {
            SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
            SendTimeoutError::Timeout(_) => unreachable!(),
        })
    }

    /// Sends a message into the channel.
    pub(crate) fn send(
        &self,
        msg: T,
        _deadline: Option<Instant>,
    ) -> Result<(), SendTimeoutError<T>> {
        let token = &mut Token::default();
        assert!(self.start_send(token));
        unsafe { self.write(token, msg).map_err(SendTimeoutError::Disconnected) }
    }

    /// Attempts to receive a message without blocking.
    pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
        let token = &mut Token::default();

        if self.start_recv(token) {
            unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
        } else {
            Err(TryRecvError::Empty)
        }
    }

    /// Receives a message from the channel.
    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
        let token = &mut Token::default();
        loop {
            if self.start_recv(token) {
                unsafe {
                    return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
                }
            }

            if let Some(d) = deadline {
                if Instant::now() >= d {
                    return Err(RecvTimeoutError::Timeout);
                }
            }

            // Prepare for blocking until a sender wakes us up.
            Context::with(|cx| {
                let oper = Operation::hook(token);
                self.receivers.register(oper, cx);

                // Has the channel become ready just now?
                if !self.is_empty() || self.is_disconnected() {
                    let _ = cx.try_select(Selected::Aborted);
                }

                // Block the current thread.
                // SAFETY: the context belongs to the current thread.
                let sel = unsafe { cx.wait_until(deadline) };

                match sel {
                    Selected::Waiting => unreachable!(),
                    Selected::Aborted | Selected::Disconnected => {
                        self.receivers.unregister(oper).unwrap();
                        // If the channel was disconnected, we still have to check for remaining
                        // messages.
                    }
                    Selected::Operation(_) => {}
                }
            });
        }
    }
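
    // The blocking path in `recv` follows the usual register / re-check / wait
    // pattern: the receiver registers itself with `receivers`, re-checks emptiness
    // and disconnection so a wakeup that raced with registration is not lost, and
    // only then parks. On `Aborted` or `Disconnected` it unregisters itself and
    // loops back to `start_recv`, since a disconnected channel may still hold
    // buffered messages.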

    /// Returns the current number of messages inside the channel.
    pub(crate) fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Return the difference minus the number of blocks between tail and head.
                return tail - head - tail / LAP;
            }
        }
    }
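
    // Each lap contains `LAP` indices but only `BLOCK_CAP` slots: the index at
    // offset `BLOCK_CAP` is consumed when the next block is installed and never
    // holds a message. After rotating `head` into the first lap, `tail / LAP` is
    // exactly the number of such phantom indices below `tail`, which is why `len`
    // subtracts it.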

    /// Returns the capacity of the channel.
    pub(crate) fn capacity(&self) -> Option<usize> {
        None
    }

    /// Disconnects senders and wakes up all blocked receivers.
    ///
    /// Returns `true` if this call disconnected the channel.
    pub(crate) fn disconnect_senders(&self) -> bool {
        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);

        if tail & MARK_BIT == 0 {
            self.receivers.disconnect();
            true
        } else {
            false
        }
    }

    /// Disconnects receivers.
    ///
    /// Returns `true` if this call disconnected the channel.
    pub(crate) fn disconnect_receivers(&self) -> bool {
        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);

        if tail & MARK_BIT == 0 {
            // If receivers are dropped first, discard all messages to free
            // memory eagerly.
            self.discard_all_messages();
            true
        } else {
            false
        }
    }

    /// Discards all messages.
    ///
    /// This method should only be called when all receivers are dropped.
    fn discard_all_messages(&self) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        loop {
            let offset = (tail >> SHIFT) % LAP;
            if offset != BLOCK_CAP {
                break;
            }

            // New updates to the tail will be rejected because `MARK_BIT` is set, and
            // will be aborted unless the tail is at a block boundary. We need to wait
            // for in-flight updates to take effect, otherwise there can be memory leaks.
            backoff.spin_heavy();
            tail = self.tail.index.load(Ordering::Acquire);
        }

        let mut head = self.head.index.load(Ordering::Acquire);
        // The channel may be uninitialized, so we have to swap to avoid overwriting any sender's
        // attempts to initialize the first block before noticing that the receivers disconnected.
        // Late allocations will be deallocated by the sender in Drop.
        let mut block = self.head.block.swap(ptr::null_mut(), Ordering::AcqRel);

        // If we're going to be dropping messages we need to synchronize with initialization.
        if head >> SHIFT != tail >> SHIFT {
            // The block can be null here only if a sender is in the process of initializing the
            // channel while another sender managed to send a message by inserting it into the
            // semi-initialized channel and advanced the tail.
            // In that case, just wait until it gets initialized.
            while block.is_null() {
                backoff.spin_heavy();
                block = self.head.block.swap(ptr::null_mut(), Ordering::AcqRel);
            }
        }
        // After this point `head.block` is not modified again and it will be deallocated if it's
        // non-null. The `Drop` code of the channel, which runs after this function, also attempts
        // to deallocate `head.block` if it's non-null. Therefore this function must maintain the
        // invariant that if a deallocation of `head.block` is attempted then it must also be set
        // to NULL. Failing to do so will lead to the `Drop` code attempting a double free. For
        // this reason both reads above do an atomic swap instead of a simple atomic load.

        unsafe {
            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
            while head >> SHIFT != tail >> SHIFT {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the message in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.wait_write();
                    let p = &mut *slot.msg.get();
                    p.as_mut_ptr().drop_in_place();
                } else {
                    (*block).wait_next();
                    // Deallocate the block and move to the next one.
                    let next = (*block).next.load(Ordering::Acquire);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }

        head &= !MARK_BIT;
        self.head.index.store(head, Ordering::Release);
    }

    /// Returns `true` if the channel is disconnected.
    pub(crate) fn is_disconnected(&self) -> bool {
        self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
    }

    /// Returns `true` if the channel is empty.
    pub(crate) fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }

    /// Returns `true` if the channel is full.
    pub(crate) fn is_full(&self) -> bool {
        false
    }
}

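// Dropping the channel requires `&mut self`, so no other thread can be operating on
// it concurrently; the `Relaxed` loads below are sufficient, and the remaining
// messages and blocks can be freed without further synchronization.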
impl<T> Drop for Channel<T> {
    fn drop(&mut self) {
        let mut head = self.head.index.load(Ordering::Relaxed);
        let mut tail = self.tail.index.load(Ordering::Relaxed);
        let mut block = self.head.block.load(Ordering::Relaxed);

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the message in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    let p = &mut *slot.msg.get();
                    p.as_mut_ptr().drop_in_place();
                } else {
                    // Deallocate the block and move to the next one.
                    let next = (*block).next.load(Ordering::Relaxed);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}