nxdong October 17, 2022 [linux ] #atomic
众所周知,多线程对同一个变量进行操作,有可能导致结果异常,这是cpu的工作特性导致的。
背景&&原理
对一个变量做自增操作,CPU微观指令级别分成3步操作:
将变量的值加载进寄存器
对寄存器的值做自增操作
将寄存器的值写回变量
在单核系统中,上述过程可以正常进行,但是在多核系统中,可能会出现以下情况。
+ +----------------------+----------------------+
| | CPU0 operation | CPU1 operation |
| +----------------------+----------------------+
| | read counter (== 0) | |
| +----------------------+----------------------+
| | increase | read counter (== 0) |
| +----------------------+----------------------+
| | write counter (== 1) | increase |
| +----------------------+----------------------+
| | | write counter (== 1) |
| +----------------------+----------------------+
V
timeline
为了解决这个问题,硬件引入原子自增指令。
保证CPU0递增原子变量counter之间,不被其他CPU执行自增指令导致不想要的结果。
对于intel的cpu。使用LOCK 前缀:
LOCK (prefix): Perform atomic access to memory (can be applied to a number of general purpose instructions that provide memory source/destination access).
出自Intel手册Volume 1: Basic Architecture 的5.20 SYSTEM INSTRUCTIONS 一节
在汇编上,也可能是 XCHG
指令, 这个指令会自动加LOCK
前缀。
这部分可以参考Intel开发手册的Volume 1: Basic Architecture 的7.3.1.2 Exchange Instructions
一章。
关于intel处理器的原子操作部分,在开发手册的Volume 3 (3A, 3B, 3C & 3D):System Programming Guide 的 CHAPTER 8 MULTIPLE-PROCESSOR MANAGEMENT
一章有更详细的说明,其第一章就是加锁的原子操作。
(留个坑,这个地方有时间翻译一下手册的第八章部分)
目前,暂且假定原子操作在intel的cpu上是通过总线锁控制内存地址的访问或者缓存一致性协议(Cache Coherency protocols)来实现的。
线程原子操作代码
add_multithread.c
文件代码如下:
#include < stdio.h>
#include < stdlib.h>
#include < pthread.h>
#include < unistd.h>
#include < sys/types.h>
#include < sys/syscall.h>
#define WORK_SIZE 5000000
#define WORKER_COUNT 16
pthread_t g_tWorkerID[ WORKER_COUNT] ;
int g_sum = 0 ;
pthread_mutex_t g_mutex;
pid_t gettid ( )
{
return syscall ( SYS_gettid ) ;
}
void * add_worker ( void * atomic_flag )
{
int i = 0 ;
for ( i = 0 ; i < WORK_SIZE; ++ i)
{
if ( * ( int * ) atomic_flag == 1 )
{
__sync_fetch_and_add ( & g_sum, 1 ) ;
}
else if ( * ( int * ) atomic_flag == 2 )
{
pthread_mutex_lock ( & g_mutex) ;
g_sum++ ;
pthread_mutex_unlock ( & g_mutex) ;
}
else
{
g_sum++ ;
}
}
return NULL ;
}
int main ( int argc , const char * argv [ ] )
{
int method_flag = 0 ;
if ( argc == 2 )
{
method_flag = atoi ( argv[ 1 ] ) ;
}
else
{
printf ( " Args Help:\n " ) ;
printf ( " 0: no sync \n " ) ;
printf ( " 1: atomic add \n " ) ;
printf ( " 2: mutex \n " ) ;
exit ( 0 ) ;
}
pthread_mutex_init ( & g_mutex, NULL ) ;
int i = 0 ;
for ( i = 0 ; i < WORKER_COUNT; ++ i)
{
pthread_create ( & g_tWorkerID[ i] , NULL , add_worker, & method_flag) ;
}
printf ( " Created %d add threads\n " , i) ;
for ( int i = 0 ; i < WORKER_COUNT; ++ i)
{
pthread_join ( g_tWorkerID[ i] , NULL ) ;
}
pthread_mutex_destroy ( & g_mutex) ;
printf ( " All add threads finished.\n " ) ;
printf ( " The sum: %d Method: %d \n " , g_sum, method_flag) ;
return 0 ;
}
编译&&运行
只要朴素的编译方式:
gcc add_multithread.c
运行获取非同步结果:
$ time ./a.out 0
Created 16 add threads
All add threads finished.
The sum: 9960636 Method: 0
./a.out 0 2.00s user 0.01s system 1037% cpu 0.193 total
这个由于没有控制变量访问,所以结果不对。
运行获取原子变量结果:
time ./a.out 1
Created 16 add threads
All add threads finished.
The sum: 80000000 Method: 1
./a.out 1 15.90s user 0.03s system 1141% cpu 1.396 total
这个结果是对的,每个线程跑500w 自增操作,有16个线程,结果是8000w,耗时1.396 秒。
运行获取互斥量加锁结果:
time ./a.out 2
Created 16 add threads
All add threads finished.
The sum: 80000000 Method: 2
./a.out 2 19.04s user 49.10s system 1102% cpu 6.182 total
这个结果也是对的,但是耗时比原子操作多很多。
汇编代码
通过 Compiler Explorer (godbolt.org) 查看该代码的汇编结果。
可以发现,原子操作对应的汇编是一个带lock前缀的add指令。
而mutex相关的汇编是两个系统调用,所以这个方式的耗时比原子操作多。
跨进程的原子操作
根据原子操作的理论,如果两个进程通过共享内存的方式共享变量,在这个共享变量上执行原子操作,理论上可以进行跨进程的同步。
在写代码之前,我们先创建个共享内存。当然也可以写代码创建,但是没必要:)
ipcmk -- shmem 4
lsipc - m
KEY ID PERMS OWNER SIZE NATTCH STATUS CTIME CPID LPID COMMAND
0xd4f960f4 1 rw-r--r-- sss 4B 0 23:17 3428 0
现在搞一个对共享内存进行操作的代码,add_multithread_ipc.c
内容如下:
#include < stdio.h>
#include < stdlib.h>
#include < pthread.h>
#include < unistd.h>
#include < sys/types.h>
#include < sys/syscall.h>
#include < sys/shm.h>
#define WORK_SIZE 50000000
#define WORKER_COUNT 16
pthread_t g_tWorkerID[ WORKER_COUNT] ;
int * shard_mem_int = NULL ;
void * add_worker ( void * param )
{
int i = 0 ;
for ( i = 0 ; i < WORK_SIZE; ++ i)
{
__sync_fetch_and_add ( shard_mem_int, 1 ) ;
}
return NULL ;
}
int main ( int argc , const char * argv [ ] )
{
int shm_id = 0 ;
if ( argc == 2 )
{
shm_id = atoi ( argv[ 1 ] ) ;
}
else
{
printf ( " Help:\n " ) ;
printf ( " args: shm_id \n " ) ;
}
shard_mem_int = shmat ( shm_id, 0 , 0 ) ;
int i = 0 ;
for ( i = 0 ; i < WORKER_COUNT; ++ i)
{
pthread_create ( & g_tWorkerID[ i] , NULL , add_worker, NULL ) ;
}
printf ( " Created %d add threads\n " , i) ;
for ( int i = 0 ; i < WORKER_COUNT; ++ i)
{
pthread_join ( g_tWorkerID[ i] , NULL ) ;
}
printf ( " All add threads finished.\n " ) ;
printf ( " The sum: %d shared mem id: %d \n " , * shard_mem_int, shm_id) ;
shmdt ( shard_mem_int ) ;
return 0 ;
}
编译:
gcc add_multithread_ipc.c
运行:
这里我们假设在两个终端运行两个进程,并且共享内存的id是1。
time ./a.out 1
Created 16 add threads
All add threads finished.
The sum: 1542497496 shared mem id: 1
./a.out 1 170.61s user 0.09s system 614% cpu 27.767 total
time ./a.out 1
Created 16 add threads
All add threads finished.
The sum: 1600000000 shared mem id: 1
./a.out 1 172.11s user 0.13s system 638% cpu 26.977 total
可以观察到进程一的结果不是16亿,这是由于进程一先结束导致的,他无法观察到最后结果。
进程二的结果是16亿,符合预期。
在运行过程中,多次实验可能会导致整形溢出。所以提供两个小工具,用于查询当前共享内存的值与重置共享内存。
查询共享内存的值read_ipc.c
代码如下:
#include < stdio.h>
#include < stdlib.h>
#include < pthread.h>
#include < unistd.h>
#include < sys/types.h>
#include < sys/syscall.h>
#include < sys/shm.h>
int main ( int argc , const char * argv [ ] )
{
int shm_id = 0 ;
if ( argc == 2 )
{
shm_id = atoi ( argv[ 1 ] ) ;
}
else
{
printf ( " Help:\n " ) ;
printf ( " args: shm_id \n " ) ;
}
int * shard_mem_int = NULL ;
shard_mem_int = shmat ( shm_id, 0 , 0 ) ;
printf ( " The value: %d shared mem id: %d \n " , * shard_mem_int, shm_id) ;
shmdt ( shard_mem_int ) ;
return 0 ;
}
编译: gcc read_ipc.c -o read_ipc
运行: read_ipc 1
重置共享内存的reset_ipc.c
代码如下:
#include < stdio.h>
#include < stdlib.h>
#include < pthread.h>
#include < unistd.h>
#include < sys/types.h>
#include < sys/syscall.h>
#include < sys/shm.h>
int main ( int argc , const char * argv [ ] )
{
int shm_id = 0 ;
if ( argc == 2 )
{
shm_id = atoi ( argv[ 1 ] ) ;
}
else
{
printf ( " Help:\n " ) ;
printf ( " args: shm_id \n " ) ;
}
int * shard_mem_int = NULL ;
shard_mem_int = shmat ( shm_id, 0 , 0 ) ;
* shard_mem_int = 0 ;
printf ( " reset shared mem id: %d as %d \n " , shm_id, * shard_mem_int) ;
shmdt ( shard_mem_int ) ;
return 0 ;
}
编译: gcc reset_ipc.c -o read_ipc
运行: reset_ipc 1
实验结束,删除共享内存:
ipcrm - m 1
参考
linux的C使用pthread_mutex互斥锁和条件变量
Intel® 64 and IA-32 Architectures Software Developer Manuals
缓存一致性(Cache Coherency)入门
atomic实现原理 - 知乎 (zhihu.com)
shmat(3p) - Linux manual page (man7.org)