Qt线程同步-单生产者多消费者

单生产者单消费者大家应该已经掌握了。上一篇使用的是QMutex跟QWaitCondition。

如果你的C++编译器版本比较高的话,那就可以使用C++11的 std::condition_variable了

代码

这次对比单生产者单消费者,我们多了mutex来进行消费者之间的同步。这样就可以写单生产者多消费者了。

这是代码中多的一个锁,用于同步消费者。

std::mutex mutexConsumer; //消费者用来同步用的

上代码,这是线程同步需要的变量

struct ItemRepository
{
    std::deque<int> itemQueue; //缓冲区
    const int MaxSize = 10; // 仓库所容纳的产品最大个数
    int itemCounterUse = 0; //消费者用了多少
    std::mutex mutex;// 互斥量,保护产品缓冲区
    std::mutex mutexConsumer; //消费者用来同步用的
    std::condition_variable bufferNotFull;     // 条件变量, 指产品仓库缓冲区不为满
    std::condition_variable bufferNotEmpty;    // 条件变量, 指产品仓库缓冲区不为空
}gItemRepository;   // 产品库全局变量,生产者和消费者操作该变量.

这里是实现代码

class Producer : public QThread
{
    Q_OBJECT
public:
    Producer(QObject *parent = NULL) : QThread(parent)
    {
    }
    // 生产 产品
    void ProduceItem(ItemRepository &itemRepo, int item)
    {
        std::unique_lock<std::mutex> lock(itemRepo.mutex);
        if (itemRepo.itemQueue.size() == itemRepo.MaxSize)
        {
            itemRepo.bufferNotFull.wait(lock);
            qDebug()<<"product has full"<<endl;
        }

//#ifdef Q_OS_WIN
//        Sleep(1000);
//#endif

        itemRepo.itemQueue.push_back(item);         // 仓库放入产品
        itemRepo.bufferNotEmpty.notify_all();  // 通知消费者仓库不为空
    }

    void run() override
    {
        for (int i = 1; i <= kItemsToProduce; i++)
        {
            ProduceItem(gItemRepository, i);
            {
                qDebug()<<"procut------------"<<i<<endl;
            }
        }
    }
};

class Consumer : public QThread
{
    Q_OBJECT
public:
    Consumer(QObject *parent = NULL) : QThread(parent)
    {
    }
    // 消费 产品
    int ConsumeItem(ItemRepository &itemRepo)
    {
        int data;
        std::unique_lock<std::mutex> lock(itemRepo.mutex);
        if (itemRepo.itemQueue.empty())
        {
            itemRepo.bufferNotEmpty.wait(lock);
        }
        data = itemRepo.itemQueue.front();
        itemRepo.itemQueue.pop_front();
        itemRepo.bufferNotFull.notify_all();
        return data;
    }

    void run() override
    {
        while (true)
        {
            int item = 0;
            std::unique_lock<std::mutex> lock(gItemRepository.mutexConsumer);  // 仓库产品消费计数器保持多线程互斥
            if (gItemRepository.itemCounterUse < kItemsToProduce)
            {
                item = ConsumeItem(gItemRepository);    // 消费产品
                gItemRepository.itemCounterUse++;  // 每消费一次进行计数器+1
            }

            lock.unlock();

            if (this->objectName() == "thread0")
            {
#ifdef Q_OS_WIN
                Sleep(2000);
#endif
            }
            else if (this->objectName() == "thread1")
            {
#ifdef Q_OS_WIN
                Sleep(3000);
#endif
            }
            else if (this->objectName() == "thread2")
            {
#ifdef Q_OS_WIN
                Sleep(2000);
#endif
            }
            qDebug()<<"Consumer------------"<<this->objectName()<<"-----"<<item<<endl;
        }
    }
};

有一点需要注意的是,消费的时候,应该在消费者的外面,所以我打的log是解锁之后sleep。这时候应该及时通知其它消费者线程赶紧调度。这样避免cpu的资源浪费。

稍微吐槽下,目前百度 Qt 单生产者多消费者的例子,十有八九是用Qt的信号量来实现的,可是写那篇文章的人也没有研究明白,那个例子有严重的bug。稍微变通下生产总数跟缓冲区总数就输出有问题了。慎用。

CSDN的文章就是相互抄袭啊。逃)

工程代码在这里

https://github.com/CryFeiFei/Qt_Teach/tree/master/Qt_Teach/Thread_Muliconsmer


文章作者: 张小飞
版权声明: 本博客所有文章除特別声明外,均采用 CC BY-NC-ND 4.0 许可协议。转载请注明来源 张小飞 !
 上一篇
Qt释放线程资源的一些工程上的方法 Qt释放线程资源的一些工程上的方法
Qt官方文档的方法 QThread创建在栈上,然后QObject需要配合QThread释放资源直接上代码。结束的时候线程quit and wait 直接上代码 class Controller : public QObject {
下一篇 
Qt线程同步-单生产者单消费者 Qt线程同步-单生产者单消费者
生产者消费者生产者消费者是个很经典的模型,我当时上学的时候,记得操作系统老师就讲过。 现在我们用Qt的条件变量来实现。 QWaitConditionQt的环境变量为QWaitCondition,对应的CPP的类就是std::conditio
  目录