27

Kubectl exec 背后到底发生了什么?

 3 years ago
source link: http://mp.weixin.qq.com/s?__biz=MzU1MzY4NzQ1OA%3D%3D&%3Bmid=2247485100&%3Bidx=1&%3Bsn=e64f938c2fa6b40ac0071d481ba0eacc
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

对于经常和 Kubernetes 打交道的 YAML 工程师来说,最常用的命令就是 kubectl exec 了,通过它可以直接在容器内执行命令来调试应用程序。如果你不满足于只是用用而已,想了解 kubectl exec 的工作原理,那么本文值得你仔细读一读。本文将通过参考 kubectlAPI ServerKubelet 和容器运行时接口(CRI)Docker API 中的相关代码来了解该命令是如何工作的。

kubectl exec 的工作原理用一张图就可以表示:

fuIVnqz.png!webkubectl exec

先来看一个例子:

:whale: → kubectl version --short 
Client Version: v1.15.0
Server Version: v1.15.3

:whale: → kubectl run nginx --image=nginx --port=80 --generator=run-pod/v1
pod/nginx created

:whale: → kubectl get po
NAME READY STATUS RESTARTS AGE
nginx 1/1 Running 0 6s

:whale: → kubectl exec nginx -- date
Sat Jan 25 18:47:52 UTC 2020

:whale: → kubectl exec -it nginx -- /bin/bash
root@nginx:/#

第一个 kubectl exec 在容器内执行了 date 命令,第二个 kubectl exec 使用 -i-t 参数进入了容器的交互式 shell。

重复第二个 kubectl exec 命令,打印更详细的日志:

:whale: → kubectl -v=7 exec -it nginx -- /bin/bash                                                         
I0125 10:51:55.434043 28053 loader.go:359] Config loaded from file: /home/isim/.kube/kind-config-linkerd
I0125 10:51:55.438595 28053 round_trippers.go:416] GET https://127.0.0.1:38545/api/v1/namespaces/default/pods/nginx
I0125 10:51:55.438607 28053 round_trippers.go:423] Request Headers:
I0125 10:51:55.438611 28053 round_trippers.go:426] Accept: application/json, */*
I0125 10:51:55.438615 28053 round_trippers.go:426] User-Agent: kubectl/v1.15.0 (linux/amd64) kubernetes/e8462b5
I0125 10:51:55.445942 28053 round_trippers.go:441] Response Status: 200 OK in 7 milliseconds
I0125 10:51:55.451050 28053 round_trippers.go:416] POST https://127.0.0.1:38545/api/v1/namespaces/default/pods/nginx/exec?command=%2Fbin%2Fbash&container=nginx&stdin=true&stdout=true&tty=true
I0125 10:51:55.451063 28053 round_trippers.go:423] Request Headers:
I0125 10:51:55.451067 28053 round_trippers.go:426] X-Stream-Protocol-Version: v4.channel.k8s.io
I0125 10:51:55.451090 28053 round_trippers.go:426] X-Stream-Protocol-Version: v3.channel.k8s.io
I0125 10:51:55.451096 28053 round_trippers.go:426] X-Stream-Protocol-Version: v2.channel.k8s.io
I0125 10:51:55.451100 28053 round_trippers.go:426] X-Stream-Protocol-Version: channel.k8s.ioI0125 10:51:55.451121 28053 round_trippers.go:426] User-Agent: kubectl/v1.15.0 (linux/amd64) kubernetes/e8462b5
I0125 10:51:55.465690 28053 round_trippers.go:441] Response Status: 101 Switching Protocols in 14 milliseconds
root@nginx:/#

这里有两个重要的 HTTP 请求:

  • GET 请求用来获取 Pod 信息。
  • POST 请求调用 Pod 的子资源 exec 在容器内执行命令。

子资源(subresource)隶属于某个 K8S 资源,表示为父资源下方的子路径,例如 /logs/status/scale/exec 等。其中每个子资源支持的操作根据对象的不同而改变。

最后 API Server 返回了 101 Ugrade 响应,向客户端表示已切换到 SPDY 协议。

SPDY 允许在单个 TCP 连接上复用独立的 stdin/stdout/stderr/spdy-error 流。

1. API Server 源码分析

请求首先会到底 API Server,先来看看 API Server 是如何注册 rest.ExecRest 处理器来处理子资源请求 /exec 的。这个处理器用来确定 exec 要进入的节点。

API Server 启动过程中做的第一件事就是指挥内嵌的 GenericAPIServer 加载早期的遗留 API(legacy API):

if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
// ...
if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
return nil, err
}
}

在 API 加载过程中,会将类型 LegacyRESTStorage 实例化,创建一个 storage.PodStorage 实例:

podStorage, err := podstore.NewStorage(
restOptionsGetter,
nodeStorage.KubeletConnectionInfo,
c.ProxyTransport,
podDisruptionClient,
)
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}

随后 storeage.PodStorage 实例会被添加到 map restStorageMap 中。注意,该 map 将路径 pods/exec 映射到了 podStoragerest.ExecRest 处理器。

restStorageMap := map[string]rest.Storage{
"pods": podStorage.Pod,
"pods/attach": podStorage.Attach,
"pods/status": podStorage.Status,
"pods/log": podStorage.Log,
"pods/exec": podStorage.Exec,
"pods/portforward": podStorage.PortForward,
"pods/proxy": podStorage.Proxy,
"pods/binding": podStorage.Binding,
"bindings": podStorage.LegacyBinding,

podstorage 为 pod 和子资源提供了 CURD 逻辑和策略的抽象。更多详细信息请查看内嵌的 genericregistry.Store

map restStorageMap 会成为实例 apiGroupInfo 的一部分,添加到 GenericAPIServer 中:

if err := s.installAPIResources(apiPrefix, apiGroupInfo, openAPIModels); err != nil {
return err
}

// Install the version handler.
// Add a handler at /<apiPrefix> to enumerate the supported api versions.
s.Handler.GoRestfulContainer.Add(discovery.NewLegacyRootAPIHandler(s.discoveryAddresses, s.Serializer, apiPrefix).WebService())

其中 GoRestfulContainer.ServeMux 会将传入的请求 URL 映射到不同的处理器。

接下来重点观察处理器 therest.ExecRest 的工作原理,它的 Connect() 方法会调用函数 pod.ExecLocation() 来确定 pod 中容器的 exec 子资源的 URL

// Connect returns a handler for the pod exec proxy
func (r *ExecREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
execOpts, ok := opts.(*api.PodExecOptions)
if !ok {
return nil, fmt.Errorf("invalid options object: %#v", opts)
}
location, transport, err := pod.ExecLocation(r.Store, r.KubeletConn, ctx, name, execOpts)
if err != nil {
return nil, err
}
return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, true, responder), nil
}

函数 pod.ExecLocation() 返回的 URL 被 API Server 用来决定连接到哪个节点。

下面接着分析节点上的 Kubelet 源码。

2. Kubelet 源码分析

到了 Kubelet 这边,我们需要关心两点:

  • Kubelet 是如何注册 exec 处理器的?
  • Kubelet 与 Docker API 如何交互?

Kubelet 的初始化过程非常复杂,主要涉及到两个函数:

  • PreInitRuntimeService() : 使用 dockershim 包来初始化 CRI
  • RunKubelet() : 注册处理器,启动 Kubelet 服务。

注册处理器

当 Kubelet 启动时,它的 RunKubelet() 函数会调用私有函数 startKubelet() 来启动 kubelet.Kubelet 实例的 ListenAndServe() 方法,然后该方法会调用函数 ListenAndServeKubeletServer() ,使用构造函数 NewServer() 来安装 『debugging』处理器:

// NewServer initializes and configures a kubelet.Server object to handle HTTP requests.
func NewServer(
// ...
criHandler http.Handler)
Server
{
// ...
if enableDebuggingHandlers {
server.InstallDebuggingHandlers(criHandler)
if enableContentionProfiling {
goruntime.SetBlockProfileRate(1)
}
} else {
server.InstallDebuggingDisabledHandlers()
}
return server
}

InstallDebuggingHandlers() 函数使用 getExec() 处理器来注册 HTTP 请求模式:

// InstallDebuggingHandlers registers the HTTP request patterns that serve logs or run commands/containers
func (s *Server) InstallDebuggingHandlers(criHandler http.Handler) {
// ...
ws = new(restful.WebService)
ws.
Path("/exec")
ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
To(s.getExec).
Operation("getExec"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
To(s.getExec).
Operation("getExec"))
ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
To(s.getExec).
Operation("getExec"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
To(s.getExec).
Operation("getExec"))
s.restfulCont.Add(ws)

其中 getExec() 处理器又会调用 s.host 实例中的 GetExec() 方法:

// getExec handles requests to run a command inside a container.
func (s *Server) getExec(request *restful.Request, response *restful.Response) {
// ...
podFullName := kubecontainer.GetPodFullName(pod)
url, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, *streamOpts)
if err != nil {
streaming.WriteError(err, response.ResponseWriter)
return
}
// ...
}

s.host 被实例化为 kubelet.Kubelet 类型的一个实例,它嵌套引用了 StreamingRuntime 接口,该接口又被实例化为 kubeGenericRuntimeManager 的实例,即 运行时管理器 。该运行时管理器是 Kubelet 与 Docker API 交互的关键组件, GetExec() 方法就是由它实现的:

// GetExec gets the endpoint the runtime will serve the exec request from.
func (m *kubeGenericRuntimeManager) GetExec(id kubecontainer.ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error) {
// ...
resp, err := m.runtimeService.Exec(req)
if err != nil {
return nil, err
}

return url.Parse(resp.Url)
}

GetExec() 又会调用 runtimeService.Exec() 方法,进一步挖掘你会发现 runtimeService 是 CRI 包中定义的接口。 kuberuntime.kubeGenericRuntimeManagerruntimeService 被实例化为 kuberuntime.instrumentedRuntimeService 类型,由它来实现 runtimeService.Exec() 方法:

func (in instrumentedRuntimeService) Exec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
const operation = "exec"
defer recordOperation(operation, time.Now())

resp, err := in.service.Exec(req)
recordError(operation, err)
return resp, err
}

instrumentedRuntimeService 实例的嵌套服务对象被实例化为 theremote.RemoteRuntimeService 类型的实例。该类型实现了 Exec() 方法:

// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
func (r *RemoteRuntimeService) Exec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()

resp, err := r.runtimeClient.Exec(ctx, req)
if err != nil {
klog.Errorf("Exec %s '%s' from runtime service failed: %v", req.ContainerId, strings.Join(req.Cmd, " "), err)
return nil, err
}

if resp.Url == "" {
errorMessage := "URL is not set"
klog.Errorf("Exec failed: %s", errorMessage)
return nil, errors.New(errorMessage)
}

return resp, nil
}

Exec() 方法会向 /runtime.v1alpha2.RuntimeService/Exec 发起一个 gRPC 调用来让运行时端准备一个流式通信的端点,该端点用于在容器中执行命令(关于如何将 Docker shim 设置为 gRPC 服务端的更多信息请参考下一小节)。

gRPC 服务端通过调用 RuntimeServiceServer.Exec() 方法来处理请求,该方法由 dockershim.dockerService 结构体实现:

// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
func (ds *dockerService) Exec(_ context.Context, req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
if ds.streamingServer == nil {
return nil, streaming.NewErrorStreamingDisabled("exec")
}
_, err := checkContainerStatus(ds.client, req.ContainerId)
if err != nil {
return nil, err
}
return ds.streamingServer.GetExec(req)
}

第 10 行的 ThestreamingServer 是一个 streaming.Server 接口,它在构造函数 dockershim.NewDockerService() 中被实例化:

// create streaming server if configured.
if streamingConfig != nil {
var err error
ds.streamingServer, err = streaming.NewServer(*streamingConfig, ds.streamingRuntime)
if err != nil {
return nil, err
}
}

来看一下 GetExec() 方法的实现方式:

func (s *server) GetExec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
if err := validateExecRequest(req); err != nil {
return nil, err
}
token, err := s.cache.Insert(req)
if err != nil {
return nil, err
}
return &runtimeapi.ExecResponse{
Url: s.buildURL("exec", token),
}, nil
}

可以看到这里只是向客户端返回一个简单的 token 组合成的 URL, 之所以生成一个 token 是因为用户的命令中可能包含各种各样的字符,各种长度的字符,需要格式化为一个简单的 token。该 token 会缓存在本地,后面真正的 exec 请求会携带这个 token,通过该 token 找到之前的具体请求。其中 restful.WebService 实例会将 pod exec 请求路由到这个端点:

// InstallDebuggingHandlers registers the HTTP request patterns that serve logs or run commands/containers
func (s *Server) InstallDebuggingHandlers(criHandler http.Handler) {
// ...
ws = new(restful.WebService)
ws.
Path("/exec")
ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
To(s.getExec).
Operation("getExec"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
To(s.getExec).
Operation("getExec"))
ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
To(s.getExec).
Operation("getExec"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
To(s.getExec).
Operation("getExec"))
s.restfulCont.Add(ws)

创建 Docker shim

PreInitRuntimeService() 函数作为 gRPC 服务端,负责创建并启动 Docker shim。在将 dockershim.dockerService 类型实例化时,让其嵌套的 streamingRuntime 实例引用 dockershim.NativeExecHandler 的实例(该实例实现了 dockershim.ExecHandler 接口)。

ds := &dockerService{
// ...
streamingRuntime: &streamingRuntime{
client: client,
execHandler: &NativeExecHandler{},
},
// ...
}

使用 Docker 的 exec API 在容器中执行命令的核心实现就是 NativeExecHandler.ExecInContainer() 方法:

func (*NativeExecHandler) ExecInContainer(client libdocker.Interface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
// ...
startOpts := dockertypes.ExecStartCheck{Detach: false, Tty: tty}
streamOpts := libdocker.StreamOptions{
InputStream: stdin,
OutputStream: stdout,
ErrorStream: stderr,
RawTerminal: tty,
ExecStarted: execStarted,
}
err = client.StartExec(execObj.ID, startOpts, streamOpts)
if err != nil {
return err
}
// ...

这里就是最终 Kubelet 调用 Docker exec API 的地方。

最后需要搞清楚的是 streamingServer 处理器如何处理 exec 请求。首先需要找到它的 exec 处理器,我们直接从构造函数 streaming.NewServer() 开始往下找,因为这是将 /exec/{token} 路径绑定到 serveExec 处理器的地方:

ws := &restful.WebService{}
endpoints := []struct {
path string
handler restful.RouteFunction
}{
{"/exec/{token}", s.serveExec},
{"/attach/{token}", s.serveAttach},
{"/portforward/{token}", s.servePortForward},
}

所有发送到 dockershim.dockerService 实例的请求最终都会在 streamingServer 处理器上完成,因为 dockerService.ServeHTTP() 方法会调用 streamingServer 实例的 ServeHTTP() 方法。

serveExec 处理器会调用 remoteCommand.ServeExec() 函数,这个函数又是干嘛的呢?它会调用前面提到的 Executor.ExecInContainer() 方法,而 ExecInContainer() 方法是知道如何与 Docker exec API 通信的:

// ServeExec handles requests to execute a command in a container. After
// creating/receiving the required streams, it delegates the actual execution
// to the executor.
func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podName string, uid types.UID, container string, cmd []string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
// ...
err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan, 0)
if err != nil {
// ...
} else {
// ...
}
}

3. 总结

本文通过解读 kubectlAPI ServerCRI 的源码,帮助大家理解 kubectl exec 命令的工作原理,当然,这里并没有涉及到 Docker exec API 的细节,也没有涉及到 docker exec 的工作原理。

首先,kubectl 向 API Server 发出了 GETPOST 请求,API Server 返回了 101 Ugrade 响应,向客户端表示已切换到 SPDY 协议。

随后 API Server 使用 storage.PodStoragerest.ExecRest 来提供处理器的映射和执行逻辑,其中 rest.ExecRest 处理器决定 exec 要进入的节点。

最后 Kubelet 向 Docker shim 请求一个流式端点 URL,并将 exec 请求转发到 Docker exec API。kubelet 再将这个 URL 以 Redirect 的方式返回给 API Server,请求就会重定向到到对应 Streaming Server 上发起的 exec 请求,并维护长链。

虽然本文只关注了 kubectl exec 命令,但其他的子命令(例如 attachport-forwardlog 等等)也遵循了类似的实现模式:

IJZVvaM.png!webkubectl

原文链接:https://itnext.io/how-it-works-kubectl-exec-e31325daa910

jEJraeF.png!web

你可能还喜欢

点击下方图片即可阅读

rUFviqY.png!web

不要轻易使用 Alpine 镜像来构建 Docker 镜像,有坑!

vam2EvY.gif

云原生是一种信仰 

3YRZRn2.png!web

码关注公众号

后台回复◉k8s◉获取史上最方便快捷的 Kubernetes 高可用部署工具,只需一条命令,连 ssh 都不需要!

JV73iue.gif

jeeyYzV.gif

点击  "阅读原文"  获取 更好的阅读体验!

:heart: 给个 「在看」 ,是对我最大的支持:heart:


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK