线程同步与互斥


来源:《基于UNIX/Linux的C系统编程》

由于同一程序(进程)的多个线程共享同样的数据和资源,所以不可避免会出现同步、排队或竞争等问题,当然也可能导致死锁或饥饿等现象发生,这些都需要在程序中加以控制。相较于进程,线程间的通信似乎很容易。线程间可以通过设置和读取这些全局变量来进行通信,而无须特殊的机制,但是,对全局变量(实质上是共享内存)的访问并非无懈可击,若不善加利用,将对线程的安全性构成直接威胁。

例1:三个线程分别用于递增操作、递减操作和打印计数,以展示线程间共享全局变量无特殊机制时的相互干扰。

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

int count = 0;

void *print(void *m) {
	int i;
	for (i = 0; i < 5; i++) {
		printf("count = %d\n", count);
		sleep(1);
	}
	return NULL;
}

void *counter2(void *m) {
	while (1) {
		if (count < 0) break;
		count--;
	}
	return NULL;
}

int main(void) {
	pthread_t t1, t2;
	int i;
	pthread_create(&t1, NULL, print, NULL);
	pthread_create(&t2, NULL, counter2, NULL);
	for (i = 0; i < 5; i++) {
		count++;
		sleep(1);
	}
	pthread_join(t1, NULL);
	pthread_join(t2, NULL);
	return 0;
}

2016-01-05 14:11:06屏幕截图

互斥量

互斥量的本质是一把锁,所以又称互斥锁。在访问共享资源时先对互斥量加锁,待访问结束后解锁。本质上讲,互斥量其实是二进制信号量的一种特殊形式。其函数原型如下:

#include <pthread.h>
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
int pthread_mutex_destroy(pthread_mutex *mutex);
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);

其中pthread_mutex_init函数用于创建互斥量,pthread_mutex_destroy函数用于删除互斥量,pthread_mutex_lock函数用于加锁,pthread_mutex_unlock函数用于解锁,pthread_mutex_trylock函数用于无阻塞调用线程。

线程的互斥量数据类型是pthread_mutex_t,使用前需要初始化:

  • 对于静态分配的互斥量,可以初始化为PTHREAD_MUTEX_INITIALIZER或调用pthread_mutex_init函数。
  • 对于动态分配的互斥量,在申请内存之后,通过pthread_mutex_init函数进行初始化,并且在释放内存前需要调用pthread_mutex_destroy函数予以释放。

若使用默认的属性初始化互斥量,在attr参数应设置为NULL。若删除一个互斥量,则意味着释放它所占用的资源。需要指出,对于Linux,由于该系统下的互斥量并不占用任何资源,所以在Linux下的pthread_mutex_destroy函数除了检查锁以外没有其他动作。

对于共享资源的访问控制,若互斥量加锁,则调用线程将阻塞,直到互斥量被解锁,所以加锁和解锁操作是成对出现的。

函数pthread_mutex_trylock是非阻塞调用形式。也就是说,若互斥量没被锁住,则该函数将把互斥量加锁,并获得对共享资源的访问权限;若互斥量被锁住,则该函数将不会阻塞、等待,而是直接返回并将EBUSY写入errno,表示共享资源处于忙状态。

互斥量在使用时,不能在两个不同的线程中分别加锁和解锁,也不能对已解锁的互斥量再解锁,这些行为都将导致不可预料的后果,所以当系统中使用较多互斥量时,则要格外留意是否有产生死锁的条件存在。一般规律是:若线程之间对所有的信号量都按照相同的顺序加锁,那么死锁将不会发生;但是若不能对所有锁进行排序,则可以使用pthread_mutex_trylock函数先测试锁状态,若不能获取锁状态,也可以先释放已占有的锁,过一段时间再重新尝试加锁,这些同样可以避免死锁的发生。

例2:修改上例,在递增计数器和递减计数器上增加互斥量。

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

int count = 0;
pthread_mutex_t counter_lock = PTHREAD_MUTEX_INITIALIZER;

void *print(void *m) {
	int i;
	for (i = 0; i < 5; i++) {
		printf("count = %d\n", count);
		sleep(1);
	}
	return NULL;
}

void *counter2(void *m) {
	while (1) {
		if (count < 0) break;
		pthread_mutex_lock(&counter_lock);
		count--;
		pthread_mutex_unlock(&counter_lock);
	}
	return NULL;
}

int main(void) {
	pthread_t t1, t2;
	int i;
	pthread_create(&t1, NULL, print, NULL);
	pthread_create(&t2, NULL, counter2, NULL);
	for (i = 0; i < 5; i++) {
		pthread_mutex_lock(&counter_lock);
		count++;
		sleep(1);
		pthread_mutex_unlock(&counter_lock);
	}
	pthread_join(t1, NULL);
	pthread_join(t2, NULL);
	pthread_mutex_destroy(&counter_lock);
	return 0;
}

2016-01-05 14:11:47屏幕截图

从结果分析,两个线程可以安全地共享这个全局变量counter。当一个线程调用pthread_mutex_lock时,若另一个线程已经将这个互斥量锁定,则该线程只好阻塞等待锁被打开,待开锁后线程才能对counter操作。每个线程对counter进行操作后,都将互斥量解锁,然后继续循环。

条件变量

使用互斥量固然可以解决多个线程访问同一共享所带来的冲突问题,但是其执行效率不高。主要原因是以控制资源访问(将资源设置为“锁定”和“非锁定”状态)来实现线程同步的互斥量机制无法满足复杂编程的要求。比如线程虽然已获得某一共享资源,但它在等待某一事件发生之后才能运行,而在当前条件未得到满足情况下,该线程只能将资源设置为“非锁定”状态,让其他线程先使用。在这种情况下,若是简单地在互斥量机制中加入条件判断,那么这个判断结果如何及时通知到各线程呢?若各线程对条件判断的要求各不相同时,则编程将更为复杂。为此引入了条件变量机制。

条件变量机制是通过允许线程阻塞和等待另一个线程发送信号的方法来实现线程间同步。其原理是通过条件变量来阻塞某一线程,待条件不满足时,线程将资源设置为“非锁定”状态以供其他线程使用,并等待条件发生变化;一旦其他某个线程改变了这个条件变量并使条件得到满足,则该线程将通过信号唤醒一个或多个正被此条件变量阻塞的线程,而后这些被唤醒的线程再按照互斥量机制实现同步。条件变量的相关函数原型如下:

#include <pthread.h>
int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_timewait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);

其中pthread_cond_init函数用于初始化条件变量。

条件变量与互斥量的用法类似,都有静态和动态两种创建方式:

  • 对于静态分配的条件变量,可以初始化为PTHREAD_COND_INITIALIZER;
  • 对于动态分配的条件变量,在申请内存之后,通过pthread_cond_init函数进行初始化,并且在释放内存前需要调用pthread_cond_destroy函数予以释放。

函数pthread_cond_destroy用于注销一个条件变量,但只在没有线程在该条件变量上等待时才能注销这个条件变量,否则函数将返回-1并将errno设置为EBUSY。对于Linux所实现的条件变量由于没有分配什么资源,所以注销操作只是检查是否有等待线程。

函数pthread_cond_wait将使线程进入休眠状态。

函数pthread_cond_signal用于将符合条件的信息以信号形式发送出去,使得等待条件的线程队列中的第一个线程收到此信号。此外还有一个函数pthread_cond_broadcast(&mycond)是将符合条件的信息以广播信号的形式发送出去,使等待条件的所有线程都能收到该信号。

函数pthread_cond_timedwait可以自动解锁互斥量并等待条件变量,并且还可以限定等待时间。若在参数abstime指定的时间内条件变量cond并未触发,则互斥量量mutex将被重新加锁,函数返回-1并将错位码ETIMEDOUT写入errno。abstime参数指定一个绝对时间,时间原点与time和gettimeofday相同。

例3:针对上述函数设计一程序,以验证这些函数的用法。要求:建立两个线程t1、t2,两个线程分别访问共享资源gnum,并进行加1操作,但共享资源是3的倍数时,线程t1通知线程t2工作,当共享资源不是3的倍数时,线程t1工作,线程t2挂起并等待条件为真。

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int gnum = 0;

void *thread1(void *m) {
	for (;;) {
		pthread_mutex_lock(&mutex);
		if (gnum % 3 == 0)
			pthread_cond_signal(&cond);
		else {
			printf("thread1:%d\n", gnum);
			gnum++;
		}
		pthread_mutex_unlock(&mutex);
		sleep(1);
	}
}

void *thread2(void *m) {
	for (;;) {
		pthread_mutex_lock(&mutex);
		if (gnum % 3 != 0)
			pthread_cond_wait(&cond, &mutex);
		printf("thread2:%d\n", gnum);
		gnum++;
		pthread_mutex_unlock(&mutex);
		sleep(1);
	}
}

int main(void) {
	pthread_t t1, t2;
	pthread_create(&t1, NULL, thread1, (void *)NULL);
	pthread_create(&t2, NULL, thread2, (void *)NULL);
	pthread_join(t1, NULL);
	pthread_join(t2, NULL);
	pthread_mutex_destroy(&mutex);
	pthread_cond_destroy(&cond);
	return 0;
}

2016-01-05 14:14:17屏幕截图

线程同步中的信号量

UNIX有两组信号量函数,较早的一组信号量函数用于进程间的同步或通信,在系统V中就已存在;另外一组则是POSIX实时扩展的用于线程间同步的信号量函数。

用于线程同步的信号量按照信号量的取值不同,可以分为通用信号量和二进制信号量。

通用信号量是计数信号量,其信号量值为有符号整型数。当信号量大于零时表示可用资源的数目,等于零时表示资源全部被占用,小于零时表示资源都被占用,此外负数的绝对值表示当前有多少线程正在等待此资源。

而二进制信号量是计数信号量的子集,只有0和1两种取值,一般用于同步和互斥。互斥时,二进制信号量初始化为可用(1),一个线程得到信号量时,其他期望得到信号量的线程都将被阻塞,直至信号量被释放;同步时,信号量初始化为不可用(0),只在释放信号量时,等待的线程才开始执行。

用于线程间同步的相关函数的原型如下:

#include <pthread.h>
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_post(sem_t *sem);
int sem_destroy(sem_t *sem);

其中sem_init函数用于创建信号量,value是信号量初值,pshared是共享属性,目前Linux还不支持这个特性,只能初始化为0。

函数sem_wait用于获取信号量,函数sem_post用于释放信号量,这些都是原子操作。

函数sem_trywait也是用于获取信号量,不过如果当前信号量不可用,调用此函数的线程不会被阻塞。

函数sem_destroy用于删除信号量,并释放其资源。注意,如果当前信号量正在被占用,或者被wait,那么删除将会失败。所以,如果线程占用了信号量,却中途退出,那么必须在线程清理函数中释放这些信号量,否则信号量将永远不能被删除,资源得不到释放。

例4:程序生成4个线程,其中两个线程负责从文件读取数据到公共的缓冲区,另两个线程从缓冲区读取数据作不同的操作(加和乘运算),其中file1内容为1 2 3 4 5 6 7 8 9 10;file2内容为10 9 8 7 6 5 4 3 2 1。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>

#define MAXSTACK 100
int stack[MAXSTACK][2];
int size = 0;
sem_t sem;

void readdata1(void) {
	FILE *fp = fopen("file1", "r");
	while (!feof(fp)) {
		fscanf(fp, "%d %d", &stack[size][0], &stack[size][1]);
		sem_post(&sem);
		size++;
	}
	fclose(fp);
}

void readdata2(void) {	
	FILE *fp = fopen("file2", "r");
	while (!feof(fp)) {
		fscanf(fp, "%d %d", &stack[size][0], &stack[size][1]);
		sem_post(&sem);
		size++;
	}
	fclose(fp);
}

void handledata1(void) {
	while (1) {
		sem_wait(&sem);
		printf("Plus: %d+%d=%d\n", stack[size][0], stack[size][1], stack[size][0] + stack[size][1]);
		size--;
	}
}

void handledata2(void) {
	while (1) {
		sem_wait(&sem);
		printf("Mulitply:%d*%d=%d\n", stack[size][0], stack[size][1], stack[size][0] * stack[size][1]);
		size--;
	}
}

int main(void) {
	pthread_t t1, t2, t3, t4;
	sem_init(&sem, 0, 0);
	pthread_create(&t1, NULL, (void *)handledata1, NULL);
	pthread_create(&t2, NULL, (void *)handledata2, NULL);
	pthread_create(&t3, NULL, (void *)readdata1, NULL);
	pthread_create(&t4, NULL, (void *)readdata2, NULL);
	pthread_join(t1, NULL);
	pthread_join(t2, NULL);
	pthread_join(t3, NULL);
	pthread_join(t4, NULL);
	sem_destroy(&sem);
	return 0;
}

2016-01-05 14:15:52屏幕截图

数值并未按原先的顺序显示出来,这是由于size可以被各个线程任意修改,各线程之间存在着竞争关系的缘故。

Leave a comment

邮箱地址不会被公开。 必填项已用*标注