Linux系统编程三十线程池实现

线程池实现

  • 一.线程池的本质
  • 二.类内创建线程
  • 三.代码实现

一.线程池的本质

线程池里面存储的都是一批已经创建好的线程,当线程池里有数据时,这批线程就会被唤醒去竞争数据,当线程池里没有数据时,这批线程就去休眠等待。

线程池的本质就是一个生产消费模型,当有生产者线程往线程池里发送任务时,线程池里的消费者线程就会竞争任务。

比如主线程往线程池里投递一个任务,线程池里的若干线程就会被立刻唤醒,然后去竞争抢任务执行。
所以一开始线程池里的线程必须早就被创建出来的,只不过线程池里没有东西,它们都去条件变量下等待了。

所以如果要实现一个线程池,那么需要些什么呢?

1.首先线程池肯定需要存储任务数据的场所,我们可以用队列来存储。也就是线程池里需要一个任务队列,用来存储接收到的任务。
2.其次线程池里存储很多已经被创建的线程,所以里面还需要能够找到这个已经被创建的线程,如何找到呢?根据线程的tid,所以我们可以用vector数组存储线程的tid,来找到所有线程。
3.然后就是我们要保证生产消费过程中的安全,比如消费者只能互斥竞争任务,不能同时竞争任务,生产者在生产,消费者不能去消费等,所以需要加锁保护。
4.最后就是要保证,生产消费之间的有序性,线程池里线程不能一直去竞争任务,如果线程池里没有任务了,那么就需要到条件变量下等待。

二.类内创建线程

在实现的过程中,有一个细节,那就是在类内部创建线程,因为线程池中的线程是早就被创建好的,所以程序一开始运行时就需要存在,只不过这时线程池里的线程都在休眠。但是在创建线程时,需要传递线程的执行函数,线程的执行函数有特定的形式,必须要返回值是void类型,参数也是void类型。
当在类内部创建时,这个线程执行函数默认是类成员函数,成员函数的参数里默认会有一个this指针。
也就是在类内创建线程时,该线程执行函数的参数是有两个的,不符合要求。就会创建失败。
在这里插入图片描述

那该如何解决呢?
我们可以在成员函数的前面加上static,就变成了静态成员函数了,静态成员函数是没有this指针的。

但设置成静态成员函数后,又会存在一个问题,那就是该静态成员函数是无法直接访问成员变量的。而线程的执行函数是需要去到线程池里的任务队列里去竞争任务的,所以必须要访问类成员变量。在这里插入图片描述

【解决方法】
我们可以在创建线程时,将该类的this指针传给线程函数,这样线程的执行函数,就可以通过类型转换访问到类的成员变量了。
在这里插入图片描述

三.代码实现

#pragma once
#include <pthread.h>
#include <iostream>
#include <queue>
#include <string>
#include <vector>
//线程池的本质就是生产消费模型
//一个生产者往线程池里放任务,然后其他消费者者就竞争这个任务执行
//线程池里有很多线程,所以它的基本属性肯定有识别线程的id


static const int defaultnum=3;//默认线程池里有3个线程
struct ThreadInfo
{
   pthread_t tid;
   std::string name;
};
template <class T>
class ThreadPool
{
public:

    void Lock()
    {
        pthread_mutex_lock(&_mutex);
    }
    void Unlock()
    {
        pthread_mutex_unlock(&_mutex);
    }

    void Makeup()
    {
        pthread_cond_signal(&_cond);
    }
    bool isQueueEmpty()
    {
        return _task.empty();
    }
    void ThreadSleep()
    {
        pthread_cond_wait(&_cond,&_mutex);
    }
    T Pop()
    {
       T t=_task.front();
       _task.pop();
       return t;
    }
   
   std::string GetthreadName(pthread_t id)
   {
      for(const auto&ti :_thread)
      {
        if(ti.tid==id)
        return ti.name;
      }
      return "None";
   }
public:
     ThreadPool(int num=defaultnum):_thread(num)
     {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_cond,nullptr);
     }
     void Push(const T& in)//往线程池里发送任务,发送是没有条件的,但一旦发送了,就说明消费条件满足了,就要唤醒线程池里的线程去执行
     {
        Lock();
        _task.push(in);
        Makeup();//唤醒在条件变量下等待的线程
        Unlock();
     }

    
    
    //要注意,在类内部创建线程时,线程执行的函数里,会有this指针,不满足要求,所以必须要使用静态成员函数
    //这样才可以没有this指针,只有一个参数,但静态成员函数又不能访问类成员,所以在给线程函数传递参数时,我们传
    //该类的this指针,这样就可以通过this指针访问类成员
    static void *Handler(void* args)//去线程池里的任务队列里竞争任务
    {
      ThreadPool<T>* td=static_cast< ThreadPool<T>*>(args);
      std::string name=td->GetthreadName(pthread_self());//根据tid来获取到对应的名字
      //线程创建出来就去竞争任务,任务在哪里?在任务队列里,任务队列里没有怎么办?去条件变量下等待
      while(true)
      {
        td->Lock();

        while(td->isQueueEmpty())//防止伪唤醒
        {
            td->ThreadSleep();//没有任务那么就去条件变量下等待       
        }
       //如果有任务,那么就将任务拿出来,并执行
       T t=td->Pop();
       td->Unlock();
       
       t();//处理任务
      std::cout<<name<<"run, "<<"reslut: "<<t.Getresult()<<std::endl;
      }

    }
     //当我们去调用这个线程池的时候,线程池就应该给我们创建若干个线程在线程池里。而当有人往线程池里发送任务时,线程
     //池里的线程会立刻被唤醒,去竞争任务
     
     void Start()//线程池里的线程刚被创建出来,就会去线程池里的队列里竞争任务,如果没有任务,那么它就会去休眠
     {
      
      int num=_thread.size();
      for(int i=0;i<num;i++)
      {
         
        _thread[i].name="thread- "+std::to_string(i+1);

        pthread_create(&(_thread[i].tid),nullptr,Handler,this);
      }   
     }

     ~ThreadPool()
     {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
     }

private:
 

  std::vector<ThreadInfo> _thread;//根据这个来找到要分配任务的线程,线程池里存储的线程
  std::queue<T> _task;//线程池里存放的任务

  pthread_mutex_t _mutex;
  pthread_cond_t _cond;
   
};
#include <iostream>
#include "ThreadPool.hpp"
#include "TASK.hpp"
#include <ctime>
#include <unistd.h>
int main()
{

    ThreadPool<TASK> *tp = new ThreadPool<TASK>();
    tp->Start();
    int len = opera.size();
    srand(time(nullptr));
    while (true)
    {
        // 1.获取数据
        int x = rand() % 10 + 1;
        usleep(10);
        int y = rand() % 10;
        char op = opera[rand() % len];
        TASK t(x, y, op);
        // 2.生产数据
        tp->Push(t);

        std::cout<<"main thread make task"<<t.GetTASK()<<std::endl;

        sleep(1);
    }
}
#pragma once
#include <iostream>
#include <string>

std::string opera="+-*/%";
class TASK
{

public:
    TASK()
    {}
    TASK(int data1, int data2, char op) : _data1(data1), _data2(data2), _oper(op)
    {
    }
    void run()
    {
        switch (_oper)
        {
        case '+':
            _result = _data1 + _data2;
            break;
        case '-':
            _result = _data1 - _data2;
            break;
        case '*':
            _result = _data1 * _data2;
            break;
        case '/':
        {
            if (_data2 == 0)
                _exitcode = 1;
            else
                _result = _data1 / _data2;
        }
        break;
        case '%':
        {
            if (_data2 == 0)
                _exitcode = 2;
            else
                _result = _data1 % _data2;
        }
        break;

        default:
        _exitcode=3;
            break;
        }
    }

    std::string GetTASK()
    {
       std::string r=std::to_string(_data1);
       r+=_oper;
       r+=std::to_string(_data2);
       r+="=?";
       return r;
    }
    std::string Getresult()
    {
        std::string result=std::to_string(_data1);
        result+=_oper;
        result+=std::to_string(_data2);
        result+='=';
        result+=std::to_string(_result);
        result+="[code:";
        result+=std::to_string(_exitcode);
        result+=']';
        return result;
    }
    void operator()()
    {
        run();
    }
private:
    int _data1;
    int _data2;
    char _oper;

    int _result;
    int _exitcode=0;
};