Linux:线程(四)POSIX信号量

POSIX信号量

  • 一.进一步探析生产者消费者模型
  • 二.POSIX信号量
    • 1.基本使用
    • 2.基于环形队列的生产消费模型
      • 1.概念
      • 2.模拟
      • 3.一个使用例子

一.进一步探析生产者消费者模型

前面我们详解过生产者消费者模型,也了解了它的一些优点,例如:解耦,支持并发,支持忙闲不均…但其实也有人说它高效,这体现在哪呢?

需要理解的一个重要细节是:在生产者进行生产时,需要首先从其它地方获取”原材料”;在消费者消费数据后,也需要对数据进行加工。而生产者消费者模型的高效就体现在当生产者持有锁时,消费者不能进行消费但可以进行数据加个;当消费者持有锁时,生产者不能进行生产但可以获取“原材料”。

再回过头来看看上一篇博客写的模拟阻塞队列(生产者消费者模型)代码,可以发现每一个线程都是先加锁,再进行判断。这很好理解,因为判断也是临界资源。

在这里插入图片描述

当判断不成立时就会让该线程进入等待状态,同时注意pthread_cond_wait会自动释放锁。这样其他线程就能拿到锁,正常运行。当该线程资源充足后再将该线程唤醒,该线程又重新拿到锁,开始继续执行代码。那么一个问题,如果该线程误唤醒(伪唤醒)了呢?

什么是伪唤醒(误唤醒)

就以生产者消费者模型为例。上一篇博客里的模拟代码是单生产者和单消费者,很明显在实际中往往是多生产者和多消费者。当有多个生产者在进行等待(为什么会有多个生产者等待,因为pthread_cond_wait会自动释放锁,从而其他生产者就可以持有锁再进入等待队列)。

在这里插入图片描述

如果使用一个函数(例如:pthread_cond_broadcast)将它们全部唤醒,毫无疑问它们要再重新持有锁,那么就必然会进行锁的竞争。当一个生产者拿到了锁生产完毕后再释放锁,紧接着另一个生产者又抢到了锁,再进行生产…假设生产者的锁的竞争力更强,消费者一直没抢到锁,没办法进行消费(注意生产者和消费者用的是同一把锁)。当队列已经满后,生产者还在被唤醒进行生产,此时的唤醒被称为误唤醒(伪唤醒)。

为了防止这种情况,建议把if判断改成while判断。

在这里插入图片描述

二.POSIX信号量

前面在进程间通信里已经写过信号量了,那里主要是SystemV信号量,这里是POSIX信号量。POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 POSIX可以用于线程间同步。

信号量是保证PV操作原子性的一个计数器。申请信号量就是对计数器进行减减,释放信号量就是对计数器进行加加。 例如:我像让三个线程访问一个数组,把该数组分为三部分,让每个线程分别访问它的三个部分。那么如何保证进入的只有三个线程而不是四个线程呢,就依靠信号量这一个计数器进行计数。

1.基本使用

初始化信号量

在这里插入图片描述

参数:

sem:创建的信号量
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值

销毁信号量

在这里插入图片描述

等待信号量

功能:等待信号量,会将信号量的值减1

在这里插入图片描述

发布信号量

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1

在这里插入图片描述

2.基于环形队列的生产消费模型

1.概念

循环队列相信大家都不陌生,这是属于数据结构的知识。这里的循环队列与数据结构里的也几乎一样。

在这里插入图片描述

我们规定按照顺时针走,P生产一个走一格,C消费一个走一格。

有三个条件:

1.当指向同一位置时,只能有一方访问,避免冲突。(空:P走;满:C走)
2.C不能超过P
3.不能发生套圈情况

这里看是空还是满,就可以直接使用信号量进行判别。

在这里插入图片描述

我们用信号量保证生产者和消费者间的互斥,用锁来保证消费者和消费者,生产者和生产者间的互斥。

2.模拟

RingQueue.hpp

#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
#include <pthread.h>

const static int defaultcap = 5;

template<class T>
class RingQueue{
private:
    void P(sem_t &sem)
    {
        sem_wait(&sem);
    }
    void V(sem_t &sem)
    {
        sem_post(&sem);
    }
    void Lock(pthread_mutex_t &mutex)
    {
        pthread_mutex_lock(&mutex);
    }
    void Unlock(pthread_mutex_t &mutex)
    {
        pthread_mutex_unlock(&mutex);
    }
public:
    RingQueue(int cap = defaultcap)
    :ringqueue_(cap), cap_(cap), c_step_(0), p_step_(0)
    {
        sem_init(&cdata_sem_, 0, 0);
        sem_init(&pspace_sem_, 0, cap);

        pthread_mutex_init(&c_mutex_, nullptr);
        pthread_mutex_init(&p_mutex_, nullptr);
    }
    void Push(const T &in) // 生产
    {
        P(pspace_sem_);

        Lock(p_mutex_); // ?
        ringqueue_[p_step_] = in;
        // 位置后移,维持环形特性
        p_step_++;
        p_step_ %= cap_;
        Unlock(p_mutex_); 

        V(cdata_sem_);

    }
    void Pop(T *out)       // 消费
    {
        P(cdata_sem_);

        Lock(c_mutex_); // ?
        *out = ringqueue_[c_step_];
        // 位置后移,维持环形特性
        c_step_++;
        c_step_ %= cap_;
        Unlock(c_mutex_); 

        V(pspace_sem_);
    }
    ~RingQueue()
    {
        sem_destroy(&cdata_sem_);
        sem_destroy(&pspace_sem_);

        pthread_mutex_destroy(&c_mutex_);
        pthread_mutex_destroy(&p_mutex_);
    }
private:
    std::vector<T> ringqueue_;
    int cap_;

    int c_step_;       // 消费者下标
    int p_step_;       // 生产者下标

    sem_t cdata_sem_;  // 消费者关注的数据资源
    sem_t pspace_sem_; // 生产者关注的空间资源

    pthread_mutex_t c_mutex_;
    pthread_mutex_t p_mutex_;
};

3.一个使用例子

Task.hpp

#pragma once
#include <iostream>
#include <string>

std::string opers="+-*/%";

enum{
    DivZero=1,
    ModZero,
    Unknown
};

class Task
{
public:
    Task()
    {}
    Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0)
    {
    }
    void run()
    {
        switch (oper_)
        {
        case '+':
            result_ = data1_ + data2_;
            break;
        case '-':
            result_ = data1_ - data2_;
            break;
        case '*':
            result_ = data1_ * data2_;
            break;
        case '/':
            {
                if(data2_ == 0) exitcode_ = DivZero;
                else result_ = data1_ / data2_;
            }
            break;
        case '%':
           {
                if(data2_ == 0) exitcode_ = ModZero;
                else result_ = data1_ % data2_;
            }            break;
        default:
            exitcode_ = Unknown;
            break;
        }
    }
    void operator ()()
    {
        run();
    }
    std::string GetResult()
    {
        std::string r = std::to_string(data1_);
        r += oper_;
        r += std::to_string(data2_);
        r += "=";
        r += std::to_string(result_);
        r += "[code: ";
        r += std::to_string(exitcode_);
        r += "]";

        return r;
    }
    std::string GetTask()
    {
        std::string r = std::to_string(data1_);
        r += oper_;
        r += std::to_string(data2_);
        r += "=?";
        return r;
    }
    ~Task()
    {
    }

private:
    int data1_;
    int data2_;
    char oper_;

    int result_;
    int exitcode_;
};

Main.cc

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
#include "RingQueue.hpp"
#include "Task.hpp"

using namespace std;

struct ThreadData
{
    RingQueue<Task> *rq;
    std::string threadname;
};

void *Productor(void *args)
{
    // sleep(3);
    ThreadData *td = static_cast<ThreadData*>(args);
    RingQueue<Task> *rq = td->rq;
    std::string name = td->threadname;
    int len = opers.size();
    while (true)
    {
        // 1. 获取数据
        int data1 = rand() % 10 + 1;
        usleep(10);
        int data2 = rand() % 10;
        char op = opers[rand() % len];
        Task t(data1, data2, op);

        // 2. 生产数据
        rq->Push(t);
        cout << "Productor task done, task is : " << t.GetTask() << " who: " << name << endl;

        sleep(1);
    }
    return nullptr;
}

void *Consumer(void *args)
{
    ThreadData *td = static_cast<ThreadData*>(args);
    RingQueue<Task> *rq = td->rq;
    std::string name = td->threadname;

    while (true)
    {
        // 1. 消费数据
        Task t;
        rq->Pop(&t);
       
        // 2. 处理数据
        t();
        cout << "Consumer get task, task is : " << t.GetTask() << " who: " << name << " result: " << t.GetResult() << endl;
        // sleep(1);

    }
    return nullptr;
}

int main()
{
    srand(time(nullptr) ^ getpid());
    RingQueue<Task> *rq = new RingQueue<Task>(50);

    pthread_t c[5], p[3];

    for (int i = 0; i < 1; i++)
    {
        ThreadData *td = new ThreadData();
        td->rq = rq;
        td->threadname = "Productor-" + std::to_string(i);

        pthread_create(p + i, nullptr, Productor, td);
    }
    for (int i = 0; i < 1; i++)
    {
        ThreadData *td = new ThreadData();
        td->rq = rq;
        td->threadname = "Consumer-" + std::to_string(i);

        pthread_create(c + i, nullptr, Consumer, td);
    }

    for (int i = 0; i < 1; i++)
    {
        pthread_join(p[i], nullptr);
    }
    for (int i = 0; i < 1; i++)
    {
        pthread_join(c[i], nullptr);
    }

    return 0;
}