启动流程

入口path: containerd/cmd/containerd/command/main.go

app.Action = func(context *cli.Context) error {
        var (
            start   = time.Now()
            signals = make(chan os.Signal, 2048)
            serverC = make(chan *server.Server, 1)
            ctx     = gocontext.Background()
            config  = defaultConfig()
            //默认的 config root 为 /var/lib/containerd
        )

        done := handleSignals(ctx, signals, serverC)
        // start the signal handler as soon as we can to make sure that
        // we don't miss any signals during boot
        signal.Notify(signals, handledSignals...)

        if err := srvconfig.LoadConfig(context.GlobalString("config"), config); err != nil && !os.IsNotExist(err) {
            return err
        }
        // apply flags to the config
        if err := applyFlags(context, config); err != nil {
            return err
        }
        // cleanup temp mounts
        if err := mount.SetTempMountLocation(filepath.Join(config.Root, "tmpmounts")); err != nil {
            return errors.Wrap(err, "creating temp mount location")
        }
        // unmount all temp mounts on boot for the server
        warnings, err := mount.CleanupTempMounts(0)
        if err != nil {
            log.G(ctx).WithError(err).Error("unmounting temp mounts")
        }
        for _, w := range warnings {
            log.G(ctx).WithError(w).Warn("cleanup temp mount")
        }
        address := config.GRPC.Address
        if address == "" {
            return errors.New("grpc address cannot be empty")
        }
        log.G(ctx).WithFields(logrus.Fields{
            "version":  version.Version,
            "revision": version.Revision,
        }).Info("starting containerd")

        server, err := server.New(ctx, config)
        if err != nil {
            return err
        }
        serverC <- server
        if config.Debug.Address != "" {
            var l net.Listener
            if filepath.IsAbs(config.Debug.Address) {
                if l, err = sys.GetLocalListener(config.Debug.Address, config.Debug.UID, config.Debug.GID); err != nil {
                    return errors.Wrapf(err, "failed to get listener for debug endpoint")
                }
            } else {
                if l, err = net.Listen("tcp", config.Debug.Address); err != nil {
                    return errors.Wrapf(err, "failed to get listener for debug endpoint")
                }
            }
            //初始化debug的接口
            serve(ctx, l, server.ServeDebug)
        }
        if config.Metrics.Address != "" {
            l, err := net.Listen("tcp", config.Metrics.Address)
            if err != nil {
                return errors.Wrapf(err, "failed to get listener for metrics endpoint")
            }
            //初始化ServeMetrics的接口
            serve(ctx, l, server.ServeMetrics)
        }

        //产生unix-socket被grpcServer使用来监听, 默认在/run/containerd/containerd.sock
        l, err := sys.GetLocalListener(address, config.GRPC.UID, config.GRPC.GID)
        if err != nil {
            return errors.Wrapf(err, "failed to get listener for main endpoint")
        }
        //rpc在Server.New中被初始化
        serve(ctx, l, server.ServeGRPC)

        log.G(ctx).Infof("containerd successfully booted in %fs", time.Since(start).Seconds())
        <-done
        return nil
    }
    return app
}



// New creates and initializes a new containerd server
func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
    switch {
    case config.Root == "":
        return nil, errors.New("root must be specified")
    case config.State == "":
        return nil, errors.New("state must be specified")
    case config.Root == config.State:
        return nil, errors.New("root and state must be different paths")
    }

    if err := os.MkdirAll(config.Root, 0711); err != nil {
        return nil, err
    }
    if err := os.MkdirAll(config.State, 0711); err != nil {
        return nil, err
    }
    if err := apply(ctx, config); err != nil {
        return nil, err
    }
    //加载插件,比如snapshottor, metaStore,content等,但是其中还有个proxyPlugin,看起来是为某些服务提供的代理
    plugins, err := LoadPlugins(ctx, config)
    if err != nil {
        return nil, err
    }

    serverOpts := []grpc.ServerOption{
        grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
        grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
    }
    if config.GRPC.MaxRecvMsgSize > 0 {
        serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(config.GRPC.MaxRecvMsgSize))
    }
    if config.GRPC.MaxSendMsgSize > 0 {
        serverOpts = append(serverOpts, grpc.MaxSendMsgSize(config.GRPC.MaxSendMsgSize))
    }
    rpc := grpc.NewServer(serverOpts...)
    var (
        services []plugin.Service
        s        = &Server{
            rpc:    rpc,
            events: exchange.NewExchange(),
            config: config,
        }
        initialized = plugin.NewPluginSet()
    )
    //上边加载好了plugin后,由下边调用init方法初始化每个plugin,并将需要注册的grpc路由收集到services []plugin.Service中
    for _, p := range plugins {
        id := p.URI()
        log.G(ctx).WithField("type", p.Type).Infof("loading plugin %q...", id)

        initContext := plugin.NewContext(
            ctx,
            p,
            initialized,
            config.Root,
            config.State,
        )
        initContext.Events = s.events
        initContext.Address = config.GRPC.Address

        // load the plugin specific configuration if it is provided
        if p.Config != nil {
            pluginConfig, err := config.Decode(p.ID, p.Config)
            if err != nil {
                return nil, err
            }
            initContext.Config = pluginConfig
        }
        result := p.Init(initContext)
        if err := initialized.Add(result); err != nil {
            return nil, errors.Wrapf(err, "could not add plugin result to plugin set")
        }

        instance, err := result.Instance()
        if err != nil {
            if plugin.IsSkipPlugin(err) {
                log.G(ctx).WithField("type", p.Type).Infof("skip loading plugin %q...", id)
            } else {
                log.G(ctx).WithError(err).Warnf("failed to load plugin %s", id)
            }
            continue
        }
        // check for grpc services that should be registered with the server
        if service, ok := instance.(plugin.Service); ok {
            services = append(services, service)
        }
        s.plugins = append(s.plugins, result)
    }
    // register services after all plugins have been initialized(注册rpc路由信息)
    for _, service := range services {
        if err := service.Register(rpc); err != nil {
            return nil, err
        }
    }
    return s, nil
}

下面简介一下plugin的类型

const (
    // InternalPlugin implements an internal plugin to containerd
    InternalPlugin Type = "io.containerd.internal.v1"
    // RuntimePlugin implements a runtime
    RuntimePlugin Type = "io.containerd.runtime.v1"
    // RuntimePluginV2 implements a runtime v2
    RuntimePluginV2 Type = "io.containerd.runtime.v2"
    // ServicePlugin implements a internal service
    ServicePlugin Type = "io.containerd.service.v1"
    // GRPCPlugin implements a grpc service
    GRPCPlugin Type = "io.containerd.grpc.v1"
    // SnapshotPlugin implements a snapshotter
    SnapshotPlugin Type = "io.containerd.snapshotter.v1"
    // TaskMonitorPlugin implements a task monitor
    TaskMonitorPlugin Type = "io.containerd.monitor.v1"
    // DiffPlugin implements a differ
    DiffPlugin Type = "io.containerd.differ.v1"
    // MetadataPlugin implements a metadata store
    MetadataPlugin Type = "io.containerd.metadata.v1"
    // ContentPlugin implements a content store
    ContentPlugin Type = "io.containerd.content.v1"
    // GCPlugin implements garbage collection policy
    GCPlugin Type = "io.containerd.gc.v1"
)

func init() {
       plugin.Register(&plugin.Registration{
              ID:   "btrfs",
              Type: plugin.SnapshotPlugin,
              Init: func(ic *plugin.InitContext) (interface{}, error) {
                     return NewSnapshotter(ic.Root)
              },
       })
}

func init() {
       plugin.Register(&plugin.Registration{
              Type: plugin.SnapshotPlugin,
              ID:   "overlayfs",
              Init: func(ic *plugin.InitContext) (interface{}, error) {
                     return NewSnapshotter(ic.Root)
              },
       })
}
//可以看到containerd的snapshottor机制也是通过底层的这些storage driver实现的。

更详细的plugin介绍在后边涉及到的时候再深入研究。

总结-流程图

应用添加模块.png-12.2kB

results matching ""

    No results matching ""