Optimizing a shared buffer in a producer/consumer multithreaded environment

I have a project with a single producer thread that writes events into a buffer and a single consumer thread that takes events from it. My goal is to optimize this for a single dual-core machine to achieve maximum throughput.

Currently, I am using a simple lock-free ring buffer (lock-free is possible because there is only one producer and one consumer thread, so each position is only ever updated by a single thread).

#define BUF_SIZE 32768

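struct buf_t {
    volatile int writepos;             // written only by the producer
    void * volatile buffer[BUF_SIZE];  // the slots themselves are volatile
    volatile int readpos;              // written only by the consumer
};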

void produce (buf_t *b, void * e) {
    int next = (b->writepos+1) % BUF_SIZE;
    while (b->readpos == next); // queue is full. wait
    b->buffer[b->writepos] = e; b->writepos = next;
}

void * consume (buf_t *b) {
    while (b->readpos == b->writepos); // nothing to consume. wait
    int next = (b->readpos+1) % BUF_SIZE;
    void * res = b->buffer[b->readpos]; b->readpos = next;
    return res;
}

buf_t *alloc () {
    buf_t *b = (buf_t *)malloc(sizeof(buf_t));
    b->writepos = 0; b->readpos = 0; return b;
}

However, this implementation is not yet fast enough and should be optimized further. I've tried different BUF_SIZE values and got some speed-up. Additionally, I've moved writepos before the buffer and readpos after the buffer so that the two variables sit on different cache lines, which also gave some speed-up.

What I need is a speedup of about 400%. Do you have any ideas how I could achieve this, using things like padding?

Answers


Here's one optimisation I can see: in consume(), you don't need to fetch b->readpos continuously, since the thread calling consume() is the only one that can update it anyway. Because it's volatile, the compiler can't optimise all those fetches away, so you'll need to do it explicitly:

void * consume (buf_t *b) {
    int rp = b->readpos;
    while (rp == b->writepos); // nothing to consume. wait
    int next = (rp + 1) % BUF_SIZE;
    void * res = b->buffer[rp]; b->readpos = next;
    return res;
}

You should also step through your buffer in strides of at least a cache line each, otherwise you'll get cache lines ping-ponging between the two CPUs (as one CPU wants the cache line to read b->buffer[n] and 15 times out of 16 the other invalidates it to write b->buffer[n+1]). For example:

#define STRIDE 16
#define STEPS 2048
#define BUF_SIZE (STRIDE * STEPS)

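// Maps positions 0 .. BUF_SIZE-1 one-to-one onto slots, so that consecutive
// positions land STRIDE slots (one cache line) apart.
#define TO_INDEX(n) (STRIDE * ((n) % STEPS) + ((n) / STEPS))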

void produce (buf_t *b, void * e) {
    unsigned wp = b->writepos;
    unsigned next = (wp + 1) % BUF_SIZE;
    while (b->readpos == next); // queue is full. wait
    b->buffer[TO_INDEX(wp)] = e; b->writepos = next;
}

void * consume (buf_t *b) {
    unsigned rp = b->readpos;
    while (rp == b->writepos); // nothing to consume. wait
    unsigned next = (rp + 1) % BUF_SIZE;
    void * res = b->buffer[TO_INDEX(rp)]; b->readpos = next;
    return res;
}

Got to be worth a try, anyway. (Note that as long as STRIDE and STEPS are powers of 2, the scary-looking division and modulus in TO_INDEX() can be optimised to a shift and a bitwise-and, but only if the operands are unsigned - hence I suggest changing those types accordingly).
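
To make the shift-and-mask remark concrete, here is a minimal sketch of such a strided mapping written both ways. It assumes the form STRIDE * (n % STEPS) + n / STEPS, which maps positions 0 .. BUF_SIZE-1 one-to-one onto slots. The function names and the STEPS_LOG2 constant are made up for this illustration, and the constants assume 8-byte pointers and 64-byte cache lines.

```c
/* Assumed constants: 16 pointer-sized slots per 64-byte cache line. */
#define STRIDE     16u
#define STEPS      2048u
#define STEPS_LOG2 11u                 /* 2^11 == 2048, hypothetical helper */
#define BUF_SIZE   (STRIDE * STEPS)

/* Division/modulus form: logical position n -> slot index. */
static inline unsigned to_index_divmod(unsigned n)
{
    return STRIDE * (n % STEPS) + (n / STEPS);
}

/* The same mapping with a bitwise-and and a shift. This is only
   equivalent because STEPS is a power of two and n is unsigned. */
static inline unsigned to_index_bits(unsigned n)
{
    return STRIDE * (n & (STEPS - 1u)) + (n >> STEPS_LOG2);
}
```

With these constants, positions 0, 1 and 2 land on slots 0, 16 and 32, so consecutive writes touch different cache lines, and position 2048 wraps around to slot 1.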


I assume that you are using a machine with more than one processor or core. If not, your busy waits are going to hurt. They may hurt anyway if other programs are running and the OS decides that you don't sleep enough and lowers your dynamic priority.

You need to collect data on how full your buffer gets. Past a certain point, a buffer that is too big starts to hurt your cache as well.
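
One way to collect that data is to sample the fill level from snapshots of the two positions. The sketch below is only an illustration: fill_level, struct fill_stats and fill_stats_record are hypothetical names, and the record is meant to be updated by one thread only (for example the consumer, every few thousand calls).

```c
#define BUF_SIZE 32768u

/* Number of occupied slots, given snapshots of the two positions.
   Both arguments are expected to be in the range 0 .. BUF_SIZE-1. */
static inline unsigned fill_level(unsigned writepos, unsigned readpos)
{
    return (writepos + BUF_SIZE - readpos) % BUF_SIZE;
}

/* Hypothetical statistics record, touched by a single thread. */
struct fill_stats {
    unsigned long      samples;  /* number of samples taken */
    unsigned long long total;    /* sum of sampled levels, for the mean */
    unsigned           max;      /* highest level seen so far */
};

static inline void fill_stats_record(struct fill_stats *s,
                                     unsigned writepos, unsigned readpos)
{
    unsigned level = fill_level(writepos, readpos);
    s->samples++;
    s->total += level;
    if (level > s->max)
        s->max = level;
}
```

If the maximum stays far below BUF_SIZE, a smaller buffer may fit the cache better; if it regularly reaches BUF_SIZE - 1, the producer is being stalled by the consumer.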

If you use a global array rather than allocating it from the heap, the address of the buffer becomes a constant, so neither thread has to read that pointer value from a shared place in cache; it is simply embedded in the code.

If throughput is what's important to you (at the expense of latency) and cache really is playing a big role, then you might consider letting the consumer lag the producer so that they aren't trying to read and write from the same place in the buffer.

You may want to change the interface of your consumer function so that it can consume in chunks of one cache line (or a multiple of it), in addition to individual events or partial cache lines. This plays well with the suggestion above of letting the consumer lag the producer. Try to keep consumption cache-aligned. If you think of the available data as a snake, the head and the tail may be unaligned. You should only consume an unaligned tail when there is no other data to consume; if a call to consume can take any other data, leave the tail for the next call.
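
A rough sketch of what such a chunked interface could look like follows. It is not taken from the question: struct ring and consume_batch are hypothetical names, it uses C11 atomics in place of volatile, and it assumes 16 slots per cache line and a buffer that starts on a cache-line boundary.

```c
#include <stdatomic.h>
#include <stddef.h>

#define STRIDE   16u        /* slots per cache line (assumed) */
#define BUF_SIZE 32768u     /* must be a multiple of STRIDE */

struct ring {
    _Atomic unsigned writepos;   /* written by the producer only */
    _Atomic unsigned readpos;    /* written by the consumer only */
    void *buffer[BUF_SIZE];
};

/* Copies up to max events into out and returns how many were taken.
   Returns 0 instead of spinning when the ring is empty. When possible
   it stops at a cache-line boundary and leaves the partial line at the
   producer's end for a later call. */
static size_t consume_batch(struct ring *r, void **out, size_t max)
{
    unsigned rp = atomic_load_explicit(&r->readpos, memory_order_relaxed);
    unsigned wp = atomic_load_explicit(&r->writepos, memory_order_acquire);
    size_t avail = (wp + BUF_SIZE - rp) % BUF_SIZE;
    size_t n = avail < max ? avail : max;

    /* Round the end of the batch down to a line boundary, unless that
       would leave nothing to consume. */
    size_t end = (size_t)rp + n;
    size_t aligned_end = end & ~(size_t)(STRIDE - 1u);
    if (aligned_end > rp)
        n = aligned_end - rp;

    for (size_t i = 0; i < n; i++)
        out[i] = r->buffer[(rp + i) % BUF_SIZE];

    atomic_store_explicit(&r->readpos, (unsigned)((rp + n) % BUF_SIZE),
                          memory_order_release);
    return n;
}
```

With 20 events queued from position 0, a first call takes 16 (one full line) and a second call takes the remaining 4, since by then the unaligned part is all there is.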

Other than that and what has been mentioned by caf, I suspect that whatever is happening outside of this code is playing a bigger role.


I've implemented the optimizations in the first code block of caf's answer. They actually gave a small speed-up (thank you), but it is not yet enough. The second optimization, which fills the cache by column instead of by row, results in worse performance.

The idea of the consumer lagging behind the producer did not give any speedup.

Now, I'm at 300%.

An additional change I have made was some hack regarding the volatile writepos and readpos variables:

void produce (void * e) {
    unsigned int oldpos = buffer.writepos;
    unsigned int next = (oldpos+1) % BUF_SIZE;
    while (next == buffer.rpos) { // rpos is not volatile
        buffer.rpos = buffer.readpos;
        usleep(1);
    }
    buffer.buffer[oldpos] = e; buffer.writepos = next;
}

and similarly for consume().
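
For readers who want to see both sides, here is one possible shape of the pair. It is a sketch, not the code from this post: it keeps the names writepos, readpos, wpos and rpos, but swaps volatile for C11 atomics with acquire/release ordering, leaves out the usleep(1) call and the padding, and the struct layout is an assumption.

```c
#include <stdatomic.h>

#define BUF_SIZE 32768u

struct buf {
    _Atomic unsigned writepos;  /* shared: written by the producer */
    unsigned rpos;              /* producer's private copy of readpos */
    _Atomic unsigned readpos;   /* shared: written by the consumer */
    unsigned wpos;              /* consumer's private copy of writepos */
    void *buffer[BUF_SIZE];
};

static struct buf buffer;       /* global, zero-initialised */

void produce(void *e)
{
    unsigned oldpos = atomic_load_explicit(&buffer.writepos, memory_order_relaxed);
    unsigned next = (oldpos + 1) % BUF_SIZE;
    /* Only touch the shared readpos when the private copy says "full". */
    while (next == buffer.rpos)
        buffer.rpos = atomic_load_explicit(&buffer.readpos, memory_order_acquire);
    buffer.buffer[oldpos] = e;
    atomic_store_explicit(&buffer.writepos, next, memory_order_release);
}

void *consume(void)
{
    unsigned oldpos = atomic_load_explicit(&buffer.readpos, memory_order_relaxed);
    /* Only touch the shared writepos when the private copy says "empty". */
    while (oldpos == buffer.wpos)
        buffer.wpos = atomic_load_explicit(&buffer.writepos, memory_order_acquire);
    void *res = buffer.buffer[oldpos];
    atomic_store_explicit(&buffer.readpos, (oldpos + 1) % BUF_SIZE,
                          memory_order_release);
    return res;
}
```

The private copies can only be stale in the safe direction: the producer may believe the queue is fuller than it is, and the consumer may believe it is emptier, so each side rereads the shared position only when it appears to be blocked.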

Additional changes to the struct led to the following new buffer struct (in the global scope instead of on the heap, as suggested in one answer).

#define STRIDE 16
#define STEPS 524288

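struct buf_t {
    volatile unsigned int writepos;    // shared write position
    int offset [STRIDE - 1];           // padding (assuming 4-byte int, 64-byte cache line)
    unsigned int wpos;                 // non-volatile cached copy of writepos
    int offset2 [STRIDE - 1];          // padding
    void * volatile buffer[BUF_SIZE];  // the slots themselves are volatile
    int offset4 [STRIDE];              // padding
    volatile unsigned int readpos;     // shared read position
    int offset3 [STRIDE - 1];          // padding
    unsigned int rpos;                 // non-volatile cached copy of readpos
};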

This gave the 300% speedup that was still missing and pushed it below the performance limit I had to achieve.
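
As a side note, the same kind of layout can be requested from the compiler instead of being padded by hand. The following is a sketch under assumptions: CACHE_LINE is taken to be 64 bytes, struct buf_aligned is a made-up name, and the field order simply mirrors the struct above.

```c
#include <stdalign.h>
#include <stddef.h>

#define CACHE_LINE 64      /* assumed line size; check the target CPU */
#define BUF_SIZE   32768

struct buf_aligned {
    alignas(CACHE_LINE) volatile unsigned writepos;  /* shared write position */
    alignas(CACHE_LINE) unsigned wpos;               /* cached copy of writepos */
    alignas(CACHE_LINE) void *volatile buffer[BUF_SIZE];
    alignas(CACHE_LINE) volatile unsigned readpos;   /* shared read position */
    alignas(CACHE_LINE) unsigned rpos;               /* cached copy of readpos */
};

/* Compile-time checks that each position starts its own cache line. */
_Static_assert(offsetof(struct buf_aligned, wpos) % CACHE_LINE == 0,
               "wpos is not line-aligned");
_Static_assert(offsetof(struct buf_aligned, readpos) % CACHE_LINE == 0,
               "readpos is not line-aligned");
_Static_assert(offsetof(struct buf_aligned, rpos) % CACHE_LINE == 0,
               "rpos is not line-aligned");
```

This relies on the object itself being placed on a 64-byte boundary. A global object gets that from the alignment requirement of the type, whereas plain malloc() is not guaranteed to return memory aligned that strictly.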

If you have some additional hacks which could be used to increase the performance further, don't hesitate to post them also :-)

Thanks for the help.

