26

lxcfs容器隔离技术实现原理分析之loadavg、cpuonline

 4 years ago
source link: http://mp.weixin.qq.com/s?__biz=MzUxMTcwOTM4Mg%3D%3D&%3Bmid=2247486074&%3Bidx=1&%3Bsn=c7c76dde5189abe3538cd5daef7b750f
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

奇技指南

我们知道runc没有做到完全隔离/proc、/sys路径下的文件,所以容器内通过top、free等命令看到的数据都是物理机上的。利用lxcfs可以实现将容器内/proc、/sys文件与物理机隔离,让top等命令显示容器内真实数据。本文将来详细介绍一下。

本文转载自360云计算

lxcfs是什么

我们知道runc没有做到完全隔离/proc、/sys路径下的文件,所以容器内通过top、free等命令看到的数据都是物理机上的。对于习惯了虚机,物理机的同学来说不太友好,而且这些命令似乎也失去了本质意义。 lxcfs作用就是将容器内/proc、/sys文件与物理机隔离,让top等命令显示容器内真实数据。

说明

lxcfs是以用户空间文件系统(Filesystem in Userspace)为基础,以cgroup技术实现的用户空间的虚拟文件系统。先对fuse和cgroup有大致了解,看本文效果更好些。本文不介绍lxcfs的安装及使用,网上不乏这样的好文章。我们主要介绍下lxcfs对cpuonline、loadavg的现实,这两部分弄懂,其它也大体相同。

容器中读取lxcfs文件系统

lxcfs程序启动时会指定一个路径(如下图是/var/lib/lxcfs)作为挂载点,以后读取这个路径的下文件(cgroup、proc、sys)vfs都会调用内核fuse,fuse回调lxcfs实现的文件操作函数。容器内读取lxcfs文件系统中的数据时,通过gblic系统调用vfs接口然后转到fuse内核模块,内核模块fuse回调lxcfs程序中实现的回调函数,获取容器的cgroup,然后去宿主机对应cgroup下读取并计算后得到容器的实际mem、cpu等信息。lxcfs将物理机的cgroups挂载到运行时环境/run/lxcfs/controllers,但直接在物理机上看不见,因为程序中用unshare做了mounts namespace隔离。lxcfs程序中所有的cgroups信息都从/run/lxcfs/controllers下获得。

6nAF7ju.jpg!web

源码

因为工作中正好需要这两部分,所以主要介绍下cpuonline和loadavg的实现。nginx、java等程序根据cpu核心数启动相应个数的进程,cpuonline是相关系统调用的数据来源。没隔离导致的容器内获取到cpu核数是物理机的,本应该创建2个进程,实际却创建40个(容器2c,物理机40c),由于更多的上下文切换导致明显的性能下降。loadavg目前没看到有关的分析,这里也简单介绍下。

隔离效果

物理机40c128g

1、cpuonline

物理机

容器2c4g

zEvque3.jpg!web

2、loadavg

物理机

veAzEfM.jpg!web

容器2c4g

nuyAjaj.jpg!web

可以看到cpuonline和load average都已经隔离

实现分析

注:cgropu的各个controller文件在main函数执行前打开,保存在fd_hierarchies中,后面使用直接掉openat,不是每次都要open、close文件。通过c语言的attribute((constructor)) 属性,声明collect_and_mount_subsystems这个函数

看下collect_and_mount_subsystems

static void __attribute__((constructor)) collect_and_mount_subsystems(void)
{
FILE *f;
char *cret, *line = NULL;
char cwd[MAXPATHLEN];
size_t len = 0;
int i, init_ns = -1;
bool found_unified = false;

if ((f = fopen("/proc/self/cgroup", "r")) == NULL) {
lxcfs_error("Error opening /proc/self/cgroup: %s\n", strerror(errno));
return;
}
// 读取宿主机上namespaces controller保存到hierarchies
while (getline(&line, &len, f) != -1) {
......
if (!store_hierarchy(line, p))
goto out;
}

/* Preserve initial namespace. */
init_ns = preserve_mnt_ns(getpid());
if (init_ns < 0) {
lxcfs_error("%s\n", "Failed to preserve initial mount namespace.");
goto out;
}

fd_hierarchies = malloc(sizeof(int) * num_hierarchies);
if (!fd_hierarchies) {
lxcfs_error("%s\n", strerror(errno));
goto out;
}

for (i = 0; i < num_hierarchies; i++)
fd_hierarchies[i] = -1;

cret = getcwd(cwd, MAXPATHLEN);
if (!cret)
lxcfs_debug("Could not retrieve current working directory: %s.\n", strerror(errno));

/* This function calls unshare(CLONE_NEWNS) our initial mount namespace
* to privately mount lxcfs cgroups. */

// 关键是这里,将cgroup下各个控制模块,挂载到lxcfs进程的自由的mount ns下(/run/lxcfs/container)
if (!cgfs_setup_controllers()) {
lxcfs_error("%s\n", "Failed to setup private cgroup mounts for lxcfs.");
goto out;
}
......
}
static bool cgfs_setup_controllers(void)
{
// 主要调用unshare 创建私有的mount ns
if (!cgfs_prepare_mounts())
return false;

if (!cgfs_mount_hierarchies()) {
lxcfs_error("%s\n", "Failed to set up private lxcfs cgroup mounts.");
return false;
}

if (!permute_root())
return false;

return true;
}
static bool cgfs_mount_hierarchies(void)
{
char *target;
size_t clen, len;
int i, ret;

for (i = 0; i < num_hierarchies; i++) {
char *controller = hierarchies[i];

clen = strlen(controller);
len = strlen(BASEDIR) + clen + 2;
target = malloc(len);
if (!target)
return false;

ret = snprintf(target, len, "%s/%s", BASEDIR, controller);
if (ret < 0 || ret >= len) {
free(target);
return false;
}
if (mkdir(target, 0755) < 0 && errno != EEXIST) {
free(target);
return false;
}
if (!strcmp(controller, "unified"))
ret = mount("none", target, "cgroup2", 0, NULL);
else
ret = mount(controller, target, "cgroup", 0, controller);
if (ret < 0) {
lxcfs_error("Failed mounting cgroup %s: %s\n", controller, strerror(errno));
free(target);
return false;
}
// 将所有cgroup controller 文件打开,保存文件描述符
fd_hierarchies[i] = open(target, O_DIRECTORY);
if (fd_hierarchies[i] < 0) {
free(target);
return false;
}
free(target);
}
return true;
}

lxcfs.c main中主要是解析命令行参数,并调用fuse提供的fuse_main函数将lxcfs相关的文件操作注册,并传入挂载点。

......
if (!fuse_main(nargs, newargv, &lxcfs_ops, opts))
......
const struct fuse_operations lxcfs_ops = {
.getattr = lxcfs_getattr,
.readlink = NULL,
.getdir = NULL,
.mknod = NULL,
.mkdir = lxcfs_mkdir,
.unlink = NULL,
.rmdir = lxcfs_rmdir,
.symlink = NULL,
.rename = NULL,
.link = NULL,
.chmod = lxcfs_chmod,
.chown = lxcfs_chown,
.truncate = lxcfs_truncate,
.utime = NULL,

.open = lxcfs_open,
.read = lxcfs_read,
.release = lxcfs_release,
.write = lxcfs_write,

.statfs = NULL,
.flush = lxcfs_flush,
.fsync = lxcfs_fsync,

.setxattr = NULL,
.getxattr = NULL,
.listxattr = NULL,
.removexattr = NULL,

.opendir = lxcfs_opendir,
.readdir = lxcfs_readdir,
.releasedir = lxcfs_releasedir,

.fsyncdir = NULL,
.init = NULL,
.destroy = NULL,
.access = lxcfs_access,
.create = NULL,
.ftruncate = NULL,
.fgetattr = NULL,
};

cpuonline

1、cpuonline信息在/sys/devices/system/cpu/路径下,lxcfs将对/sys(当然这里使用任何路径都可以)的操作注册到fuse

lxcfs.c:

const struct fuse_operations lxcfs_ops = {
......
.open = lxcfs_open,
.read = lxcfs_read,
.release = lxcfs_release,
.write = lxcfs_write,
......
}

static int lxcfs_read(const char *path, char *buf, size_t size, off_t offset,
struct fuse_file_info *fi)
{
int ret;
if (strncmp(path, "/cgroup", 7) == 0) {
up_users();
ret = do_cg_read(path, buf, size, offset, fi);
down_users();
return ret;
}
if (strncmp(path, "/proc", 5) == 0) {
up_users();
ret = do_proc_read(path, buf, size, offset, fi);
down_users();
return ret;
}
if (strncmp(path, "/sys", 4) == 0) {
up_users();
ret = do_sys_read(path, buf, size, offset, fi);
down_users();
return ret;
}

return -EINVAL;
}

static int do_sys_read(const char *path, char *buf, size_t size, off_t offset,
struct fuse_file_info *fi)
{
int (*sys_read)(const char *path, char *buf, size_t size, off_t offset,
struct fuse_file_info *fi);
char *error;

dlerror(); /* Clear any existing error */
sys_read = (int (*)(const char *, char *, size_t, off_t, struct fuse_file_info *)) dlsym(dlopen_handle, "sys_read");
error = dlerror();
if (error != NULL) {
lxcfs_error("%s\n", error);
return -1;
}

return sys_read(path, buf, size, offset, fi);
}

文件操作相关代码(bindings.c、sysfs_fuse.c,cpuset.c)被封装成liblxcfs.so动态库,供lxcfs.c调用。上面do_sys_read通过dlsym获取liblxcfs.so动态库中的sys_read函数。

2、接着看下读cpuonline的过程

sysfs_fuse.c:

int sys_read(const char *path, char *buf, size_t size, off_t offset,
struct fuse_file_info *fi)
{
struct file_info *f = (struct file_info *)fi->fh;

switch (f->type) {
//cpuonline 模块,type在open时设置,这里不做过多介绍,主要看下
//sys_devices_system_cpu_online_read函数的实现
case LXC_TYPE_SYS_DEVICES_SYSTEM_CPU_ONLINE:
return sys_devices_system_cpu_online_read(buf, size, offset, fi);
case LXC_TYPE_SYS_DEVICES:
case LXC_TYPE_SYS_DEVICES_SYSTEM:
case LXC_TYPE_SYS_DEVICES_SYSTEM_CPU:
default:
return -EINVAL;
}
}

static int sys_devices_system_cpu_online_read(char *buf, size_t size,
off_t offset,
struct fuse_file_info *fi)
{
//获取上下文信息,主要是读取cuponline进程(例如cat /sys/devices/system/cpu/online的cat进程,以下简称“调用进程”)的进程id
struct fuse_context *fc = fuse_get_context();
struct file_info *d = (struct file_info *)fi->fh;
char *cache = d->buf;
char *cg;
char *cpuset = NULL;
bool use_view;

int max_cpus = 0;
pid_t initpid;
ssize_t total_len = 0;

if (offset) {
if (!d->cached)
return 0;
if (offset > d->size)
return -EINVAL;
int left = d->size - offset;
total_len = left > size ? size : left;
memcpy(buf, cache + offset, total_len);
return total_len;
}

//获取容器中1号进程在物理机上的进程id;initpid返回为0时,说明调用进程是物理上的进程
initpid = lookup_initpid_in_store(fc->pid);
if (initpid <= 0)
initpid = fc->pid;
//获取容器1号进程的cgroup
//例如:docker/368adedeb87172d68388cee9818e873d73503a5b1d1d2a6b47fbd053f6d68601
cg = get_pid_cgroup(initpid, "cpuset");
if (!cg)
return read_file("/sys/devices/system/cpu/online", buf, size, d);
prune_init_slice(cg);

cpuset = get_cpuset(cg);
if (!cpuset)
goto err;
// 检查cpu、 cpuacct 控制器是否存在,不存在直接返回物理机cpuonine信息
use_view = use_cpuview(cg);

if (use_view)
// 获取容器真正可使用的cpu个数,如果容器没配置cpu quota(默认-1),则直接返回物理信息
max_cpus = max_cpu_count(cg);

if (max_cpus == 0)
return read_file("/sys/devices/system/cpu/online", buf, size, d);
if (max_cpus > 1)
total_len = snprintf(d->buf, d->buflen, "0-%d\n", max_cpus - 1);
else
total_len = snprintf(d->buf, d->buflen, "0\n");
if (total_len < 0 || total_len >= d->buflen) {
lxcfs_error("%s\n", "failed to write to cache");
return 0;
}

d->size = (int)total_len;
d->cached = 1;

if (total_len > size)
total_len = size;

memcpy(buf, d->buf, total_len);
err:
free(cpuset);
free(cg);
return total_len;
}
/*
* Return the maximum number of visible CPUs based on CPU quotas.
* If there is no quota set, zero is returned.
*/

int max_cpu_count(const char *cg)
{
int rv, nprocs;
int64_t cfs_quota, cfs_period;
int nr_cpus_in_cpuset = 0;
char *cpuset = NULL;
// 读取物理机上容器cpu的quota值
if (!read_cpu_cfs_param(cg, "quota", &cfs_quota))
return 0;
// 读取物理机上容器cpu的period值
if (!read_cpu_cfs_param(cg, "period", &cfs_period))
return 0;

cpuset = get_cpuset(cg);
if (cpuset)
nr_cpus_in_cpuset = cpu_number_in_cpuset(cpuset);

if (cfs_quota <= 0 || cfs_period <= 0){
if (nr_cpus_in_cpuset > 0)
return nr_cpus_in_cpuset;

return 0;
}

// 容器何用的cpu计算
rv = cfs_quota / cfs_period;

/* In case quota/period does not yield a whole number, add one CPU for
* the remainder.这里的意思是限制cpu为0.5和,视图效果为1核。1.5 即 2
*/

if ((cfs_quota % cfs_period) > 0)
rv += 1;

/*获取可用的cpu核数sysconf(_SC_NPROCESSORS_ONLN)*/
nprocs = get_nprocs();

if (rv > nprocs)
rv = nprocs;

/* use min value in cpu quota and cpuset */
if (nr_cpus_in_cpuset > 0 && nr_cpus_in_cpuset < rv)
rv = nr_cpus_in_cpuset;

return rv;
}
// 看下quota是怎么获取的
/*
* Read cgroup CPU quota parameters from `cpu.cfs_quota_us` or `cpu.cfs_period_us`,
* depending on `param`. Parameter value is returned throuh `value`.
*/

static bool read_cpu_cfs_param(const char *cg, const char *param, int64_t *value)
{
bool rv = false;
char file[11 + 6 + 1]; // cpu.cfs__us + quota/period + \0
char *str = NULL;

sprintf(file, "cpu.cfs_%s_us", param);

// 重点是这里
if (!cgfs_get_value("cpu", cg, file, &str))
goto err;
......
}
bool cgfs_get_value(const char *controller, const char *cgroup, const char *file, char **value)
{
int ret, fd, cfd;
size_t len;
char *fnam, *tmpc;
// 获取cpu controller文件描述符,到之前说过fd_hierarchies中查
tmpc = find_mounted_controller(controller, &cfd);
if (!tmpc)
return false;

/* Make sure we pass a relative path to *at() family of functions.
* . + /cgroup + / + file + \0
*/

len = strlen(cgroup) + strlen(file) + 3;
fnam = alloca(len);
ret = snprintf(fnam, len, "%s%s/%s", *cgroup == '/' ? "." : "", cgroup, file);
if (ret < 0 || (size_t)ret >= len)
return false;
// fd也就是 /run/lxcfs/controllers/cpu/docker/dockerid/cpu.cfs_quota_us
fd = openat(cfd, fnam, O_RDONLY);
if (fd < 0)
return false;
// 读值cfs_quota_us
*value = slurp_file(fnam, fd);
return *value != NULL;
}

loadavg

  • 平均负载的概念:平均负载是一段时间内 活跃task队列 的平均值,活跃进程指的是TASK_RUNNING, TASK_UNINTERRUPTIBLE状态的进程。内核计算loadavg的方式,感兴趣的同学可以看看源码。

  • loadavg和其他部分不太一样的是,lxcfs需要用daemon进程计算平均负载,因为我们需要的容器(也就是特定进程的cgroup)的平均负载,宿主机没有这部分数据。lxcfs用与内核完全相同的方式计算负载,所以loadavg的值还是相当准足准确的。 宿主机计算的平均负载是根据所有的task(进程、线程)计算得到,容器的平均负载是根据容器内的进程计算而得。

1、loadavg daemon分析

load daemon的调用流程:main-> start_loadavg-> load_daemon-> load_begin

load_begin就像注释写的一样,每5s遍历一次load哈希表,并更新负载值

/*
* Traverse the hash table and update it.
*/

void *load_begin(void *arg)
{

......
while (1) {
if (loadavg_stop == 1)
return NULL;

time1 = clock();
for (i = 0; i < LOAD_SIZE; i++) {
pthread_mutex_lock(&load_hash[i].lock);
if (load_hash[i].next == NULL) {
pthread_mutex_unlock(&load_hash[i].lock);
continue;
}
f = load_hash[i].next;
first_node = 1;
while (f) {
......
// 更新负载
sum = refresh_load(f, path);
if (sum == 0) {
f = del_node(f, i);
} else {
out: f = f->next;
}
free(path);
......
}

if (loadavg_stop == 1)
return NULL;

time2 = clock();
usleep(FLUSH_TIME * 1000000 - (int)((time2 - time1) * 1000000 / CLOCKS_PER_SEC));
}
}

主要分析下refresh_load

/*
* Return 0 means that container p->cg is closed.
* Return -1 means that error occurred in refresh.
* Positive num equals the total number of pid.
*/

static int refresh_load(struct load_node *p, char *path)
{
FILE *f = NULL;
char **idbuf;
char proc_path[256];
int i, ret, run_pid = 0, total_pid = 0, last_pid = 0;
char *line = NULL;
size_t linelen = 0;
int sum, length;
DIR *dp;
struct dirent *file;

do {
idbuf = malloc(sizeof(char *));
} while (!idbuf);
// 这里从/sys/fs/cgroup/cpu/docker/containerid/cgroup.procs to find the process pid.那容器内进程的pid
sum = calc_pid(&idbuf, path, DEPTH_DIR, 0, p->cfd);
/* normal exit */
if (sum == 0)
goto out;

for (i = 0; i < sum; i++) {
/*clean up '\n' */
length = strlen(idbuf[i])-1;
idbuf[i][length] = '\0';
ret = snprintf(proc_path, 256, "/proc/%s/task", idbuf[i]);
if (ret < 0 || ret > 255) {
lxcfs_error("%s\n", "snprintf() failed in refresh_load.");
i = sum;
sum = -1;
goto err_out;
}

dp = opendir(proc_path);
if (!dp) {
lxcfs_error("%s\n", "Open proc_path failed in refresh_load.");
continue;
}
// 遍历/proc/<pid>/task 目录(一个进程中创建的每个线程,/proc/<pid>/task 中会创建一个相应的目录),查找状态为R或者D的task
while ((file = readdir(dp)) != NULL) {
if (strncmp(file->d_name, ".", 1) == 0)
continue;
if (strncmp(file->d_name, "..", 1) == 0)
continue;
total_pid++;
/* We make the biggest pid become last_pid.*/
ret = atof(file->d_name);
last_pid = (ret > last_pid) ? ret : last_pid;

ret = snprintf(proc_path, 256, "/proc/%s/task/%s/status", idbuf[i], file->d_name);
if (ret < 0 || ret > 255) {
lxcfs_error("%s\n", "snprintf() failed in refresh_load.");
i = sum;
sum = -1;
closedir(dp);
goto err_out;
}
f = fopen(proc_path, "r");
if (f != NULL) {
while (getline(&line, &linelen, f) != -1) {
/* Find State */
if ((line[0] == 'S') && (line[1] == 't'))
break;
}
if ((line[7] == 'R') || (line[7] == 'D'))
run_pid++;
fclose(f);
}
}
closedir(dp);
}
/*Calculate the loadavg.*/
// 获取到活跃的task数量后,是时候表演真正的技术了(计算平均负载)。计算公式与内核一致:load(t) = load(t-1) e-5/60 + n (1 - e-5/60)
// 具体含义可以参考:[https://www.helpsystems.com/resources/guideshow-it-works](https://w/unix-load-average-part-1-ww.helpsystems.com/resources/guides/unix-load-average-part-1-how-it-works)

p->avenrun[0] = calc_load(p->avenrun[0], EXP_1, run_pid);
p->avenrun[1] = calc_load(p->avenrun[1], EXP_5, run_pid);
p->avenrun[2] = calc_load(p->avenrun[2], EXP_15, run_pid);
p->run_pid = run_pid;
p->total_pid = total_pid;
p->last_pid = last_pid;

free(line);
err_out:
for (; i > 0; i--)
free(idbuf[i-1]);
out:
free(idbuf);
return sum;
}

2、读取loadavg

负载计算明白了,读就简单了。这里要注意下的是,load_hash,哈希表中的数据是容器第一次读/proc/loadavg时插入的(毕竟没办法事先知道容器的进程cgroup)。

static int proc_loadavg_read(char *buf, size_t size, off_t offset,
struct fuse_file_info *fi)
{
struct fuse_context *fc = fuse_get_context();
struct file_info *d = (struct file_info *)fi->fh;
pid_t initpid;
char *cg;
size_t total_len = 0;
char *cache = d->buf;
struct load_node *n;
int hash;
int cfd, rv = 0;
unsigned long a, b, c;

if (offset) {
if (offset > d->size)
return -EINVAL;
if (!d->cached)
return 0;
int left = d->size - offset;
total_len = left > size ? size : left;
memcpy(buf, cache + offset, total_len);
return total_len;
}
if (!loadavg)
return read_file("/proc/loadavg", buf, size, d);

initpid = lookup_initpid_in_store(fc->pid);
if (initpid <= 0)
initpid = fc->pid;
cg = get_pid_cgroup(initpid, "cpu");
if (!cg)
return read_file("/proc/loadavg", buf, size, d);

prune_init_slice(cg);
hash = calc_hash(cg) % LOAD_SIZE;
// 根据cgroup在hash表查找node
n = locate_node(cg, hash);

/* First time */
// 第一读时,先把节点信息插到hash边
if (n == NULL) {
if (!find_mounted_controller("cpu", &cfd)) {
/*
* In locate_node() above, pthread_rwlock_unlock() isn't used
* because delete is not allowed before read has ended.
*/

pthread_rwlock_unlock(&load_hash[hash].rdlock);
rv = 0;
goto err;
}
do {
n = malloc(sizeof(struct load_node));
} while (!n);

do {
n->cg = malloc(strlen(cg)+1);
} while (!n->cg);
strcpy(n->cg, cg);
n->avenrun[0] = 0;
n->avenrun[1] = 0;
n->avenrun[2] = 0;
n->run_pid = 0;
n->total_pid = 1;
n->last_pid = initpid;
n->cfd = cfd;
insert_node(&n, hash);
}
// 第二次以后开始从daemon的计算结果中读取
a = n->avenrun[0] + (FIXED_1/200);
b = n->avenrun[1] + (FIXED_1/200);
c = n->avenrun[2] + (FIXED_1/200);
total_len = snprintf(d->buf, d->buflen, "%lu.%02lu %lu.%02lu %lu.%02lu %d/%d %d\n",
LOAD_INT(a), LOAD_FRAC(a),
LOAD_INT(b), LOAD_FRAC(b),
LOAD_INT(c), LOAD_FRAC(c),
n->run_pid, n->total_pid, n->last_pid);
pthread_rwlock_unlock(&load_hash[hash].rdlock);
if (total_len < 0 || total_len >= d->buflen) {
lxcfs_error("%s\n", "Failed to write to cache");
rv = 0;
goto err;
}
d->size = (int)total_len;
d->cached = 1;

if (total_len > size)
total_len = size;
memcpy(buf, d->buf, total_len);
rv = total_len;

err:
free(cg);
return rv;
}

参考文章

  • https://www.helpsystems.com/resources/guides/unix-load-average-part-1-how-it-works

  • https://github.com/libfuse/libfuse

  • https://github.com/lxc/lxcfs

  • https://www.helpsystems.com/resources/guides/unix-load-average-part-1-how-it-works

关注我们

界世的你当不

只做你的肩膀

U3eUvay.jpg!web

AZv6fay.jpg!web

360官方技术公众号 

技术干货|一手资讯|精彩活动

空·


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK