This article describes a general-purpose thread-safe queue and provides an implementation in C++.
What is a queue?
A regular queue is a fairly straightforward data structure. Its main characteristic is that it uses the “first in, first out” principle (FIFO). That is, the element that gets added first is the first one to be read. Think of it like a pipe: items enter at one end and exit at the other, in the same order they entered. A simple queue can be implemented using an array or a linked list. Array-based queues use a contiguous block of memory to store elements. In many cases, they prove more performant than their list-based counterparts. They have good spatial locality and provide efficient access to the elements stored. Linked list-based queues use nodes connected by pointers, making it more convenient to add and remove elements dynamically.
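The FIFO behaviour described above can be sketched with the standard `std::queue` adapter. The helper name `drain` is hypothetical and exists only for this illustration. `std::deque` stands in for the array-based approach here, with the caveat that it stores elements in contiguous chunks rather than one single block, while `std::list` plays the role of the linked list.

```cpp
#include <cassert>
#include <deque>
#include <list>
#include <queue>
#include <vector>

// Hypothetical helper: pushes the inputs in order, then drains the queue and
// returns the elements in the order they came out.
template <typename Container>
std::vector<int> drain(const std::vector<int> &input)
{
    std::queue<int, Container> q;
    for (int v : input)
        q.push(v); // Elements enter at the back.

    std::vector<int> output;
    while (!q.empty())
    {
        output.push_back(q.front()); // Read the oldest element...
        q.pop();                     // ...then remove it from the front.
    }
    assert(output.size() == input.size()); // Nothing lost or duplicated.
    return output;
}
```

Calling `drain<std::deque<int>>({1, 2, 3})` and `drain<std::list<int>>({1, 2, 3})` should both return the elements in insertion order. The underlying container affects performance characteristics, not the FIFO guarantee.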
However, these basic implementations are not suitable for concurrent environments where multiple threads might access the queue simultaneously. This is where the concept of a “thread-safe queue” becomes crucial.
Thread safety
Imagine a situation where one thread is trying to add an element, while another thread is simultaneously trying to read or remove an element. Without proper safeguards, this could lead to a race condition, potentially corrupting the queue’s state and causing undefined behavior.
A thread-safe queue addresses these problems by incorporating synchronization mechanisms. More specifically, a typical implementation involves using a mutex and a condition variable. A mutex protects access to the queue’s state, whereas a condition variable allows consumer threads to wait efficiently until the queue receives a value to read.
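To see the two primitives in isolation, here is a minimal sketch of a one-shot handshake between two threads. It is not the queue itself. The function `wait_for_value` and the variables `ready` and `slot` are made up for this example.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical one-shot handshake: a consumer thread sleeps until the
// producer publishes a value, then reads it.
int wait_for_value(int published)
{
    std::mutex mut;
    std::condition_variable cond;
    bool ready = false; // Shared state, guarded by mut.
    int slot = 0;       // Shared state, guarded by mut.
    int received = -1;  // Written by the consumer, read after join().

    std::thread consumer([&] {
        std::unique_lock<std::mutex> lock(mut);
        // wait() releases the mutex while sleeping and re-acquires it before
        // checking the predicate, which also guards against spurious wakeups.
        cond.wait(lock, [&] { return ready; });
        received = slot;
    });

    {
        std::lock_guard<std::mutex> lock(mut);
        slot = published;
        ready = true;
    }
    cond.notify_one(); // Wake the consumer once the state has been updated.

    consumer.join();
    assert(ready); // The consumer can only have finished after ready was set.
    return received;
}
```

The predicate passed to `wait` is what makes the handshake robust: if the producer runs first, the consumer sees `ready == true` and never sleeps. If the consumer runs first, it sleeps without holding the mutex.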
Implementation
Here is a simple C++ implementation:
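/* thread_safe_queue.hpp */
#ifndef THREAD_SAFE_QUEUE_HPP
#define THREAD_SAFE_QUEUE_HPP

#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

template <typename T>
class ThreadSafeQueue
{
public:
    ThreadSafeQueue() = default;

    // The queue owns a mutex and a condition variable, so it is not copyable.
    ThreadSafeQueue(const ThreadSafeQueue &) = delete;
    ThreadSafeQueue &operator=(const ThreadSafeQueue &) = delete;

    // Adds a value to the back of the queue and wakes one waiting consumer.
    void push(T value)
    {
        std::lock_guard<std::mutex> lock(this->mut);
        this->data.push(std::move(value));
        this->cond.notify_one(); // Notify a waiting consumer.
    }

    // Removes and returns the front value, blocking while the queue is empty.
    T pop()
    {
        std::unique_lock<std::mutex> lock(this->mut);
        // Wait until the queue is no longer empty.
        this->cond.wait(lock, [this] { return !this->data.empty(); });
        T value = std::move(this->data.front());
        this->data.pop();
        return value;
    }

private:
    std::queue<T> data;           // Underlying FIFO container.
    std::mutex mut;               // Guards data.
    std::condition_variable cond; // Signals that data is non-empty.
};

#endif // THREAD_SAFE_QUEUE_HPP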
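As a usage sketch, the queue can connect several producer threads to one consumer. The function `sum_through_queue` and its parameters are hypothetical and only serve the illustration. The class is repeated in compact form so that the block compiles on its own.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Compact copy of the queue from the listing above, so this block compiles alone.
template <typename T>
class ThreadSafeQueue
{
public:
    void push(T value)
    {
        std::lock_guard<std::mutex> lock(mut);
        data.push(std::move(value));
        cond.notify_one();
    }

    T pop()
    {
        std::unique_lock<std::mutex> lock(mut);
        cond.wait(lock, [this] { return !data.empty(); });
        T value = std::move(data.front());
        data.pop();
        return value;
    }

private:
    std::queue<T> data;
    std::mutex mut;
    std::condition_variable cond;
};

// Hypothetical demo: each producer pushes 1..items_per_producer, and a single
// consumer pops exactly as many items as were pushed and returns their sum.
long long sum_through_queue(int producers, int items_per_producer)
{
    assert(producers >= 0 && items_per_producer >= 0);
    ThreadSafeQueue<int> queue;
    long long sum = 0; // Written only by the consumer, read after join().

    std::thread consumer([&] {
        const long long total =
            static_cast<long long>(producers) * items_per_producer;
        for (long long i = 0; i < total; ++i)
            sum += queue.pop(); // Blocks while the queue is empty.
    });

    std::vector<std::thread> workers;
    for (int p = 0; p < producers; ++p)
        workers.emplace_back([&queue, items_per_producer] {
            for (int v = 1; v <= items_per_producer; ++v)
                queue.push(v);
        });

    for (std::thread &w : workers)
        w.join();
    consumer.join();
    return sum;
}
```

The consumer knows in advance how many items to expect, which keeps the sketch short. Because `pop()` blocks indefinitely on an empty queue, a real program would likely need some way to signal shutdown, for example a sentinel value or a `close()` method, neither of which is part of the implementation shown here.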