POSIX thread 库是 POSIX 的线程标准库,为 C/C++ 定义了一套创建和操纵线程的 API。在多处理器/多核系统上使用线程,可以通过分布式或并行的方式提升处理速度。创建新线程fork进程要快得多,所需的开销也更少,这是因为系统不会为线程初始化新的虚拟内存空间和运行环境。

pthread 实在是太常用了,刚好女朋友这几天在准备面试,那么我就在这里记录一下这个库常见的用法(顺便更新一下这个凉了巨久的博客

线程基础知识

  • 同一个进程下的所有线程拥有相同的地址空间
  • 同一个进程下的所有线程共享:
    • 进程的公有数据
    • 进程指令
    • 进程打开的文件描述符
    • 信号及信号处理器
    • 进程工作目录
    • 进程用户ID(user_id)及组ID(group_id)
  • 每一个线程都具有独立的:
    • 线程ID
    • 寄存器(组)
    • 线程的栈空间
    • 信号掩码
    • 优先级
    • 错误返回值(errno
  • 一个线程既不能管理其他线程,也不知道是哪个线程创建了它本身
  • 所有 pthread 函数若成功执行则返回 0
  • 线程的基本操作包括:
    • 线程的创建 (creation)
    • 线程的销毁 (termination)
    • 线程的同步 (synchronization),包括等待 (join) 与阻塞 (blocking)
    • 线程的调度 (scheduling)
    • 线程的数据管理 (data management)

线程的创建及结束

线程创建

为了创建一个线程,我们可以使用 pthread_create() 函数

int pthread_create(pthread_t *thread, 
                   const pthread_attr_t *attr,
                   void *(*start_routine)(void *),
                   void *arg);
  • thread 表示待创建的线程ID,可以在线程内通过调用 pthread_self() 获得
  • attr 线程属性,常用属性包含
    • detachstate 表示新线程是否与进程中其他线程脱离同步。PTHREAD_CREATE_JOINABLE 表示可同步,PTHREAD_CREATE_DETACHED 表示脱离同步,不可用 pthread_join()
    • schedpolicy 表示线程调度策略,包含 正常非实时的 SCHED_OTHER、实时轮转的 SCHED_RR、实时先入先出的 SCHED_FIFO,后两者只能给超级管理员使用。线程运行中也可以通过调用 pthread_attr_setschedpolicy() 来改变调度策略
    • schedparam 内含一个 struct schedparam 结构,仅有一个 sched_priority 的整型变量表示线程的优先级,仅当调度策略为 SCHED_RRSCHED_FIFO 时有效,可通过调用 pthread_attr_setschedparam() 来改变优先级
    • inheritsched 表示调度策略及参数是否继承调用者的值。当该属性为 PTHREAD_EXPLICIT_SCHED 时,新线程需要被显式地指定调度策略及参数。当该属性为 PTHREAD_INHERIT_SCHED 时,新线程直接继承调用者的值。缺省情况下为 PTHREAD_EXPLICIT_SCHED
    • scope 表示线程之间竞争 CPU 的范围,即运行优先级的范围。PTHREAD_SCOPE_SYSTEM 表示与系统中所有线程同时竞争 CPU,PTHREAD_SCOPE_PROCESS 表示只与主进程内的其他线程竞争 CPU
  • void *(*start_routine) 表示新线程里需要运行的一个函数,且该函数必须只能有一个 void* 参数
  • *arg 表示运行函数的参数表,如果要传递多个参数,可以使用指向 struct 的指针

线程结束

结束线程可以使用 pthread_exit(void *retval) 函数,传入参数为保存线程返回值的一个指针。需要注意的是 pthread_exit() 不会返回任何值,如果线程不是 detached 的,则可以使用 pthread_join() 从另一个线程里得到线程ID和返回值。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

void *foo(void *arg)
{
     char *msg;
     msg = (char *)arg;
     printf("%d: %s \n", pthread_self(), msg);
}

int main()
{
     pthread_t threadA, threadB;
     char *msg1 = "message from thread A";
     char *msg2 = "message from thread B";
     int retA, retB;

     retA = pthread_create(&threadA, NULL, foo, (void *)msg1);
     retB = pthread_create(&threadB, NULL, foo, (void *)msg2);

     pthread_join(threadA, NULL);       // 等待 thread A 结束
     pthread_join(threadB, NULL);       // 等待 thread B 结束

     printf("Thread 1 returns: %d\n", retA);
     printf("Thread 2 returns: %d\n", retB);
     return 0;
}

线程的同步

pthread 库里提供了三种的同步机制:

  • mutexes 互斥锁,通过设定一组变量来控制线程访问
  • joins 让线程等待,直到其它线程结束
  • condition variable 一组 pthread_cond_t 类型的条件变量

接下来我们逐个分析一下这三种同步机制。

Mutexes

互斥锁是一类很常见的同步机制,当需要控制多个线程对某一临界资源进行互斥访问时,就需要用到互斥锁。在 pthread 中使用 pthread_mutex_t 表示一个互斥信号量。使用 pthread_mutex_lock(*t) 进行锁操作,使用 pthread_mutex_unlock(*t) 进行释放。

#include <pthread.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void *foo() {
    pthread_mutex_lock(&mutex);   // 上锁
    // DO SOMETHING HERE...
    pthread_mutex_unlock(&mutex); // 释放
}

int main() {
    pthread_t t1, t2;
    
    pthread_create(&t1, NULL, &foo, NULL);
    pthread_create(&t2, NULL, &foo, NULL);
    
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    
    return 0;
}

Joins

当需要等待其他线程结束时,就要用到 join 操作。一个线程可能会在执行过程中唤起多条其他线程,并且等候它们运行结束。

#include <pthread.h>
#include <stdio.h>

const int THREADS_N = 42;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int count = 0;

void *foo() {
    pthread_mutex_lock(&mutex);
    for(int i = 0; i <= 1000; i++)
        count += 1;
    printf("thread %ld: %d\n", pthread_self(), count);
    pthread_mutex_unlock(&mutex);
}

int main() {
    int i;
    pthread_t thread[THREADS_N];
    
    for(i = 0; i < THREADS_N; i++) { pthread_create(thread+i, NULL, foo, NULL); }
    for(i = 0; i < THREADS_N; i++) { pthread_join(thread[i], NULL); }
    printf("main thread finished!\n");
    return 0;
}

Condition Variables

在仅使用 mutex 的情况下,存在这样一个问题:当线程在等待共享资源时,需要不断地给 mutex 加锁解锁,并查询共享资源是否可用。当线程数较多时,这会是一个巨大的性能消耗。因此,可以使用条件变量机制来解决这一问题,它允许一个线程向另一个线程发送信号,也允许阻塞正在等待资源的线程。

pthread 库中,使用 pthread_cond_t 来声明一个条件变量。

  • 创建/销毁
    • pthread_cond_init
    • pthread_cond_t cond = PTHREAD_COND_INITIALIZER
    • pthread_cond_destroy
  • 条件等待
    • pthread_cond_wait
    • pthread_cond_timedwait (指定等待时间上限)
  • 条件唤醒
    • pthread_cond_signal
    • pthread_cond_broadcast

看下面这个例子:

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>

int count = 0;

pthread_mutex_t condition_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t condition = PTHREAD_COND_INITIALIZER;

void *producer() {
    while(1) {
        pthread_mutex_lock(&count_mutex);
        count += 4;
        printf("producer: count = %d (+)\n", count);
        if (count > 0) {
            pthread_cond_broadcast(&condition);     // 广播给所有 consumer
        }
        pthread_mutex_unlock(&count_mutex);
        sleep(2);
    }
}

void *consumer() {
    while(1) {
        pthread_mutex_lock(&condition_mutex);
        while(count == 0)
            pthread_cond_wait(&condition, &condition_mutex);
        pthread_mutex_unlock(&condition_mutex);
        pthread_mutex_lock(&count_mutex);
        if (count > 0) {
            count -= 1;
            printf("consumer(%d): count = %d (-)\n", pthread_self(), count);
        }
        pthread_mutex_unlock(&count_mutex);
        sleep(1);
    }
}

int main() {
    pthread_t p1; 
    pthread_t c1, c2, c3;
    pthread_create(&c1, NULL, consumer, NULL);
    pthread_create(&c2, NULL, consumer, NULL);
    pthread_create(&c3, NULL, consumer, NULL);
    pthread_create(&p1, NULL, producer, NULL);
    pthread_join(c1, NULL);
    pthread_join(c2, NULL);
    pthread_join(c3, NULL);
    pthread_join(p1, NULL);
    return 0;
}

上述代码中,producer 线程 p1 每2秒将 count 提升4,同时使用 pthread_cond_broadcast 通知所有 consumer 线程来消耗 count。运行结果如下:

producer: count = 4 (+)
consumer(1289328384): count = 3 (-)
consumer(1306113792): count = 2 (-)
consumer(1297721088): count = 1 (-)
consumer(1289328384): count = 0 (-)
producer: count = 4 (+)
consumer(1306113792): count = 3 (-)
consumer(1297721088): count = 2 (-)
consumer(1289328384): count = 1 (-)
consumer(1306113792): count = 0 (-)
producer: count = 4 (+)
...