The things I tested on a broadcast queue - multiqueue2

MultiQueue2 is a fast bounded MPMC queue that supports broadcast-style operations.

MultiQueue was developed by Sam Schetterer, but it has not been updated for some time. I found it very useful because it implements futures. However, it depends on a few outdated library APIs, and its use of spinlocks drives CPU usage to 100% in many cases.

Stone Age

For any channel, if there is potential for conflict (more than one writer/reader contending for the same resource), some form of locking is required. So the 100% CPU issue is very likely caused by a spinlock.
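To make the cost concrete, here is a minimal sketch, not MultiQueue's actual wait code, of the kind of busy-wait loop a spinlock boils down to. While the condition is unmet, the core does nothing but re-check it, which shows up as 100% CPU on that core.

use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

// Sketch only: spin until the shared counter reaches `seq`.
// The loop never sleeps or yields, so a blocked reader keeps
// one core fully busy doing nothing useful.
fn spin_until(seq: usize, at: &AtomicUsize) {
    while at.load(Relaxed) < seq {
        // busy-wait: re-check as fast as the CPU allows
    }
}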

At first, I took the sleep approach:

wait.rs (link)
#[inline(always)]
pub fn check(seq: usize, at: &AtomicUsize, wc: &AtomicUsize) -> bool {
    let cur_count = load_tagless(at);
    // Naive approach: punish every single check with a 50 ms sleep.
    use std::{thread, time};
    thread::sleep(time::Duration::from_millis(50));
    wc.load(Relaxed) == 0 || seq == cur_count || past(seq, cur_count).1
}

Nonetheless, this sacrificed throughput, because every check was punished with the sleep.

Then I tried a much smaller sleep. This is just meaningless, because it does not move the CPU from a P-state into a sleep state, and I quickly realized it may be foolish to coerce a sleep state in a spinlock design anyway.

Then I tried 1 ms, 10 ms, and 1000 ns, for both settings of check-after-wait and check-before-wait (link). It did not make much difference.

I tried _mm_pause from SSE2 as well, but it did not make any difference either.

wait.rs (link)
#[inline(always)]
pub fn check(seq: usize, at: &AtomicUsize, wc: &AtomicUsize) -> bool {
    let cur_count = load_tagless(at);
    if is_x86_feature_detected!("sse2") {
        #[cfg(target_arch = "x86")]
        use std::arch::x86::_mm_pause;
        #[cfg(target_arch = "x86_64")]
        use std::arch::x86_64::_mm_pause;
        unsafe {
            // Hint to the CPU that this is a spin-wait loop.
            _mm_pause();
        }
    } else {
        use std::{thread, time};
        thread::sleep(time::Duration::from_millis(DEFAULT_CHECK_DELAY));
    }
    wc.load(Relaxed) == 0 || seq == cur_count || past(seq, cur_count).1
}

Although _mm_pause is designed for spin-wait loops, the point is that the CPU will not switch P-states or sleep states for just a few nanoseconds. It is simply not the right approach.
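As an aside, the same pause hint can be written portably with std::hint::spin_loop(), which compiles down to pause on x86/x86_64. Below is a sketch, not the crate's code, of the check above using it; it drops the runtime feature detection, though as noted the pause itself does not solve the CPU-usage problem.

#[inline(always)]
pub fn check(seq: usize, at: &AtomicUsize, wc: &AtomicUsize) -> bool {
    let cur_count = load_tagless(at);
    // Portable spin hint: emits `pause` on x86/x86_64, or the closest
    // equivalent on other targets.
    std::hint::spin_loop();
    wc.load(Relaxed) == 0 || seq == cur_count || past(seq, cur_count).1
}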

Tool Age

Keeping on with these trivial approaches would never get me further (although they practically solved the 100% CPU usage problem in my program, and my program does not need to be very performant anyway), so I stepped back and rethought my first solution.

The first solution was really about doing less checking and forcing the CPU to sleep, so that it does not waste energy on useless checks whose outcome is roughly 99% predictable.

What I really want to punish are the collided checks, because they are very likely to be blocked again if the CPU checks on the next cycle. At that point, sleeping is better than checking.

So I made the spinlock a swing-back one: it only sleeps after a failed check.

wait.rs (link)
#[inline(always)]
pub fn check(seq: usize, at: &AtomicUsize, wc: &AtomicUsize) -> bool {
    let cur_count = load_tagless(at);

    if wc.load(Relaxed) == 0 || seq == cur_count || past(seq, cur_count).1 {
        true
    } else {
        // Only a failed (collided) check is punished with a sleep.
        use std::{thread, time};
        thread::sleep(time::Duration::from_millis(DEFAULT_CHECK_DELAY));
        false
    }
}

This works exactly as expected: CPU usage dropped, and throughput is still able to pass all the tests.

Futures

However, I then found that CPU usage was still high for the futures queues. The reason is that a kind of hybrid lock is used for them.

Therefore, I removed all the spinning before falling back to parking_lot::Mutex::new(VecDeque::new()) (link).

This greatly increases the number of low-level context switches, but it does lower CPU usage. Because these switches are heavy compared to the lightweight spinlock, the throughput of the futures queue drops to about 1/10 of the normal queue.

This experiment also gave me a feel for the performance ratio between parking_lot mutexes and native spinning.
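For readers who want the shape of such a hybrid lock, here is a minimal sketch, assuming parking_lot's Mutex and Condvar, of the three-phase wait that the try_spins and yield_spins knobs below control. It is not the crate's FutWait implementation: spin cheaply first, then yield, and only park on the kernel-assisted lock as a last resort.

use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
use std::thread;
use parking_lot::{Condvar, Mutex};

// Sketch only: a three-phase hybrid wait, not the actual FutWait type.
pub struct HybridWait {
    try_spins: usize,
    yield_spins: usize,
    lock: Mutex<()>,
    condvar: Condvar,
}

impl HybridWait {
    // Block until the shared counter `at` reaches `seq`.
    pub fn wait_until(&self, seq: usize, at: &AtomicUsize) {
        // Phase 1: busy spin, lowest latency but highest CPU cost.
        for _ in 0..self.try_spins {
            if at.load(Relaxed) >= seq {
                return;
            }
        }
        // Phase 2: still polling, but let other threads run in between.
        for _ in 0..self.yield_spins {
            if at.load(Relaxed) >= seq {
                return;
            }
            thread::yield_now();
        }
        // Phase 3: park on the kernel-assisted lock until a writer notifies us.
        let mut guard = self.lock.lock();
        while at.load(Relaxed) < seq {
            self.condvar.wait(&mut guard);
        }
    }

    // Writers call this after publishing to wake any parked readers.
    pub fn notify(&self) {
        let _guard = self.lock.lock();
        self.condvar.notify_all();
    }
}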

No Silver Bullet

I would really like to come up with an equation that lets people set try_spins precisely, but it is very complicated because it depends on all the environment information: CPU frequency, number of consumers, rate of feeding, and so on.

So I just leave it to the user.
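If a very rough starting point helps, one back-of-envelope guess (my own, not a derived result) is try_spins ≈ expected wait until the queue advances / cost of one check iteration. For example, if a producer publishes roughly every 2 µs and one spin iteration costs around 10 ns, spinning on the order of 200 times covers the typical wait before yielding or parking. All the caveats above still apply, so measure on your own system.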

multiqueue.rs (link)
/// Usage: futures_multiqueue_with(`capacity`, `try_spins`, `yield_spins`)
/// `capacity` is the maximum number of items allowed in the queue; when it is full, `Err(Full{...})` will be emitted.
/// `try_spins` is a performant, low-latency blocking wait for lightweight conflict resolution; lower this number when your CPU usage is high.
/// `yield_spins` is still busy-waiting but slowed down by `yield()`; this number can be small.
///
/// `futures_multiqueue_with(1000, 0, 0)` is possible, which turns this hybrid lock into a kernel lock.
/// Feel free to test different settings that match your system.
pub fn futures_multiqueue_with<RW: QueueRW<T>, T>(
    capacity: Index,
    try_spins: usize,
    yield_spins: usize,
) -> (FutInnerSend<RW, T>, FutInnerRecv<RW, T>) {
    let cons_arc = Arc::new(FutWait::with_spins(try_spins, yield_spins));
    let prod_arc = Arc::new(FutWait::with_spins(try_spins, yield_spins));
    let (tx, rx) = MultiQueue::new_internal(capacity, cons_arc.clone());
    let ftx = FutInnerSend {
        writer: tx,
        wait: cons_arc.clone(),
        prod_wait: prod_arc.clone(),
    };
    let rtx = FutInnerRecv {
        reader: rx,
        wait: cons_arc.clone(),
        prod_wait: prod_arc.clone(),
    };
    (ftx, rtx)
}
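As an illustration only, here is a hypothetical wrapper around the constructor above; it is not part of the crate's public API, and RW and T stay generic because the concrete queue flavours are defined elsewhere in the crate.

// Hypothetical helper, not part of the crate's API.
fn kernel_lock_queue<RW: QueueRW<T>, T>(
    capacity: Index,
) -> (FutInnerSend<RW, T>, FutInnerRecv<RW, T>) {
    // try_spins = 0 and yield_spins = 0 skip straight to the parking_lot path,
    // turning the hybrid lock into a plain kernel lock, as the doc comment notes.
    futures_multiqueue_with(capacity, 0, 0)
}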

I would like to investigate and develop a clear equation for this kind of concurrency hybrid lock, but that seems to be a longer piece of work, and I may publish it later in paper form.

Feel free to leave issues or comments.