Implementing a Thread-Safe Queue using Condition Variables (Updated)

Tuesday, 16 September 2008

One problem that comes up time and again with multi-threaded code is how to transfer data from one thread to another. For example, one common way to parallelize a serial algorithm is to split it into independent chunks and make a pipeline — each stage in the pipeline can be run on a separate thread, and each stage adds the data to the input queue for the next stage when it's done. For this to work properly, the input queue needs to be written so that data can safely be added by one thread and removed by another thread without corrupting the data structure.

Basic Thread Safety with a Mutex

The simplest way of doing this is just to put wrap a non-thread-safe queue, and protect it with a mutex (the examples use the types and functions from the upcoming 1.35 release of Boost):

template<typename Data>
class concurrent_queue
{
private:
    std::queue<Data> the_queue;
    mutable boost::mutex the_mutex;
public:
    void push(const Data& data)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        the_queue.push(data);
    }

    bool empty() const
    {
        boost::mutex::scoped_lock lock(the_mutex);
        return the_queue.empty();
    }

    Data& front()
    {
        boost::mutex::scoped_lock lock(the_mutex);
        return the_queue.front();
    }
    
    Data const& front() const
    {
        boost::mutex::scoped_lock lock(the_mutex);
        return the_queue.front();
    }

    void pop()
    {
        boost::mutex::scoped_lock lock(the_mutex);
        the_queue.pop();
    }
};

This design is subject to race conditions between calls to empty, front and pop if there is more than one thread removing items from the queue, but in a single-consumer system (as being discussed here), this is not a problem. There is, however, a downside to such a simple implementation: if your pipeline stages are running on separate threads, they likely have nothing to do if the queue is empty, so they end up with a wait loop:

    while(some_queue.empty())
    {
        boost::this_thread::sleep(boost::posix_time::milliseconds(50));
    }

Though the sleep avoids the high CPU consumption of a direct busy wait, there are still some obvious downsides to this formulation. Firstly, the thread has to wake every 50ms or so (or whatever the sleep period is) in order to lock the mutex, check the queue, and unlock the mutex, forcing a context switch. Secondly, the sleep period imposes a limit on how fast the thread can respond to data being added to the queue — if the data is added just before the call to sleep, the thread will wait at least 50ms before checking for data. On average, the thread will only respond to data after about half the sleep time (25ms here).

Waiting with a Condition Variable

As an alternative to continuously polling the state of the queue, the sleep in the wait loop can be replaced with a condition variable wait. If the condition variable is notified in push when data is added to an empty queue, then the waiting thread will wake. This requires access to the mutex used to protect the queue, so needs to be implemented as a member function of concurrent_queue:

template<typename Data>
class concurrent_queue
{
private:
    boost::condition_variable the_condition_variable;
public:
    void wait_for_data()
    {
        boost::mutex::scoped_lock lock(the_mutex);
        while(the_queue.empty())
        {
            the_condition_variable.wait(lock);
        }
    }
    void push(Data const& data)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        bool const was_empty=the_queue.empty();
        the_queue.push(data);
        if(was_empty)
        {
            the_condition_variable.notify_one();
        }
    }
    // rest as before
};

There are three important things to note here. Firstly, the lock variable is passed as a parameter to wait — this allows the condition variable implementation to atomically unlock the mutex and add the thread to the wait queue, so that another thread can update the protected data whilst the first thread waits.

Secondly, the condition variable wait is still inside a while loop — condition variables can be subject to spurious wake-ups, so it is important to check the actual condition being waited for when the call to wait returns.

Be careful when you notify

Thirdly, the call to notify_one comes after the data is pushed on the internal queue. This avoids the waiting thread being notified if the call to the_queue.push throws an exception. As written, the call to notify_one is still within the protected region, which is potentially sub-optimal: the waiting thread might wake up immediately it is notified, and before the mutex is unlocked, in which case it will have to block when the mutex is reacquired on the exit from wait. By rewriting the function so that the notification comes after the mutex is unlocked, the waiting thread will be able to acquire the mutex without blocking:

template<typename Data>
class concurrent_queue
{
public:
    void push(Data const& data)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        bool const was_empty=the_queue.empty();
        the_queue.push(data);

        lock.unlock(); // unlock the mutex

        if(was_empty)
        {
            the_condition_variable.notify_one();
        }
    }
    // rest as before
};

Reducing the locking overhead

Though the use of a condition variable has improved the pushing and waiting side of the interface, the interface for the consumer thread still has to perform excessive locking: wait_for_data, front and pop all lock the mutex, yet they will be called in quick succession by the consumer thread.

By changing the consumer interface to a single wait_and_pop function, the extra lock/unlock calls can be avoided:

template<typename Data>
class concurrent_queue
{
public:
    void wait_and_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        while(the_queue.empty())
        {
            the_condition_variable.wait(lock);
        }
        
        popped_value=the_queue.front();
        the_queue.pop();
    }

    // rest as before
};

Using a reference parameter to receive the result is used to transfer ownership out of the queue in order to avoid the exception safety issues of returning data by-value: if the copy constructor of a by-value return throws, then the data has been removed from the queue, but is lost, whereas with this approach, the potentially problematic copy is performed prior to modifying the queue (see Herb Sutter's Guru Of The Week #8 for a discussion of the issues). This does, of course, require that an instance Data can be created by the calling code in order to receive the result, which is not always the case. In those cases, it might be worth using something like boost::optional to avoid this requirement.

Handling multiple consumers

As well as removing the locking overhead, the combined wait_and_pop function has another benefit — it automatically allows for multiple consumers. Whereas the fine-grained nature of the separate functions makes them subject to race conditions without external locking (one reason why the authors of the SGI STL advocate against making things like std::vector thread-safe — you need external locking to do many common operations, which makes the internal locking just a waste of resources), the combined function safely handles concurrent calls.

If multiple threads are popping entries from a full queue, then they just get serialized inside wait_and_pop, and everything works fine. If the queue is empty, then each thread in turn will block waiting on the condition variable. When a new entry is added to the queue, one of the threads will wake and take the value, whilst the others keep blocking. If more than one thread wakes (e.g. with a spurious wake-up), or a new thread calls wait_and_pop concurrently, the while loop ensures that only one thread will do the pop, and the others will wait.

Update: As commenter David notes below, using multiple consumers does have one problem: if there are several threads waiting when data is added, only one is woken. Though this is exactly what you want if only one item is pushed onto the queue, if multiple items are pushed then it would be desirable if more than one thread could wake. There are two solutions to this: use notify_all() instead of notify_one() when waking threads, or to call notify_one() whenever any data is added to the queue, even if the queue is not currently empty. If all threads are notified then the extra threads will see it as a spurious wake and resume waiting if there isn't enough data for them. If we notify with every push() then only the right number of threads are woken. This is my preferred option: condition variable notify calls are pretty light-weight when there are no threads waiting. The revised code looks like this:

template<typename Data>
class concurrent_queue
{
public:
    void push(Data const& data)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        the_queue.push(data);
        lock.unlock();
        the_condition_variable.notify_one();
    }
    // rest as before
};

There is one benefit that the separate functions give over the combined one — the ability to check for an empty queue, and do something else if the queue is empty. empty itself still works in the presence of multiple consumers, but the value that it returns is transitory — there is no guarantee that it will still apply by the time a thread calls wait_and_pop, whether it was true or false. For this reason it is worth adding an additional function: try_pop, which returns true if there was a value to retrieve (in which case it retrieves it), or false to indicate that the queue was empty.

template<typename Data>
class concurrent_queue
{
public:
    bool try_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        if(the_queue.empty())
        {
            return false;
        }
        
        popped_value=the_queue.front();
        the_queue.pop();
        return true;
    }

    // rest as before
};

By removing the separate front and pop functions, our simple naive implementation has now become a usable multiple producer, multiple consumer concurrent queue.

The Final Code

Here is the final code for a simple thread-safe multiple producer, multiple consumer queue:

template<typename Data>
class concurrent_queue
{
private:
    std::queue<Data> the_queue;
    mutable boost::mutex the_mutex;
    boost::condition_variable the_condition_variable;
public:
    void push(Data const& data)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        the_queue.push(data);
        lock.unlock();
        the_condition_variable.notify_one();
    }

    bool empty() const
    {
        boost::mutex::scoped_lock lock(the_mutex);
        return the_queue.empty();
    }

    bool try_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        if(the_queue.empty())
        {
            return false;
        }
        
        popped_value=the_queue.front();
        the_queue.pop();
        return true;
    }

    void wait_and_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        while(the_queue.empty())
        {
            the_condition_variable.wait(lock);
        }
        
        popped_value=the_queue.front();
        the_queue.pop();
    }

};

Posted by Anthony Williams
[/ threading /] permanent link
Tags: , , ,

| Stumble It! stumbleupon logo | Submit to Reddit reddit logo | Submit to DZone dzone logo

Comment on this post

If you liked this post, why not subscribe to the RSS feed RSS feed or Follow me on Twitter? You can also subscribe to this blog by email using the form on the right.

75 Comments

I had a go a implementing one of these with pthreads, not as neat as yours! <code> #ifndef __SYNCQUEUE_H #define __SYNCQUEUE_H

#include <stdio.h> #include <pthread.h>

template <class T> class ListNode { public: T item; ListNode<T> *next; };

template <class T> class SyncQueue { public: SyncQueue() { head = NULL; tail = NULL; size = 0; pthread_mutex_init(&mutex, NULL); pthread_cond_init(&cond, NULL); }

bool enqueue(const T &item) { if (pthread_mutex_lock(&mutex) != 0) { perror("Error! Couldn't lock mutex."); return false; }

ListNode<T> *node = new ListNode<T>; node->item = item;

if (size == 0) { head = tail = node; } else { tail->next = node; tail = tail->next; } size++;

if (pthread_cond_broadcast(&cond) != 0) { perror("cond broadcast error."); return false; } if (pthread_mutex_unlock(&mutex) != 0) { perror("couldn't unlock mutex."); return false; } return true; }

bool dequeue(T &ret_item) { pthread_mutex_lock(&mutex);

while (size < 1) { pthread_cond_wait(&cond, &mutex); }

if (size == 0) return false;

//printf("Queue size: %d\n", size); ret_item = head->item; ListNode<T> *t_node = head->next; delete head; head = t_node; size--;

pthread_mutex_unlock(&mutex); return true; }

int size;

private: ListNode<T> *head; ListNode<T> *tail; pthread_mutex_t mutex; pthread_cond_t cond; }; #endif </code>

by hacama at 18:56:59 on Thursday, 28 August 2008

Great article.

One behavior I can't understand is the following. Assume another thread is producing data and putting it on q, and the code below is the consumer:

concurrent_queue<Data> q; ... Data d; while (1) { q.wait_and_pop(d); do_something_with(d); }

runs significantly slower (for producer that produces a certain number of Data items) than:

concurrent_queue<Data> q; ... Data d; while (1) { while(q.empty()) { boost::this_thread::sleep(boost::posix_time::milliseconds(50)); }

q.wait_and_pop(d); do_something_with(d); }

How can this be?

Frank

by Franklin Perry at 22:13:06 on Thursday, 28 August 2008

Unfortunately, your design as well as implementation is faulty - crash guaranteed if you actually test it.

Implementation is wrong You are locking the mutex recursively (in empty() check).

Design is wrong because for all intents and purposes, this is a serial queue - all access takes an exclusive lock, so parallelization only achieves serial waiting for all users of the queue.
by ohell at 01:00:11 on Friday, 29 August 2008

@ohell:

Check again, there is no recursive locking, and yes I did test it. You are right that it serializes all users, but that's the best you can do with one mutex. You can do better with two mutexes, as you can allow a simultaneous push and pop.

@Frank:

What system are you testing this on? Also, what are you measuring when you say "runs slower"? Overall execution time?

As @ohell points out, this queue essentially serializes the pushes and the pops. If the mutex is highly contended, this may slow things down. If the popping thread sleeps for a bit, the pushing thread might get more than one item pushed without having to fight for the mutex, and improve the overall performance. However, it will likely increase the latency between the push and the pop.

by Anthony Williams at 09:00:15 on Friday, 29 August 2008

@ohell:

I've tested as well, and it works as advertised.

@anthony:

Sorry, should've been more clear: overall execution time is what I meant. I, too, was thinking of an explanation along the lines of mutex contention, but I still can't see why the popping thread waiting would help in that case. Since wait() releases the mutex, from the pusher's perspective there is no difference between the popper being in sleep() or wait(), right?

That said, I'm working up some more tests that are outside my application to see if I can isolate the problem a little better. Wouldn't be the first time the problem was somewhere else in the code... perhaps adding the sleep() is having an unusual side effect somewhere else in the app. I'll let you know what I find.

Frank

by Franklin Perry at 15:05:16 on Friday, 29 August 2008

Hi,

I'm relatively new to the whole multithreading business and currently looking for an implementation of a producer-consumer architecture... I think.

I wanted to ask, could I just copy the code presented here and maybe use it like that? Would there be any licensing issue?

Cheers!

by David at 23:46:42 on Thursday, 04 September 2008

Hi David,

Yes, you can just copy the code presented here and use it for whatever you like. There won't be any licensing issues. I'm glad you find it helpful.

by Anthony Williams at 10:05:40 on Friday, 05 September 2008

Cool, thanks!

by David at 12:45:18 on Friday, 05 September 2008

Okay, I think I have found an issue when multiple consumers use the queue. Assume all consumers are waiting for new data to be pushed onto the queue. When the producer then pushes multiple items in short succession, i.e. so quick that the first consumer to wake up cannot empty the queue again, then the_condition_variable.notify_one() is only called once (because it is blocked by the 'was_empty if' later). It seems to work for me if I replace notify_one() by notify_all().

Btw, I hope that all consumer threads waking up at the same is not a problem, but as far as I understand the notification mechanism, only one of them will acquire control over the mutex...

Let me know what you think...
by David at 23:14:37 on Sunday, 07 September 2008

Hi David,

You're right. Thanks for spotting that. I guess my testing was not exhaustive enough :-(

The only impact of waking all the consumers is that they consume CPU time: if there's nothing in the queue they just treat it as a spurious wake and go back to sleep.

by Anthony Williams at 08:00:20 on Monday, 08 September 2008

push() unlocks the mutex before notifying the condition variable. This gives an opportunity for another push() thread to grab the mutex before notifying the pop() thread.

If you have several threads pushing, and these push threads have lower priority than the pop() thread(s) - it would seem you (could) have a priority inversion - at the extreme, the pop() thread would never wake.

by Ray at 13:23:30 on Friday, 10 October 2008

My previous comment is based on using a real-time priority-preemtive scheduler (no round-robin variant) - I forgot to state that.

by Ray at 13:32:03 on Friday, 10 October 2008

Hi Ray,

By unlocking the mutex before notifying the condition variable, we do indeed allow another thread to acquire the mutex in order to push a new value on the queue. When the popping thread wakes it will block on the mutex until the new push() thread unlocks the mutex again.

If the popping thread is high priority and the pushing thread low priority, then you could have a temporary priority inversion, but that's a natural consequence of using a single mutex for push and pop. However, once the popping thread has blocked on the mutex, the scheduler will wake it as soon as the push thread unlocks the mutex, so it will only ever have to wait for one push().

On the flip side, if the popping thread is waiting on the condition variable, by unlocking the mutex before we call notify the (high priority) popping thread can wake and acquire the mutex immediately upon the call to notify, rather than having to wake and then go back to sleep because it still can't acquire the mutex.

by Anthony Williams at 13:42:14 on Friday, 10 October 2008

Hi,

Great article! But it would be really helpful if you could show a pthreads version (i.e. no dependency on boost) ... I think more people are familiar with pthreads than with boost/thread

Thanks

by AMS at 22:54:20 on Monday, 01 December 2008

Hi, I've found this code really useful, but am confused about the wait_and_pop() procedure.

In it, you return a reference to the first element, using: popped_value=the_queue.front();

But this is immediately followed by the_queue.pop();

When I'm using this code, I use it like so:

Data d; myQueue.wait_and_pop(d); cout << d.a_string;

But when I do this, I get a crash in the destructor of my Data instance at the time of the pop(). Why is pop() drying to call my destructor? And if that's what the expected behavior is, then how am I supposed to actually use the object stored in the queue? The moment I release it, it's deallocated.

Then again, my C++ is very rusty, and I might be making an incorrect assumption about the behavior...

Thanks!

by jimt at 01:41:52 on Thursday, 11 December 2008

Hi jimt,

Thanks for your comment. wait_and_pop() does not return a reference to the first element.

popped_value=the_queue.front() *copies* the variable referenced by the_queue.front() to variable referenced by popped_value using the copy-assignment operator. The references remain pointing to the same elements. This is immediately followed by the_queue.pop(), since this is required to remove the element from the queue.

pop() calls the destructor to destroy the element in the queue. That's fine, because you have a *copy* of that element in your variable d. If this is crashing, it is because you have a bug in your copy-assignment operator or your destructor. For example, have you got a pointer to dynamic storage which is being copied as a raw pointer without reference counting, and so being double-deleted?

by Anthony Williams at 08:02:41 on Thursday, 11 December 2008

Hi, good idea on template thread-safe cross-platfrm queue, which I need to implement for a project. This article (link) does a Windows specific void* queue, and discusses important issues:

http://www.codeproject.com/KB/threads/semaphores.aspx

I don't see use of a semaphore in the waiting mechanism, and then I see various notes about problems. I suspect this needs to be written in terms of a semiphore. Here is what that article says about that:

** There's no substitute for a Semaphore If you think you have invented a clever, faster, more efficient, easier, or whatever way of doing a semaphore without actually using a Semaphore, the chances approach unity that you have simply fooled yourself. Read Dijkstra's earlier papers where he was developing the notion of synchronization primitives that were preemptive-threading safe, and there was no InterlockedIncrement operation to help him. These are complex papers; the techniques are subtle. Only if you fully understand the issues of synchronization should you even consider trying something like this. The rest of the time, particularly if you are new to parallelism and synchronization, take this as a rule: you haven't a clue as to how to create a semaphore effect without using semaphores. I've been doing this professionally for a quarter century and I don't feel confident trying to fake out a semaphore's functionality with some other mechanism. Trust me in this: You Don't Want To Go There. **

I'm thinking of taking your basic idea, but putting in terms of semaphore like that article. I don't think there will be too many changes.

by G Elliott at 00:24:38 on Friday, 23 January 2009

OK, the action of the "condition_variable" and the mutex implements some sort of equivalent of the semaphore. Of course mutex is a special kind of semaphore, usually used for a slightly different purpose, and likewise the wake action of the "condition_variable" is also like a binary (one count) semaphore. Though it may have spurious wakes, it seems to avoid the problem of tracking counts in the article I pointed to, that of the semaphore count tracking the queue count. Here only the queue keeps track of the count, and mutex blocks to only one section of code accessing the queue at a time. Spurious wakes only occur if a parallel thread happens to ask for queue item before the condition_variable sleeping thread can get to it, a slim timing event between producing and consuming.

Oh, a note on the crash issue noted by someone: One must also be careful about thread safeness of the items stored in the queue. Copy of an allocated and managed item (like string class) must be thread safe. For example a possible string class might delete string allocation after last holder was deleted, but keep a count of how many holders contained the actual string memory pointer. The string class is the item copied, not the string memory itself. If such class used pointers as this, the temporary condition is the count of pointers goes to 2 until the other holder is deleted. But imagine another thread also doing such copy and delete operations, having thus 3 and 4 pointers to the same actual memory of the string, across 2 threads. Only thread-safe string class using its own mutex can manage this without getting mixed up.

by G. Elliott at 09:13:41 on Friday, 23 January 2009

Could you send me example of usage your concurrent_queue? 1. One thread put elements to queue 2. Second thread get and process elements from queue (is not empty) 3. End of program when queue is empty and some flag was setup

Something like that with concurrent_queue: http://www.codeguru.com/forum/showpost.php? p=1317302&postcount=17

Best regards, Mariusz
by Mariusz K at 19:43:26 on Tuesday, 10 February 2009

Could you explain why you lock in the functions: empty(), try_pop(Data& popped_value) and wait_and_pop(Data& popped_value)? I only understand why you lock in the push(Data const& data) function because there you also unlock. Who will unlock the lock if i use for example the empty() function? If i use the empty() function and then calls the push() function, wont that mean that the lock is still held by the empty function which prevents me from pushing an item on to the queue.

I would like to have an explanation, i want to learn.

Regards, Greg

by Greg Hall at 08:26:26 on Monday, 09 March 2009

Hi Greg,

The locks are there to protect the shared data. Without the locks it is not safe to access the internal queue.

The lock is automatically released when the boost::mutex::scoped_lock object is destroyed at the closing brace of the enclosing block. This is an example of the RAII idiom.

by Anthony Williams at 09:49:15 on Monday, 09 March 2009
It's really "boost::condition" not "boost::condition_variable", your code won't compile spelled the latter way.
by mark at 20:26:54 on Tuesday, 07 April 2009

Correction, there's no condition_variable defined in my version of boost (1.34.1) but they've added it later. If you include to condition.hpp, it includes condition_variable.hpp if you have boost 1.38

by mark at 20:55:01 on Tuesday, 07 April 2009

Hi Mark,

True, there's no boost::condition_variable in boost 1.34 --- I added it in boost 1.35.

by Anthony Williams at 21:32:55 on Tuesday, 07 April 2009

Thanks for this article!

by C at 04:29:24 on Friday, 25 September 2009

Great article! I was kinda suprised, because I had just implemented a queue almost exactly the same way as you did in the first part of the article. However, I thought using boost::sleep in the loop was a nasty solution and was googling for another way to do things when I came over you'r article. Thanks!

by SZH at 00:39:35 on Saturday, 31 October 2009

How do you make your program clean up properly on exit if it is waiting on the wait_and_pop function?

by Claude at 22:49:02 on Sunday, 29 November 2009

Wait_and_pop would not release the main lock on the queue till the queue has some data. Push needs the lock to push data in the queue. Wont this result in a deadlock?

by sonia at 23:23:27 on Saturday, 05 December 2009

my bad. dint see its the same lock u r using in the condition var.

by sonia at 23:24:56 on Saturday, 05 December 2009

Hi,

I have two observations to make

1)The lock should be owned by the thread that calls notify. lock.unlock(); the_condition_variable.notify_one(); @see: http://linux.die.net/man/3/pthread_cond_signal ", if predictable scheduling behavior is required, then that mutex shall be locked by the thread calling pthread_cond_broadcast() or pthread_cond_signal(). "

2)Is more efficient if you call notify_all (from push(...)) only if the Q has ONE element. In other words if it was empty prior to the push.

by Dan at 23:35:32 on Friday, 05 February 2010

That's for sharing. I've found your article extremely useful for understanding these concepts.

by Julian at 09:37:16 on Tuesday, 02 March 2010

Thank you very much! Especially for that "Be careful when you notify" section! It speeded up things greatly.

by Enjo at 14:02:55 on Wednesday, 03 March 2010

Hi, thank you for this excellent article and code. This is the blocking queue I have been looking for. Allow me to ask one question, is there any plan to add cancel() to this queue model? I think it is needed to wakeup all waiting consumer threads and let them have a chance to exit when the program is in the middle of exiting.

Best regards.

by ning at 09:12:29 on Wednesday, 21 April 2010
In order to cancel waiting threads, I would typically just push a special item on the queue that they will acknowledge as an "all done" message. e.g. a NULL pointer.
by Anthony Williams at 17:04:18 on Thursday, 22 April 2010

If you have multiple threads waiting and you push your "all done" item onto the queue, only one of the threads can pick it up. The others would still be waiting. I guess you _could_ push one "all done" item on the queue for every thread that's reading the queue. As long as you make sure to make the thread quit at that point so that no thread picks up more than one "all done" item, I guess that would work. Thanks for the idea.

by Rich Stephens at 14:22:21 on Monday, 26 April 2010

The Queue here is only taking care of undeflow or empty queue. What if we have a fixed buffer size and we have to prevent Overflow? I guess then we will have to write void wait_and_push(Data& popped_value) on the same line as of void wait_and_pop(Data& popped_value) and will have to use separate condition_variable for that.

by Aditya at 16:33:18 on Sunday, 09 May 2010

I've put a version with a cancel function in it online: http://codepad.org/By84d0X5 Its untested .. but shows how I'd handle cancelation.

by Mike Anderson at 04:40:21 on Monday, 19 July 2010

Nice one Mike. You need to put the test for cancellation inside the loop in wait_and_pop(), otherwise waiting threads won't exit the loop when they're woken by the cancel() function --- they'll just find that the queue is still empty and go back to sleep.

by Anthony Williams at 07:53:50 on Monday, 19 July 2010

I've updated the code now. Also cancelling an already cancelled queue now thows the Cancelled exception. And it now lives on github so anyone can fork it and my changes will be more trackable. (As I'm about to start using it in a real application) http://gist.github.com/482342

by Mike Anderson at 05:41:06 on Tuesday, 20 July 2010

@Aditya: it's quite simple to implement overflow guards: you don't want to use a separate condition_variable for that, for better understanding just rename the actual one to something more meaningful such as "size_changed". Then you'll need to notify every time the size has changed, not just in the push method but also in try_pop and wait_and_pop.

by Manuel Bua at 10:12:39 on Friday, 20 August 2010

Hi, I am new to LINUX C++ development, I am trying to use your Link List in my multithreaded application but when I try to compile this class then it gave me an error "'condition_variable' in name space boost does not name a type". when I try to find this header file in include I haven't found boost::condition_variable in boost 1.44 how to over come the error.

Please help me out.

by vikram Varma at 08:05:03 on Tuesday, 19 October 2010

boost::condition_variable is in the <boost/thread/condition_variable.hpp> header.

by Anthony Williams at 08:35:02 on Tuesday, 19 October 2010

Thanks a lot for quick response.

<boost/thread/condition_variable.hpp> I have tried to find this header file but didn't find. I am using RHEL 5 x86_64. with "gcc" compiler.

Please help.

by Vikram Varma at 09:57:27 on Tuesday, 19 October 2010

You will have to install boost.thread for your compiler. I don't know whether there is a prebuilt package for RHEL, but if there is then it will probably install boost so that the headers are in /usr/include/boost or /usr/local/include/boost. If there isn't a prebuilt package then you'll have to download and install boost yourself from http://boost.org Then you need to make sure that the headers are on the include path for your compiler, and the libraries on the link path

by Anthony Williams at 10:10:39 on Tuesday, 19 October 2010
I have compiled concurrent_queue class successfully and when I used it then there is no compiletime error it was working fine untill and unless I haven't use the function "wait_and_pop". It is giving me error "undefined reference to boost::detail::::get_current_thread_data()".
by vikram varma at 17:02:54 on Wednesday, 20 October 2010

@Vikram: Follow the instructions at http://www.boost.org/doc/libs/1_44_0/more/getting_started/unix-variants.html to ensure you are installing and using boost correctly.

by Anthony Williams at 08:42:13 on Thursday, 21 October 2010

Hi, I am following the your suggetions provided by you at "http://boost.2283326.n4.nabble.com/subject-Thread-undefined-reference-to-boost-thread-thread-td2583741.html" actually now I caught the problem, the library "libboost_thread-gcc41-mt-1_xx.a" is not creating. I am following the boost document also but didn't get any clue, How can I get the multithread support library following are the libray that are creating after building of Boost Thread SRC. 1. libboosr_thread.a 1. libboosr_thread.so.1.44.0 1. libboosr_thread.so

by vikram Varma at 11:30:37 on Thursday, 21 October 2010

The problem is solved..... I have download the boost version 1.37 and build it, It create me "mt" library. but still am not understanding that why it was not working with version 1.44.

by vikram varma at 14:56:37 on Thursday, 21 October 2010

Hello Mr Anthony I need to replace our currently used windows message queues to pass data between modules in a statically bound executable. Could you suggest a library for the same? Would boost::interprocess::queue be a good solution? Thanks bornlibra23

by bornlibra23 at 11:57:08 on Thursday, 28 October 2010

How do I create a vector of concurrent_queue's. When I try to set the size I get a compilation error. i.e.

e.g. Compiling this:

#include "concurrentqueue.h" #include <vector> #include <stdio.h>

class Packet { public: Packet() {} ~Packet() {} };

typedef concurrent_queue<Packet*> InputQueue;

int main(int argc, char* argv[]) { std::vector<InputQueue > inp_queue(5);

inp_queue[0].push(new Packet()); return 0; }

Produces this:

In file included from /usr/include/c++/4.4/deque:63, from /usr/include/c++/4.4/queue:61, from concurrentqueue.h:3, from test.cpp:1: /usr/include/c++/4.4/bits/stl_construct.h: In function ‘void std::_Construct(_T1*, const _T2&) [with _T1 = concurrent_queue<Packet*>, _T2 = concurrent_queue<Packet*>]’: /usr/include/c++/4.4/bits/stl_uninitialized.h:187: instantiated from ‘static void std::__uninitialized_fill_n<<anonymous> >::uninitialized_fill_n(_ForwardIterator, _Size, const _Tp&) [with _ForwardIterator = concurrent_queue<Packet*>*, _Size = long unsigned int, _Tp = concurrent_queue<Packet*>, bool <anonymous> = false]’ /usr/include/c++/4.4/bits/stl_uninitialized.h:223: instantiated from ‘void std::uninitialized_fill_n(_ForwardIterator, _Size, const _Tp&) [with _ForwardIterator = concurrent_queue<Packet*>*, _Size = long unsigned int, _Tp = concurrent_queue<Packet*>]’ /usr/include/c++/4.4/bits/stl_uninitialized.h:318: instantiated from ‘void std::__uninitialized_fill_n_a(_ForwardIterator, _Size, const _Tp&, std::allocator<_Tp2>&) [with _ForwardIterator = concurrent_queue<Packet*>*, _Size = long unsigned int, _Tp = concurrent_queue<Packet*>, _Tp2 = concurrent_queue<Packet*>]’ /usr/include/c++/4.4/bits/stl_vector.h:1035: instantiated from ‘void std::vector<_Tp, _Alloc>::_M_fill_initialize(size_t, const _Tp&) [with _Tp = concurrent_queue<Packet*>, _Alloc = std::allocator<concurrent_queue<Packet*> >]’ /usr/include/c++/4.4/bits/stl_vector.h:230: instantiated from ‘std::vector<_Tp, _Alloc>::vector(size_t, const _Tp&, const _Alloc&) [with _Tp = concurrent_queue<Packet*>, _Alloc = std::allocator<concurrent_queue<Packet*> >]’ test.cpp:18: instantiated from here /usr/include/c++/4.4/bits/stl_construct.h:74: error: no matching function for call to ‘concurrent_queue<Packet*>::concurrent_queue(const concurrent_queue<Packet*>&)’ concurrentqueue.h:12: note: candidates are: concurrent_queue<Packet*>::concurrent_queue() concurrentqueue.h:12: note: concurrent_queue<Packet*>::concurrent_queue(concurrent_queue<Packet*>&)

Resize is no better.

by Matt at 02:11:28 on Monday, 01 November 2010

Hi, great class it works well. I've added a copy constructor and assignment operator to mine in order to allow this to be added into a vector. i.e. a vector of concurrent queues. That way I am able to route input to several worker threads and I can dynamically create as many threads as needed.

by Matt at 03:21:13 on Monday, 01 November 2010

I have written a producer-consumer queue and I found the following enormously useful:

- multi_push() and flush() multi_push() allows a producer to put more than one item on the queue at a time with just one action flush() allows a consumer to take all the items off the queue in one go.

This can reduce the number of locking actions.

- broadcast()

A producer may broadcast to wake up any consumers, which leads to:

- status return to your wait_and_pop() command and an optional time-out as a parameter. Return value informs the user why they were interrupted. This would generally be one of 3 reasons: got data, broadcast or timed-out. How the consumer handles a broadcast is situation-driven. Obviously the "flush()" function I mentioned has the same properties.
by Neil at 15:21:02 on Tuesday, 09 November 2010

Hi, I have a single-producer single-consumer thread. so I'm thinking of just using the code at the 1st section "Basic Thread Safety with a Mutex". The consumer would sleep indefinitely until being interrupted by the producer, 2. lock the queue, check if queue is empty, if not pop everything, unlock, process work, back to 2., if queue empty, unlock and go back to sleep.

the producer locks, puts works on the queue, unlocks, interrupts thread

are there issues with this method?

regards,

by Hicham at 23:05:39 on Sunday, 05 December 2010

I think using sleep is a mistake and it degrades performance as well. You'll find more ellegant solution in every book on operating systems - use one mutex and one semaphore and you can have both multiple producers and consumers and without delays. My solution:

#include <pthread.h> #include <semaphore.h>

using namespace std;

template <typename Type> class ConcurrentQueue : public queue<Type> { public: ConcurrentQueue() { sem_init(&sem, 0, 0); pthread_mutex_init(&lock, NULL); }

virtual ~ConcurrentQueue() { pthread_mutex_destroy(&lock); sem_destroy(&sem); }

void push(const Type& x) { pthread_mutex_lock(&lock); queue<Type>::push(x); pthread_mutex_unlock(&lock); sem_post(&sem); }

Type pop() { Type t; sem_wait(&sem); pthread_mutex_lock(&lock); t = queue<Type>::front(); queue<Type>::pop(); pthread_mutex_unlock(&lock); return t; } private: pthread_mutex_t lock; sem_t sem; };

by Richard at 23:00:27 on Thursday, 24 February 2011

Hi there,

I was wondering if I could return a reference in waitAndPop or por instead of passing a reference to hold the value of the popped element? I could just do front and return in waitAndPop and then call pop to get rid of the popped element.

Best regards, Alex

by Alejandro at 16:36:31 on Wednesday, 06 April 2011

In the member function `push', why not use scoped lock like that, to avoid the call to unlock ?

void push(Data const& data) { { boost::mutex::scoped_lock lock(the_mutex); the_queue.push(data); } the_condition_variable.notify_one(); }

by François Legendre at 09:29:06 on Wednesday, 20 April 2011

excellent article. tks very much

by pharaon at 11:38:28 on Wednesday, 11 May 2011

std::queue is defined in 0x? I can't find this in vc2005 or gcc3.3

by softarts at 06:46:41 on Wednesday, 17 August 2011

std::queue is defined in the <queue> header. It is available in VC2005, and should be in gcc 3.3 too (though I don't have a copy at hand to check). See http://msdn.microsoft.com/en-us/library/s23s3de6%28v=VS.80%29.aspx for the VC2005 docs.

by Anthony Williams at 08:16:01 on Wednesday, 17 August 2011

Hi,

First of all, thanks for the article, it's very useful. Even if it may have some problems regarding how the push works.

I use omni_thread wrapper library under Solaris, instead of boost, but all the other code is the same as in your article:

void push(const Data& data) { omni_mutex_lock l(_queueMutex); _theQueue.push(data); _queueMutex.unlock(); _waitCondition.signal(); }

It seems that the explicit unlock caused problems and crash (if there are many threads writing to the queue).

I removed the call to unlock and changed the implementation like this, and it seems to work, according to my tests:

void push(const Data& data) { { omni_mutex_lock l(_queueMutex); _theQueue.push(data); } _waitCondition.signal(); }

Could it be some problem with omni_thread? Or maybe I don;t know everything, how the unlock works...Can you help me to understand why the first case fail and the second not?
by Zoltan Szeverenyi at 09:34:24 on Wednesday, 16 November 2011

RTFM...

manual of pthread_mutext_unlock says: "Attempting to unlock the mutex if it is not locked results in undefined behavior."

In my first version of push I used a scoped lock and the explicit call to unlovk TOGETHER, which means that when the function returned then the scoped lock object called the unlock again...

by Zoltan Szeverenyi at 10:29:02 on Wednesday, 16 November 2011

Thanks for posting this. This was a great tutorial.

by Clark Sims at 02:33:05 on Friday, 18 November 2011

I don't see the race condition in the first piece of code. If multiple consumers call empty, front and/or pop, at the same time the mutex, and scoped_lock mean that only one thread can alter the data at one time, so all should be good, shouldn't it?

by Clark Sims at 01:45:46 on Sunday, 27 November 2011

You need to change your locking mechanism in the call to Push() with the conditional variable. You are using a boost::mutex::scoped_lock. If you do an early unlock it will try to unlock again and throw an EPERM when it leaves scope. It took me a while to figure out that was illegal. Maybe it works in some versions but it does not work in boost 1.46.1.

by Kevin Peirson at 22:01:30 on Thursday, 08 December 2011

Thank you for this posting, I find it very concise and useful. While using this in a scenario of single producer/multiple consumers I came across a puzzle: if the consumers are using wait_and_pop() and some of them happen to be blocked while the producer stops producing and quits, these consumers will stay blocked. Using try_pop() is spinning. I've tried to apply the idea outlined in the post at http://stackoverflow.com/questions/1410649/waiting-for-multiple-condition-variables-in-boost which discusses a solution to wait for multiple condition variables but still, not all consumers quit on the condition that producer has stopped and the queue is empty.

by John K at 21:32:29 on Friday, 30 December 2011

Thanks for the wonderful article and code. I don't think I would have understood this by any other article.

./Siva.

by Sivaram Kannan at 16:31:00 on Friday, 27 April 2012

Many many thanks! I've finally solved my concurrency issues in my project (BoostStomp) using your code!

by Elias Karakoulakis at 10:15:42 on Sunday, 06 May 2012

Thanks for posting this code. In order to use this in C++11 with a queue of unique_ptr's, I have added an overload of push which takes an rvalue reference to Data, as follows:

void push(Data&& data) { boost::mutex::scoped_lock lock(the_mutex); the_queue.push(std::move(data)); lock.unlock(); the_condition_variable.notify_one(); }

Is this appropriate? Is there anything else I'm missing here?

by John Dibling at 21:22:18 on Wednesday, 06 June 2012
In addition, move semantics need to be applied to the various `pop` methods.
by John Dibling at 21:34:26 on Wednesday, 06 June 2012

I am looking at using this, but having the consumer not block, not even for checking the queue. I have one producer and one consumer. The consumer will be called whenever it gets a chance as part of its loop.

Here is what I am changing: bool try_try_pop(Data& popped_value) { boost::mutex::scoped_try_lock lock(the_mutex); if(!lock) { cout<<"Mutex is locked by another thread."; return false; } if(the_queue.empty()) { return false; } popped_value=the_queue.front(); the_queue.pop(); return true; }

Do you see issues with this approach? I was a little unsure of the lock check as it overloads the ! operator to do the check.

by Demolishun at 22:03:01 on Wednesday, 13 June 2012

Hi, at the following code, you acquire lock in [1] and after that you try to acquire lock in [2] again which cause deadlock :

{ boost::mutex::scoped_lock lock(the_mutex); [1] if(the_queue.empty()); [2] .... ... }

by Hassan Monfared at 09:05:47 on Sunday, 29 July 2012

@Hassan Monfared: take a closer look, this is not a dead lock. you are mistaking the empty() method of the concurrent queue class and the internal queue data structure.

by William Symionow at 06:15:31 on Friday, 28 December 2012

template <typename Data> class concurrent_queue { private: boost::mutex qu_cs_; std::queue<Data> queue_; boost::mutex cond_cs_; boost::condition_variable condition_;

public: void push(Data const & data) { boost::mutex::scoped_lock lock(qu_cs_); queue_.push(data); lock.unlock();

condition_.notify_one(); }

bool empty() const { boost::mutex::scoped_lock lock(qu_cs_); return queue_.empty(); }

bool pop(Data& value) { boost::mutex::scoped_lock lock(qu_cs_); if (queue_.empty()) { return false; }

value = queue_.front(); queue_.pop(); return true; }

bool wait_and_pop(Data& value, int msec) { if (pop(value)) { return true; }

{ boost::mutex::scoped_lock wait(cond_cs_); if (!condition_.timed_wait(wait, boost::posix_time::milliseconds(msec))) { return false; } } //end

{ boost::mutex::scoped_lock lock(qu_cs_); if (queue_.empty()) { return false; } value = queue_.front(); queue_.pop(); return true; } //end }

void wait_and_pop(Data& value) { if (pop(value)) { return; }

while (true) { { boost::mutex::scoped_lock wait(cond_cs_); condition_.wait(wait); } //end

{ boost::mutex::scoped_lock lock(qu_cs_); if (queue_.empty()) { continue; } value = queue_.front(); queue_.pop(); return; } //end } //while } };
by shrine at 02:56:17 on Tuesday, 26 February 2013

I implemented my on synchronized generic queue with the new C++11 constructs thanks to this post! Much appreciated post.

Regards, Bengt

by Bengt Ericsson at 10:37:21 on Sunday, 16 June 2013

Do you have any performance results, latency, msg/sec etc? I have a C implementation of a very fast lockfree queue with latency around 100ns, wonder if this is better? I'm using semaphores and built in primitives.

Cheers!

by mark at 01:49:33 on Sunday, 22 September 2013

Add your comment

Your name:

Your URL:

Email address:

Person or spambot?

Your comment: