基于文件的进程间通信——用100个进程抢算累加和

需求描述

  1. 设置一个并发度INS,表示要开的进程数量
  2. 使用这INS个进程,计算从startend之间的数字累加和
  3. startend通过getopt解析命令行参数获取
./a.out -s 12 -e 24
  1. 输出一个整型结果:sum

[注意]

  • 主要涉及文件及进程相关操作
  • 使用文件进行数据共享,需要考虑数据竞争(data race)
  • 尝试使用文件锁来模拟线程之间的互斥锁
  • 通过文件锁实现临界数据(多个进程或线程竞争修改的数据)的同步访问
  • 需要学习flock:man 2 flock

最终结果

  • 用100个进程算1到1000的累加和,效果截取如下:
    • 图片
    • 图片
    • 成功让进程们在同一块数据上抢着计算累加和

实现过程

思路流程图

  • 图片
  • 把握好父进程和子进程的任务
  • 关键:多进程访问同一文件的加锁操作,让读写数据成为“原子操作” [不可分割的最小单位]
    • 可以理解为原子操作,但是本质上只是让数据的读写过程完整
    • 进程可能因为时间片用完而中断,但因为锁的存在,此时其它进程还无法访问这些数据

获取命令行参数

捕获-s、-e选项,使用该选项必须带参数

#include "head.h"
int main(int argc, char **argv) {
    int opt, start = 0, end = 0;
    while ((opt = getopt(argc, argv, "s:e:")) != -1) {
        switch (opt) {
            case 's':
                start = atoi(optarg);  // atoi: 字符串->整数
                break;
            case 'e':
                end = atoi(optarg);
                break;
            default:
                fprintf(stderr, "Usage : %s -s start_num -e end_num\n", argv[0]);
                exit(1);
        }
    }
    printf("start = %d\nend = %d\n", start, end);
    return 0;
}
  • 头文件"head.h"见末尾
  • atoi:字符串👉整数,optarg是字符数组
  • 效果如下:
    • 图片
    • 🆗

创建INS个进程

使用fork创建INS个进程,注意使用wait防止僵尸进程的产生

#define INS 100
pid_t pid;
int x = 0;        // x: 第几号进程
for (int i = 1; i <= INS; i++) {
    if ((pid = fork()) < 0) {
        perror("fork");
        exit(1);  // 只是图方便,工作中不如此操作
    }
    if (pid == 0) {
        x = i;    // 给子进程编号
        break;    // 关键,否则会不断套娃
    }
}
if (pid != 0) {
    // 防止产生僵尸进程 [等待完所有的子进程]
    for (int i = 1; i <= INS; i++) {
        wait(NULL);
    }
    // 父进程
    printf("I'm parent!\n");  
} else {
    printf("I'm %dth child!\n", x);
}
  • 该段代码放在主函数中获取命令行参数后
  • INS定义为宏
  • 子进程创建失败直接exit(1),是为了方便,工作中忌
  • 效果如下:
    • 图片
    • 成功创建100个子进程

基于文件的数据读写接口

使用文件作为进程之间共享数据的载体

  • 如何在文件中存储数据?ASCII码 [字符]、int [低16位+高16位]...
  • 这里使用结构体存储数据,结构清晰
    • 图片
    • 存储加数、和数
char data_file[] = "./.data";
char lock_file[] = "./.lock";  // [可选] 设置专门的一把锁
// 要传递的数据
struct Msg {
    int now;                   // 加数
    int sum;                   // 和
};
struct Msg data;               // 结构体数据
// 写入结构体数据
size_t set_data(struct Msg *msg) {
    FILE *f = fopen(data_file, "w");                         // 写
    if (f == NULL) {
        perror("fopen");
        return -1;                                          // 在一个小函数里面exit过于粗鲁
    }
    size_t nwrite = fwrite(msg, 1, sizeof(struct Msg), f);  // 每次写1个字节
    fclose(f);
    return nwrite;             // 返回的是成功写的字节数,如果出错也返回给上层
}
// 读取结构体数据
size_t get_data(struct Msg *msg) {
    FILE *f = fopen(data_file, "r");
    if (f == NULL) {
        perror("fopen");
        return -1;
    }
    size_t nread = fread(msg, 1, sizeof(struct Msg), f);    // 读入结构体数据到msg中
    fclose(f);
    return nread;
}
  • 创建全局变量data,用来在进程中操作数据
  • 利用标准文件操作,底层文件操作也可行
  • 返回值可供调用者检查读写成功与否

加入锁⭐

让进程抢着维护共享数据,并保护数据文件不被同时操作

【两种思路】使用一个文件;使用两个文件

  • 思路一:直接对数据文件加锁
char data_file[] = "./.data";
// 做加法 [原子操作:读 + 写];end:加法停止条件;id:孩子的编号 [可用上帝视角监控]
void do_add(int end, int id) {
    // 孩子一直在里面做加法
    while (1) {
        /*
         * 思路一:一个文件,直接在数据文件加锁
         */
        // 打开data_file用来加锁
        FILE *f = fopen(data_file, "r");
        // 加互斥锁
        flock(f->_fileno, LOCK_EX);
        // 从文件读取数据 [get_data函数里会再次打开data_file文件,对应新的fd,锁不共用]
        if (get_data(&data) < 0) continue;
        // 加数+1,并判断加数是否超过范围
        if (++data.now > end) {
            fclose(f);
            break;
        }
        // 做加法
        data.sum += data.now;
        printf("The <%d>th Child : now = %d, sum = %d\n", id, data.now, data.sum);
        // 将数据写入文件
        if (set_data(&data) < 0) continue;
        // 解锁 [后面关闭其实也会自动释放锁]
        flock(fileno(f), LOCK_UN);
        fclose(f);
    }
}
  • 函数参数:end作为加法的停止条件参照,id可用来观察每次加法是哪个孩子算的
  • 加锁 👉 解锁中间的过程就是原子操作[不可分割的最小单位]
    • 封装了读数据、做计算、写数据操作,过程中数据不会被抢占
  • 由文件指针FILE* f获得文件描述符fd
    • ① f->_fileno
    • ② fileno(f)
  • [PS]
    • 重复打开一个文件会得到不同的文件描述符,锁也相互独立
    • 文件关闭会自动释放锁
    • 在每次调用读写接口后,利用好返回值判断操作成功与否
  • 思路二:设置专门的文件加锁
char data_file[] = "./.data";
char lock_file[] = "./.lock";  // 设置专门的一把锁
void do_add(int end, int id) {
    while (1) {
        /*
         * 思路二:两个文件,使用单独的文件作为锁 [更易理解]
         */
        // 打开或创建一个锁文件;如果文件已加锁,将等待使用者解锁
        FILE *lock = fopen(lock_file, "w");  // "w":如果不存在文件,会创建一个
        if (lock == NULL) {
            perror("fopen");
            exit(1);
        }
        // 加锁
        flock(lock->_fileno, LOCK_EX);
        // 从文件读取数据
        if (get_data(&data) < 0) {
            fclose(lock);                    // 关闭锁文件,释放锁
            continue;
        }
        // 加数+1,并判断是否满足停止加的条件
        if (++data.now > end) {
            fclose(lock);
            break;
        }
        // 做加法
        data.sum += data.now;
        printf("The <%d>th Child : now = %d, sum = %d\n", id, data.now, data.sum);
        // 将数据写入文件
        if (set_data(&data) < 0) continue;
        // 解锁
        flock(lock->_fileno, LOCK_UN);
        fclose(lock);
    }
}
  • lock_file就是单纯为了做锁
  • 效果如下:【单核,5个进程,计算1~100】
    • 图片
    • 图片
    • 单核的效果比多核更整齐
      • 单核一次只能运行一个进程
      • 可以使用usleep()提前挂起进程,不让一个进程计算那么久,让顺序更乱
    • 若将输出传给more,它会将输出按进程区分,重新排列展示
  • 【注意】
    • 在主函数中,将数据初始值先写入文件,否则文件为空 [详见完整代码]
    • 在主函数,子进程逻辑中调用do_add()函数即可,父进程逻辑中在等待所有子进程结束后从数据文件获取并输出最终结果即可
  • ❗ 如果不加锁,结果仍然对
    • 加数和和数是包装在一起,加法不会出错
    • 但每个进程都会完整地算一遍结果,可能是缓冲区的缘故?不是
      • 在所有写操作后都加入fflush,尽管有一些接着算的情况,但是每个进程都还是会跑到最后的正确结果
      • 相当于有进程算完了,数据写入文件,但是另一个进程读取的还不是最新的数据,还会自己再算一遍累加
    • 解释
      • 多个进程打开同一文件,每个进程都有它自己的文件表项(file对象),包含它自己的文件位移量
      • 所以对于多个进程读同一文件都能正确工作,但写同一文件可能产生预期不到的结果,可参考使用pread、pwrite
      • 还可参考Linux下多进程同时操作文件——cnblogs

完整代码

sum.c

#include "head.h"
#define INS 100
char data_file[] = "./.data";
char lock_file[] = "./.lock";  // [可选] 设置专门的一把锁
// 要传递的数据
struct Msg {
    int now;                   // 加数
    int sum;                   // 和
};
struct Msg data;               // 结构体数据
// 写入结构体数据
size_t set_data(struct Msg *msg) {
    FILE *f = fopen(data_file, "w");                         // 写
    if (f == NULL) {
        perror("fopen");
        return -1;                                          // 在一个小函数里面exit过于粗鲁
    }
    size_t nwrite = fwrite(msg, 1, sizeof(struct Msg), f);  // 每次写1个字节
    fclose(f);
    return nwrite;             // 返回的是成功写的字节数,如果出错也返回给上层
}
// 读取结构体数据
size_t get_data(struct Msg *msg) {
    FILE *f = fopen(data_file, "r");
    if (f == NULL) {
        perror("fopen");
        return -1;
    }
    size_t nread = fread(msg, 1, sizeof(struct Msg), f);    // 读入结构体数据到msg中
    return nread;
}
// 做加法 [原子操作:读 + 写];end:加法停止条件;id:孩子的编号 [可用上帝视角监控]
void do_add(int end, int id) {
    // 孩子一直在里面做加法
    while (1) {
        /*
         * 思路二:两个文件,使用单独的文件作为锁 [更易理解]
         */
        // 打开或创建一个锁文件;如果文件已加锁,将等待使用者解锁
        FILE *lock = fopen(lock_file, "w");  // "w":如果不存在文件,会创建一个
        if (lock == NULL) {
            perror("fopen");
            exit(1);
        }
        // 加锁
        flock(lock->_fileno, LOCK_EX);
        // 从文件读取数据
        if (get_data(&data) < 0) {
            fclose(lock);                    // 关闭锁文件,释放锁
            continue;
        }
        // 加数+1,并判断是否满足停止加的条件
        if (++data.now > end) {
            fclose(lock);
            break;
        }
        // 做加法
        data.sum += data.now;
        printf("The <%d>th Child : now = %d, sum = %d\n", id, data.now, data.sum);
        // 将数据写入文件
        if (set_data(&data) < 0) continue;
        // 解锁
        flock(lock->_fileno, LOCK_UN);
        fclose(lock);
        /*
         * 思路一:一个文件,直接在数据文件加锁
         */
        /*
        // 打开data_file用来加锁
        FILE *f = fopen(data_file, "r");
        // 加互斥锁
        flock(f->_fileno, LOCK_EX);
        // 从文件读取数据 [get_data函数里会再次打开data_file文件,对应新的fd,锁不共用]
        if (get_data(&data) < 0) continue;
        // 加数+1,并判断加数是否超过范围
        if (++data.now > end) {
            fclose(f);
            break;
        }
        // 做加法
        data.sum += data.now;
        printf("The <%d>th Child : now = %d, sum = %d\n", id, data.now, data.sum);
        // 将数据写入文件
        if (set_data(&data) < 0) continue;
        // 解锁 [后面关闭其实也会自动释放锁]
        flock(fileno(f), LOCK_UN);
        fclose(f);
        */
    }
}
int main(int argc, char **argv) {
    int opt, start = 0, end = 0;
    while ((opt = getopt(argc, argv, "s:e:")) != -1) {
        switch (opt) {
            case 's':
                start = atoi(optarg);       // atoi: 字符串->整数
                break;
            case 'e':
                end = atoi(optarg);
                break;
            default:
                fprintf(stderr, "Usage : %s -s start_num -e end_num\n", argv[0]);
                exit(1);
        }
    }
    printf("start = %d\nend = %d\n", start, end);
    // 先将初始数据写入文件
    if (set_data(&data) < 0) return -1;     // data为全局变量,成员默认均为0
    pid_t pid;
    int x = 0;                              // x: 第几号进程
    for (int i = 1; i <= INS; i++) {
        if ((pid = fork()) < 0) {
            perror("fork");
            exit(1);                        // 只是图方便,工作中不如此操作
        }
        if (pid == 0) {
            x = i;                          // 给子进程编号
            break;                          // 关键,否则会不断套娃
        }
    }
    if (pid != 0) {
        // 防止产生僵尸进程 [等待完所有的子进程]
        for (int i = 1; i <= INS; i++) {
            wait(NULL);
        }
        if (get_data(&data) < 0) return -1; // 获得最终结果
        printf("sum = %d\n", data.sum);
    } else {
        do_add(end, x);                     // 子进程唯一的事
    }
    return 0;
}

head.h

#ifndef _HEAD_H
#define _HEAD_H
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/ioctl.h>
#include <sys/time.h>
#include <sys/wait.h>
#include <sys/file.h>
#endif
  • 可能有多余的头文件,不是重点

参考