boost::circular_buffer how to handle overwrite shift

I have 2 processes: a producer and "consumer" which still leaves the values insider the buffer and they will just be overwritten.

But having the consumer keep track is posing a bit of a problem. When the buffer is full and values get overwritten, the value pointed at index 0 is the value just ahead of the value that was just overwritten (i.e the next oldest value) and the value that was just inserted is the last index, shifting all values in between.

cb.push_back(0)
cb.push_back(1)
cb.push_back(2)

consumer reads to cb[1], cb[2] should == 2 when next read

cb.push_back(3)

cb[2] now == 1 effectively reading the old value

Interestingly iterators on circular buffer do keep the same value even when the buffer starts to get overwritten and this would work okay except if when reading you do reach the end() iterator it will always equal the end() iterator even after inserting more values, so you then have to std::prev(iter, 1) after you have finished consuming and then when you go to read again after more values have been inserted do std::next(iter, 1) so that you dont read a value you already read.

Answers


I believe circular_buffer exists precisely to abstract the iterator positioning from you.

The fact that the buffer is circular shouldn't matter to you: it's just a queue interface.

How circular_buffer wants to be used can be seen very clearly in this example: http://www.boost.org/doc/libs/1_60_0/libs/circular_buffer/example/circular_buffer_sum_example.cpp

If you somehow want that level of control, you'll either

  • want to use simpler container primitive and build your own logic
  • you could write your bounded buffer on top of circular buffer. A full example of that is here: http://www.boost.org/doc/libs/1_60_0/libs/circular_buffer/test/bounded_buffer_comparison.cpp

    The explanation mentions:

    The bounded buffer is normally used in a producer-consumer mode [...]

    [...]

    The bounded buffer::pop_back() method does not remove the item but the item is left in the circular_buffer which then replaces it with a new one (inserted by a producer) when the circular_buffer is full. This technique is more effective than removing the item explicitly by calling the circular_buffer::pop_back() method of the circular_buffer.

It sounds like it should help you a lot.

UPDATE

Here's a demo adapted to use shared memory:

#define BOOST_CB_DISABLE_DEBUG

#include <boost/circular_buffer.hpp>
#include <boost/thread/thread.hpp>
#include <boost/call_traits.hpp>
#include <boost/bind.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <iostream>

const unsigned long QUEUE_SIZE     = 1000L;
const unsigned long TOTAL_ELEMENTS = QUEUE_SIZE * 1000L;

namespace bip = boost::interprocess;

template <class T, class Alloc, typename CV = boost::condition_variable, typename Mutex = boost::mutex>
class bounded_buffer {
public:
    typedef boost::circular_buffer<T, Alloc> container_type;
    typedef typename container_type::size_type                  size_type;
    typedef typename container_type::value_type                 value_type;
    typedef typename container_type::allocator_type             allocator_type;
    typedef typename boost::call_traits<value_type>::param_type param_type;

    bounded_buffer(size_type capacity, Alloc alloc = Alloc()) : m_unread(0), m_container(capacity, alloc) {}

    void push_front(param_type item) {
        boost::unique_lock<Mutex> lock(m_mutex);

        m_not_full.wait(lock, boost::bind(&bounded_buffer::is_not_full, this));
        m_container.push_front(item);
        ++m_unread;
        lock.unlock();

        m_not_empty.notify_one();
    }

    void pop_back(value_type* pItem) {
        boost::unique_lock<Mutex> lock(m_mutex);

        m_not_empty.wait(lock, boost::bind(&bounded_buffer::is_not_empty, this));
        *pItem = m_container[--m_unread];
        lock.unlock();

        m_not_full.notify_one();
    }

private:
    bounded_buffer(const bounded_buffer&);              // Disabled copy constructor
    bounded_buffer& operator = (const bounded_buffer&); // Disabled assign operator

    bool is_not_empty() const { return m_unread > 0; }
    bool is_not_full() const { return m_unread < m_container.capacity(); }

    size_type m_unread;
    container_type m_container;
    Mutex m_mutex;
    CV m_not_empty;
    CV m_not_full;
};

namespace Shared {
    using segment = bip::managed_shared_memory;
    using smgr    = segment::segment_manager;
    template <typename T> using alloc = bip::allocator<T, smgr>;
    template <typename T> using bounded_buffer = ::bounded_buffer<T, alloc<T>, bip::interprocess_condition, bip::interprocess_mutex >;
}

template<class Buffer>
class Consumer {

    typedef typename Buffer::value_type value_type;
    Buffer* m_container;
    value_type m_item;

public:
    Consumer(Buffer* buffer) : m_container(buffer) {}

    void operator()() {
        for (unsigned long i = 0L; i < TOTAL_ELEMENTS; ++i) {
            m_container->pop_back(&m_item);
        }
    }
};

template<class Buffer>
class Producer {

    typedef typename Buffer::value_type value_type;
    Buffer* m_container;

public:
    Producer(Buffer* buffer) : m_container(buffer) {}

    void operator()() {
        for (unsigned long i = 0L; i < TOTAL_ELEMENTS; ++i) {
            m_container->push_front(value_type());
        }
    }
};

int main(int argc, char**) {
    using Buffer = Shared::bounded_buffer<int>;

    if (argc>1) {
        std::cout << "Creating shared buffer\n";
        Shared::segment mem(bip::create_only, "test_bounded_buffer", 10<<20); // 10 MiB
        Buffer* buffer = mem.find_or_construct<Buffer>("shared_buffer")(QUEUE_SIZE, mem.get_segment_manager());

        assert(buffer);

        // Initialize the buffer with some values before launching producer and consumer threads.
        for (unsigned long i = QUEUE_SIZE / 2L; i > 0; --i) {
            buffer->push_front(BOOST_DEDUCED_TYPENAME Buffer::value_type());
        }

        std::cout << "running producer\n";
        Producer<Buffer> producer(buffer);
        boost::thread(producer).join();
    } else {
        std::cout << "Opening shared buffer\n";

        Shared::segment mem(bip::open_only, "test_bounded_buffer");
        Buffer* buffer = mem.find_or_construct<Buffer>("shared_buffer")(QUEUE_SIZE, mem.get_segment_manager());

        assert(buffer);

        std::cout << "running consumer\n";
        Consumer<Buffer> consumer(buffer);
        boost::thread(consumer).join();
    }
}

When you run two processes:

time (./test producer & sleep .1; ./test; wait)
Creating shared buffer
running producer
Opening shared buffer
running consumer

real    0m0.594s
user    0m0.372s
sys 0m0.600s

Need Your Help

View source code with login (PHP)

php file google-chrome login xampp

I am trying to view the source code of a website which needs Google login.

jQuery Cookie doesn't save value

jquery cookies setcookie jquery-cookie

I have jQuery Cookie Plugin and want I to set a cookie (with expires) to execute a script after x pageviews but cookie doesnt return vlaue of pageviews but return NaN.