linux进程通信及示例

介绍linux进程通信的几种方式,以及每一种方式的适用场景。并给出了相应示例。
                              —— By Jihan


前言

需要一定linux的使用背景以及C语言基础。
什么是进程通信:
进程间通信(InterProcess Communication, IPC)是指在不同进程之间传播或交换信息。
IPC的方式通常有管道(包括无名管道和命名管道)、消息队列、信号量、共享内存、Socket、Streams等。其中 Socket和Streams支持不同主机上的两个进程IPC。本文只针对经典的IPC,即:管道、消息队列、信号量及共享内存。

进程通信

管道

我们常说的管道,都指匿名管道,FIFO通常会用命名管道一词
管道的使用会存以下两个限制:

  1. 管道的通信都是半双工的(即数据只能在一个方向流动)
  2. 管道只能在具有公共祖先的两个进程之间通信。通常一个管道进程fork出子进程后,管道就能在父子进程之间通信了。

管道创建函数:

1
2
3
#include <unistd.h>

int pipe(int fd[2]);

fd[0]为读打开,即输出,fd[1]为写打开,即为输入。一个典型的工作场景:
即进程先调用pipe,再进行fork

然后我们根据需要各自关闭父子进程的读/写通道。

示例

我们在程序中需要执行一个命令,并且获得命令输出的结果。比如我们要执行date命令:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#include <stdio.h>
#include <sys/wait.h>
#include <unistd.h>

#define RUN_CMD "/usr/bin/date"

int main(int argc, char *argv[]){
int fd[2];
int pid;
char output[128] = {0};

if (pipe(fd) < 0 ){
printf("pipe error");
return -1;
}

pid = fork();
if (pid <0){
printf("fork error");
return -1;
}else if (pid > 0){ //parent
close(fd[1]);
if (read(fd[0], output, sizeof(output)-1) < 0){
printf("read error");
return -1;
}
printf("Parent:\n%s", output);
}else{ //child
close(fd[0]);
dup2(fd[1], STDOUT_FILENO);
execv(RUN_CMD, argv);
}
return 0;
}

执行结果:

1
2
3
$ ./main 
Parent:
Wed Aug 3 16:10:57 CST 2022

execv函数参考
当然上述功能也可以用popen函数实现。更加简单。

适用场景

由于管道的限制,常常用于父进程和子进程之间的通信。
最为常见的应用就是shell中使用的|管道符了。

命名管道

命名管道(FIFO),相比于管道来说,他的限制只有一个:

  1. 通信都是半双工的(即数据只能在一个方向流动)

这就意味着它能用于不相干的进行之间进行数据通信。
FIFO本质上就是一种文件类型,可通过查看文件的stat结构中的st_mode编码得知。比如FIFO文件,我们执行stat file.name会看到fifo字样。
FIFO创建函数(有点类似文件创建):

1
2
3
4
5
#include <sys/stat.h>

int mkfifo (const char *__path, __mode_t __mode);
int mkfifoat (int __fd, const char *__path, __mode_t __mode);
//函数返回值成功为0,错误为-1

mkfifo函数与open函数类似。
mkfifoat函数则用来在fd文件描述符表示的目录相关的位置创建一个FIFO
函数具体参数可参考man手册

我们也可以可以通过mkfifo命令来创建FIFO文件
同样,对FIFO的读写操作,类似于对文件的读写。
open一个FIFO文件时,可指定是否阻塞:O_NONBLOCK标记。

  • 不指定:读/写都会阻塞,直到有对应的写/读到来
  • 指定:如果读/写时没有对应的写/读,会返回相应的错误码

如果是多个进程进行写操作,为了防止写入数据交叉,需要设置操作原子性,PIPE_BUF就是对应原子性数据的大小。
我们常常遇到的一种情况:

示例

我们创建两个子进程写入数据,父进程读出数据(当然不一定要父子进程):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#include <sys/stat.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>

const char fifo_file[] = "/tmp/fifo_file";

int main(int argc, char *argv[]){
int fd = 0;
int pid1=0, pid2=0;
char buf[128] = {0};
mkfifo(fifo_file,O_CREAT |O_RDWR);
pid1 = fork();
if (pid1 > 0 ) {
pid2 = fork();
}
if (pid1 > 0 && pid2 > 0 ){ //parent
// 读写方式打开,read函数则会阻塞,一直等待数据来临
// 只读方式打开,则子进程写完(也可能没写完),read函数返回0值,退出
//fd = open(fifo_file, O_RDWR);
fd = open(fifo_file, O_RDONLY);
//确保2个进程都写完
int status;
waitpid(pid1,&status,0);
waitpid(pid2,&status,0);
while (read(fd, buf, sizeof(buf) - 1) > 0)
{
printf("read:%s", buf);
memset(buf, 0, sizeof(buf));
}
}else if(pid1 < 0 || pid2 < 0) { //error
printf("fork error");
return -1;
}else{ //children
fd = open(fifo_file, O_WRONLY);
sprintf(buf, "hello: %d, %d\n",pid1, pid2);
write(fd, buf, strlen(buf));
}
return 0;
}

执行结果:

1
2
3
$ ./main 
read:hello: 0, 0
hello: 17967, 0

也可以用命令行达到上面的效果:

1
2
3
4
5
6
7
8
9
$ echo "hello1" >> fifo &
[1] 30040
$ echo "hello2" >> fifo &
[2] 30041
$ cat fifo
hello1
hello2
[1]- Done echo "hello1" >> fifo
[2]+ Done echo "hello2" >> fifo

注意,使用命令的时候,都是阻塞的,读写不需同时存在,否则会阻塞。

适用场景
  1. 数据缓冲,缓存来不及处理的数据。
  2. 时钟域隔离。
  3. 用于不同宽度的数据接口。

后两种可参考网上解释

消息队列

消息队列是消息的链接表,存储在内核中,有消息队列表示符标识。
意味着内核里有现成的消息队列,让你可直接使用。
msgget则是创建或者打开一个现有队列。msgget详解

1
2
3
#include <sys/msg.h>

int msgget (key_t __key, int __msgflg);

每个队列都包含一个msqid_ds的结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
struct msqid_ds
{
struct ipc_perm msg_perm; /* structure describing operation permission */
__time_t msg_stime; /* time of last msgsnd command */
#ifndef __x86_64__
unsigned long int __unused1;
#endif
__time_t msg_rtime; /* time of last msgrcv command */
#ifndef __x86_64__
unsigned long int __unused2;
#endif
__time_t msg_ctime; /* time of last change */
#ifndef __x86_64__
unsigned long int __unused3;
#endif
__syscall_ulong_t __msg_cbytes; /* current number of bytes on queue */
msgqnum_t msg_qnum; /* number of messages currently on queue */
msglen_t msg_qbytes; /* max number of bytes allowed on queue */
__pid_t msg_lspid; /* pid of last msgsnd() */
__pid_t msg_lrpid; /* pid of last msgrcv() */
__syscall_ulong_t __unused4;
__syscall_ulong_t __unused5;
};

我们可以通过msgctl函数获取或设置msqid_ds操作

1
2
3
#include <sys/msg.h>

int msgctl (int __msqid, int __cmd, struct msqid_ds *__buf);

可以使用msgsnd来添加消息到队列尾,使用msgrcv获取消息,当然不一定要先入先出,也可以按照消息类型来。

1
2
3
4
5
6
#include <sys/msg.h>

int msgsnd (int __msqid, const void *__msgp, size_t __msgsz,
int __msgflg);//成功:0,失败:-1
ssize_t msgrcv (int __msqid, void *__msgp, size_t __msgsz,
long int __msgtyp, int __msgflg);//成功:接收字符长度,失败:-1

同样可以通过设置__msgflg来确定是阻塞还是非阻塞IO操作。
msgsnd中的__msgp可以是如下结构:

1
2
3
4
struct mymesg {
long msgtyp;
char mtext[512]; //对应着__msgsz的大小
}

其中队列的最大消息数是根据最大队列数和最大数据量来决定的。可以通过msqid_ds中的msg_qbytes得知

示例

还是常见示例,做一个父子进程的消峰值处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#include <stdio.h>
#include <sys/msg.h>
#include <sys/wait.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>

#define msg_bytes 512

typedef struct
{
long type;
char text[msg_bytes];
} msg;


int main(int argc, char *argv[]){
int pid, pid1=0, pid2=0;
int ret,msgid;

msgid = msgget(IPC_PRIVATE,0666);
if (msgid < 0){
printf("msgget error\n");
return -1;
}
pid1 = fork();
if (pid1 > 0 ) {
pid2 = fork();
}
if (pid1 > 0 && pid2 > 0 ){ //parent
pid = getpid();
msg tmp;
//阻塞方式
//while ((ret = msgrcv(msgid, (void *)&tmp, msg_bytes, 1, 0)) > 0){
//非阻塞方式
while ((ret = msgrcv(msgid, (void *)&tmp, msg_bytes, 1, IPC_NOWAIT))){
if ((ret < 0) && (errno != ENOMSG)) {
break;
}
printf("%d, %d read:%s\n", pid, ret, tmp.text);
memset(&tmp.text, 0, sizeof(tmp.text));
sleep(2);
}
printf("msg receive over. %s\n", strerror(errno));
int status;
waitpid(pid1,&status,0);
waitpid(pid2,&status,0);
printf("parent process[%d] exit\n", pid);
}else if(pid1 < 0 || pid2 < 0) { //error
printf("fork error\n");
return -1;
}else{ //children
int i;
pid = getpid();
msg tmp = {.type = 1};
for (i = 0; i < 5;i++){
tmp.type = 1;
sprintf(tmp.text, "hello: %d, N:%d\n", pid, i);
if ((ret = msgsnd(msgid, &tmp, msg_bytes, 0)) < 0){
printf("msg send error:%s\n",strerror(errno));
return -1;
}
memset(&tmp.text, 0, sizeof(tmp.text));
sleep(1);
}
printf("child process[%d] exit\n", pid);
}
return 0;
}
适用场景

从速度上来说,消息队列unix sock并没有太大差别,消息队列的主要用途:

  1. 应用解耦
  2. 异步消息
  3. 流量削锋

信号量

和之前的IPC不同,信号量是一个计数器,用于为多个进程提供共享数据对象的访问。

示例
适用场景

共享内存

简介
示例
适用场景

各个进程通信比较

进程对于共享资源的访问互斥方式

  1. 信号量,就是标记共享资源还有多少可使用,使用了就-1。本质上就是存在于内核的一个计数器,可以通过semget系列函数来获取信号量。
  2. 使用记录锁,即创建一个空文件,并用改文件的第一个字节(不一定存在)作为锁字节。获取和释放资源时会进行写锁和释放锁。进程终止时,内核会自动释放该锁。
  3. 互斥量,需要将共享资源加载到内存,互斥量在文件的相同偏移出初始化互斥量。

区别和比较

性能:信号量和记录锁的性能差别不大,互斥量则有量级的性能提升。
这里通常使用的还是记录锁,信号量相比记录锁操作更为繁琐,而互斥量的稳定性和适配则不如记录锁。除非异常追求性能,不然通常都选择记录锁。

参考链接

Linux 进程间通信(IPC)—大总结

-------------本文结束感谢您的阅读-------------