ctr run流程

我们接下来从TaskService的Create接口看起,看一下containerd是如何与底层的containerd-shim以及最后的runc进行结合工作的。

// Excerpt from the TaskService Create path: the create options are assembled
// from the stored container record and the incoming request r, then handed to
// the runtime's Create.
opts := runtime.CreateOpts{
        Spec: container.Spec,
        // It looks like the actual I/O is still performed by the underlying
        // container executor; only the stdio paths/flags are forwarded here.
        IO: runtime.IO{
            Stdin:    r.Stdin,
            Stdout:   r.Stdout,
            Stderr:   r.Stderr,
            Terminal: r.Terminal,
        },
        Checkpoint:     checkpointPath,
        Runtime:        container.Runtime.Name,
        RuntimeOptions: container.Runtime.Options,
        TaskOptions:    r.Options,
}
c, err := runtime.Create(ctx, r.ContainerID, opts)

// Create creates a new task for the container identified by id.
//
// Steps, in order: resolve the namespace from ctx, validate id, load the runc
// options, create the on-disk bundle, obtain a shim client (in-process when
// r.config.NoShim is set, otherwise a remote shim), send a CreateTaskRequest
// to the shim, wrap the result in a Task, register it in r.tasks and publish
// a TaskCreate event.
//
// Cleanup relies on the named result err: if Create returns a non-nil error
// after the bundle exists the bundle is deleted, and if it fails after the
// shim client exists the shim is killed as well.
//
// It returns an error wrapping errdefs.ErrInvalidArgument when
// opts.TaskOptions decodes to something other than *runctypes.CreateOptions.
func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts) (_ runtime.Task, err error) {
    namespace, err := namespaces.NamespaceRequired(ctx)
    if err != nil {
        return nil, err
    }

    if err := identifiers.Validate(id); err != nil {
        return nil, errors.Wrapf(err, "invalid task id")
    }

    ropts, err := r.getRuncOptions(ctx, id)
    if err != nil {
        return nil, err
    }
    // newBundle creates the bundle directories for this id under the
    // per-namespace state and root paths (e.g.
    // /var/lib/containerd/io.containerd.runtime.v1.linux/default).
    bundle, err := newBundle(id,
        filepath.Join(r.state, namespace),
        filepath.Join(r.root, namespace),
        opts.Spec.Value)
    if err != nil {
        return nil, err
    }
    defer func() {
        // Roll back the bundle if any later step fails.
        if err != nil {
            bundle.Delete()
        }
    }()

    shimopt := ShimLocal(r.config, r.events)
    if !r.config.NoShim {
        var cgroup string
        if opts.TaskOptions != nil {
            v, err := typeurl.UnmarshalAny(opts.TaskOptions)
            if err != nil {
                return nil, err
            }
            // TaskOptions comes from the caller; reject an unexpected type
            // with an error instead of panicking on the assertion.
            createOpts, ok := v.(*runctypes.CreateOptions)
            if !ok {
                return nil, errors.Wrapf(errdefs.ErrInvalidArgument, "unsupported task options type %T", v)
            }
            cgroup = createOpts.ShimCgroup
        }
        // exitHandler is passed to ShimRemote below; it logs that the shim was
        // reaped and, if the task is still registered, cleans up after it.
        exitHandler := func() {
            log.G(ctx).WithField("id", id).Info("shim reaped")
            t, err := r.tasks.Get(ctx, id)
            if err != nil {
                // Task was never started or was already successfully deleted
                return
            }
            lc := t.(*Task)

            log.G(ctx).WithFields(logrus.Fields{
                "id":        id,
                "namespace": namespace,
            }).Warn("cleaning up after killed shim")
            // A fresh background context is used here rather than the
            // request's ctx.
            if err = r.cleanupAfterDeadShim(context.Background(), bundle, namespace, id, lc.pid); err != nil {
                log.G(ctx).WithError(err).WithFields(logrus.Fields{
                    "id":        id,
                    "namespace": namespace,
                }).Warn("failed to clean up after killed shim")
            }
        }
        // A separate shim process is used in this branch; r.address is the
        // containerd daemon address.
        // NOTE(review): presumably ShimRemote ends up in WithStart to spawn
        // the shim — confirm against its definition.
        shimopt = ShimRemote(r.config, r.address, cgroup, exitHandler)
    }

    // Obtain a client for talking to the shim.
    s, err := bundle.NewShimClient(ctx, namespace, shimopt, ropts)
    if err != nil {
        return nil, err
    }
    defer func() {
        // Roll back the shim if any later step fails.
        if err != nil {
            if kerr := s.KillShim(ctx); kerr != nil {
                // Log the kill failure itself, not the error that triggered
                // the rollback.
                log.G(ctx).WithError(kerr).Error("failed to kill shim")
            }
        }
    }()

    // The runtime named in the runc options overrides the configured default.
    rt := r.config.Runtime
    if ropts != nil && ropts.Runtime != "" {
        rt = ropts.Runtime
    }
    // Fill in the CreateTaskRequest that is sent to the shim; the receiving
    // side is Create in containerd/runtime/v1/shim/service.go.
    sopts := &shim.CreateTaskRequest{
        ID:         id,
        Bundle:     bundle.path,
        Runtime:    rt,
        Stdin:      opts.IO.Stdin,
        Stdout:     opts.IO.Stdout,
        Stderr:     opts.IO.Stderr,
        Terminal:   opts.IO.Terminal,
        Checkpoint: opts.Checkpoint,
        Options:    opts.TaskOptions,
    }
    for _, m := range opts.Rootfs {
        sopts.Rootfs = append(sopts.Rootfs, &types.Mount{
            Type:    m.Type,
            Source:  m.Source,
            Options: m.Options,
        })
    }
    // Hand the create request to the shim.
    cr, err := s.Create(ctx, sopts)
    if err != nil {
        return nil, errdefs.FromGRPC(err)
    }
    t, err := newTask(id, namespace, int(cr.Pid), s, r.events, r.tasks, bundle)
    if err != nil {
        return nil, err
    }
    if err := r.tasks.Add(ctx, t); err != nil {
        return nil, err
    }
    r.events.Publish(ctx, runtime.TaskCreateEventTopic, &eventstypes.TaskCreate{
        ContainerID: sopts.ID,
        Bundle:      sopts.Bundle,
        Rootfs:      sopts.Rootfs,
        IO: &eventstypes.TaskIO{
            Stdin:    sopts.Stdin,
            Stdout:   sopts.Stdout,
            Stderr:   sopts.Stderr,
            Terminal: sopts.Terminal,
        },
        Checkpoint: sopts.Checkpoint,
        Pid:        uint32(t.pid),
    })

    return t, nil
}


// WithStart returns an Opt that launches the shim binary as a child process
// and then connects to it.
//
// binary is the shim executable, address is the shim's own socket address
// (used for containerd <-> shim traffic), daemonAddress is handed to the shim
// command, cgroup optionally names a cgroup to place the shim in, debug is
// forwarded to the command, and exitHandler is invoked once the shim process
// has been waited on.
//
// If anything fails after the process has been started, the process is killed
// (the deferred check reads the named result err).
func WithStart(binary, address, daemonAddress, cgroup string, debug bool, exitHandler func()) Opt {
    return func(ctx context.Context, config shim.Config) (_ shimapi.ShimService, _ io.Closer, err error) {
        // address is the shim address, e.g.
        // filepath.Join(string(filepath.Separator), "containerd-shim", namespace, bundle.id, "shim.sock")
        listener, err := newSocket(address)
        if err != nil {
            return nil, nil, err
        }
        defer listener.Close()

        sockFile, err := listener.File()
        if err != nil {
            return nil, nil, errors.Wrapf(err, "failed to get fd for socket %s", address)
        }
        defer sockFile.Close()

        // Roughly: containerd-shim --namespace default --address /run/containerd/containerd.sock
        shimCmd, err := newCommand(binary, daemonAddress, debug, config, sockFile)
        if err != nil {
            return nil, nil, err
        }
        if err = shimCmd.Start(); err != nil {
            return nil, nil, errors.Wrapf(err, "failed to start shim")
        }
        defer func() {
            if err == nil {
                return
            }
            shimCmd.Process.Kill()
        }()

        // Wait on the child in the background, then notify the caller-supplied
        // handler.
        go func() {
            shimCmd.Wait()
            exitHandler()
        }()

        startedFields := logrus.Fields{
            "pid":     shimCmd.Process.Pid,
            "address": address,
            "debug":   debug,
        }
        log.G(ctx).WithFields(startedFields).Infof("shim %s started", binary)

        // set shim in cgroup if it is provided
        if cgroup != "" {
            if err = setCgroup(cgroup, shimCmd); err != nil {
                return nil, nil, err
            }
            cgroupFields := logrus.Fields{
                "pid":     shimCmd.Process.Pid,
                "address": address,
            }
            log.G(ctx).WithFields(cgroupFields).Infof("shim placed in cgroup %s", cgroup)
        }

        if err = sys.SetOOMScore(shimCmd.Process.Pid, sys.OOMScoreMaxKillable); err != nil {
            return nil, nil, errors.Wrap(err, "failed to set OOM Score on shim")
        }

        // Build a client on the shim address described above; this also goes
        // over a unix socket according to the surrounding article.
        connect := WithConnect(address, func() {})
        client, closer, err := connect(ctx, config)
        if err != nil {
            return nil, nil, errors.Wrap(err, "failed to connect")
        }
        return client, closer, nil
    }
}

shim处理逻辑

代码路径:containerd/runtime/v1/shim/service.go,shim 对外提供的 API 列表如下:

// ShimService lists the calls served by the shim (the article places the
// implementation in containerd/runtime/v1/shim/service.go).
//
// Every method has the same shape: a context plus one request message in, one
// response message and an error out. Methods with no meaningful request or
// response use google_protobuf1.Empty in that position.
// NOTE(review): google_protobuf1.Empty is presumably the protobuf empty
// message — confirm against the generated package.
type ShimService interface {
    State(ctx context.Context, req *StateRequest) (*StateResponse, error)
    // Create is the call the runtime issues with a CreateTaskRequest; the
    // response carries the created process's Pid (see Service.Create).
    Create(ctx context.Context, req *CreateTaskRequest) (*CreateTaskResponse, error)
    Start(ctx context.Context, req *StartRequest) (*StartResponse, error)
    Delete(ctx context.Context, req *google_protobuf1.Empty) (*DeleteResponse, error)
    DeleteProcess(ctx context.Context, req *DeleteProcessRequest) (*DeleteResponse, error)
    ListPids(ctx context.Context, req *ListPidsRequest) (*ListPidsResponse, error)
    Pause(ctx context.Context, req *google_protobuf1.Empty) (*google_protobuf1.Empty, error)
    Resume(ctx context.Context, req *google_protobuf1.Empty) (*google_protobuf1.Empty, error)
    Checkpoint(ctx context.Context, req *CheckpointTaskRequest) (*google_protobuf1.Empty, error)
    Kill(ctx context.Context, req *KillRequest) (*google_protobuf1.Empty, error)
    Exec(ctx context.Context, req *ExecProcessRequest) (*google_protobuf1.Empty, error)
    ResizePty(ctx context.Context, req *ResizePtyRequest) (*google_protobuf1.Empty, error)
    CloseIO(ctx context.Context, req *CloseIORequest) (*google_protobuf1.Empty, error)
    ShimInfo(ctx context.Context, req *google_protobuf1.Empty) (*ShimInfoResponse, error)
    Update(ctx context.Context, req *UpdateTaskRequest) (*google_protobuf1.Empty, error)
    Wait(ctx context.Context, req *WaitRequest) (*WaitResponse, error)
}

// Create handles the CreateTaskRequest that the runtime sends over the shim's
// unix socket: it creates a new initial process and container with the
// underlying OCI runtime.
//
// The service mutex is held for the whole call. The rootfs components from the
// request are mounted onto <bundle>/rootfs, an init process object is built
// with newInit, and that object is asked to create the container. On success
// the request ID and bundle are remembered on the service, the process is
// stored under the request ID, and its pid is returned. If the call returns an
// error, everything mounted on <bundle>/rootfs is unmounted again.
func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (_ *shimapi.CreateTaskResponse, err error) {
    s.mu.Lock()
    defer s.mu.Unlock()

    // Translate the request's rootfs entries into proc.Mount values.
    var rootfsMounts []proc.Mount
    for i := range r.Rootfs {
        src := r.Rootfs[i]
        rootfsMounts = append(rootfsMounts, proc.Mount{
            Type:    src.Type,
            Source:  src.Source,
            Target:  src.Target,
            Options: src.Options,
        })
    }

    cfg := &proc.CreateConfig{
        ID:               r.ID,
        Bundle:           r.Bundle,
        Runtime:          r.Runtime,
        Rootfs:           rootfsMounts,
        Terminal:         r.Terminal,
        Stdin:            r.Stdin,
        Stdout:           r.Stdout,
        Stderr:           r.Stderr,
        Checkpoint:       r.Checkpoint,
        ParentCheckpoint: r.ParentCheckpoint,
        Options:          r.Options,
    }

    rootfsDir := filepath.Join(r.Bundle, "rootfs")
    defer func() {
        // Only undo the mounts when the call is failing.
        if err == nil {
            return
        }
        if uerr := mount.UnmountAll(rootfsDir, 0); uerr != nil {
            log.G(ctx).WithError(uerr).Warn("Failed to cleanup rootfs mount")
        }
    }()

    // Mount every component onto the bundle's rootfs directory, in order.
    for _, pm := range rootfsMounts {
        component := &mount.Mount{
            Type:    pm.Type,
            Source:  pm.Source,
            Options: pm.Options,
        }
        if merr := component.Mount(rootfsDir); merr != nil {
            return nil, errors.Wrapf(merr, "failed to mount rootfs component %v", component)
        }
    }

    // Build the process object; see newInit.
    initProc, err := newInit(
        ctx,
        s.config.Path,
        s.config.WorkDir,
        s.config.RuntimeRoot,
        s.config.Namespace,
        s.config.Criu,
        s.config.SystemdCgroup,
        s.platform,
        cfg,
    )
    if err != nil {
        return nil, errdefs.ToGRPC(err)
    }

    // This is where the actual container process gets created.
    // NOTE(review): per the original author this ends in (*Runc).Create, which
    // shells out to the runc binary to create an OCI-compliant container —
    // not visible from here, confirm.
    if cerr := initProc.Create(ctx, cfg); cerr != nil {
        return nil, errdefs.ToGRPC(cerr)
    }

    // save the main task id and bundle to the shim for additional requests
    s.id = r.ID
    s.bundle = r.Bundle
    initPid := initProc.Pid()
    s.processes[r.ID] = initProc

    resp := &shimapi.CreateTaskResponse{
        Pid: uint32(initPid),
    }
    return resp, nil
}

// newInit builds and returns the process descriptor object used by later
// calls; according to the surrounding article this already runs inside the
// shim process. No container process is started here — the function only
// decodes the create options, constructs the runc handle and fills in the
// descriptor's fields.
//
// It returns an error only when r.Options is set and cannot be unmarshalled.
func newInit(ctx context.Context, path, workDir, runtimeRoot, namespace, criu string, systemdCgroup bool, platform rproc.Platform, r *proc.CreateConfig) (*proc.Init, error) {
    // Zero-value options are used when the request carries none.
    var createOpts runctypes.CreateOptions
    if r.Options != nil {
        decoded, err := typeurl.UnmarshalAny(r.Options)
        if err != nil {
            return nil, err
        }
        // NOTE(review): single-value type assertion — this panics if the
        // decoded options are not *runctypes.CreateOptions.
        createOpts = *decoded.(*runctypes.CreateOptions)
    }

    rootfsDir := filepath.Join(path, "rootfs")
    runc := proc.NewRunc(runtimeRoot, path, namespace, r.Runtime, criu, systemdCgroup)

    stdio := rproc.Stdio{
        Stdin:    r.Stdin,
        Stdout:   r.Stdout,
        Stderr:   r.Stderr,
        Terminal: r.Terminal,
    }

    // Create the descriptor only; the real container process does not exist
    // yet at this point.
    initProc := proc.New(r.ID, runc, stdio)
    initProc.Bundle = r.Bundle
    initProc.Platform = platform
    initProc.Rootfs = rootfsDir
    initProc.WorkDir = workDir
    initProc.IoUID = int(createOpts.IoUid)
    initProc.IoGID = int(createOpts.IoGid)
    initProc.NoPivotRoot = createOpts.NoPivotRoot
    initProc.NoNewKeyring = createOpts.NoNewKeyring
    return initProc, nil
}

总结

containerd daemon看起来只是一层比较薄的逻辑。containerd-shim则相当于容器物理进程的父进程,比如会在容器物理进程挂掉后接管它,避免它被init进程监管等。具体的执行还是都交给了底层的runc来完成。

流程图如下所示 未命名文件.png-34.3kB

参考

https://blog.csdn.net/zhonglinzhang/article/details/76615127 https://blog.csdn.net/zhonglinzhang/article/category/3271199 https://blog.csdn.net/zhonglinzhang/article/details/76683925

results matching ""

    No results matching ""