Kubernetes ApiServer三大server权限与数据存储解析

目录
  • 确立目标
  • Run
    • Three Servers
  • KubeAPIServer
    • GenericConfig
    • Authentication
    • Authorization
    • Admission
  • GenericAPIServer的初始化
    • New
    • GenericServer
    • NewAPIServerHandler
    • installAPI
    • Apiserver
    • InstallLegacyAPI
    • Create Pod
  • Pod数据的保存
    • RESTCreateStrategy
    • Storage
    • Storage Implement
    • Summary

确立目标

  • 理解启动kube-apiserver的权限相关的三个核心概念 Authentication/Authorization/Admission 分别是认证,授权,准入
  • 理解kube-apiserver是中的管理核心资源的KubeAPIServer是怎么启动的
  • 理解Pod发送到kube-apiserver后是怎么保存的

Run

kube-apiserver的启动 代码在cmd/kube-apiserver

// 类似kubectl的源代码,kube-apiserver的命令行工具也使用了cobra,我们很快就能找到启动的入口
RunE: func(cmd *cobra.Command, args []string) error {
			// 这里包含2个参数,前者是参数completedOptions,后者是一个stopCh <-chan struct{}
			return Run(completedOptions, genericapiserver.SetupSignalHandler())
		}
/*
	在这里,我们可以和kubectl结合起来思考:
	kubectl是一个命令行工具,执行完命令就退出;kube-apiserver是一个常驻的服务器进程,监听端口
	这里引入了一个stopCh <-chan struct{},可以在启动后,用一个 <-stopCh 作为阻塞,使程序不退出
	用channel阻塞进程退出,对比传统的方法 - 用一个永不退出的for循环,是一个很优雅的实现
*/
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
        // 这里进行创建服务链
        server, err := CreateServerChain(completeOptions)
	if err != nil {
		return err
	}
	prepared, err := server.PrepareRun()
	if err != nil {
		return err
	}
	return prepared.Run(stopCh)
}

Three Servers

// 在CreateServerChain这个函数下,创建了3个server
func CreateServerChain(){
  // API扩展服务,主要针对CRD
	createAPIExtensionsServer(){}
  // API核心服务,包括常见的Pod/Deployment/Service,我们今天的重点聚焦在这里
  // 我会跳过很多非核心的配置参数,一开始就去研究细节,很影响整体代码的阅读效率
	CreateKubeAPIServer(){}
  // API聚合服务,主要针对metrics
	createAggregatorServer(){}
  //细节是第二个ApiServer需要第一个server的配置,第三个server会要第二个server的配置,最后返回的是聚合server
  // 这些server的config都是由一个GenericConfig和一个ExtraConfig组成 有自己的特点和链上的
      return aggregatorServer, nil
}

KubeAPIServer

// 创建配置的流程
func CreateKubeAPIServerConfig(){
  // 创建通用配置genericConfig
  genericConfig, versionedInformers, insecureServingInfo, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)
}

GenericConfig

// 通用配置的创建
func buildGenericConfig(s *options.ServerRunOptions,proxyTransport *http.Transport){
  // Insecure对应的非安全的通信,也就是HTTP
  if lastErr = s.InsecureServing...
  // Secure对应的就是HTTPS
  if lastErr = s.SecureServing...
  // OpenAPIConfig是对外提供的API文档
  genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig()
  // 这一块是storageFactory的实例化,可以看到采用的是etcd作为存储方案
  storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
	storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
	completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd)
	storageFactory, lastErr = completedStorageFactoryConfig.New()
  // Authentication 认证相关
  if lastErr = s.Authentication.ApplyTo()...
  // Authorization 授权相关
  genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer()
  // Admission 准入机制
  err = s.Admission.ApplyTo()
}

Authentication

func (o *BuiltInAuthenticationOptions) ApplyTo(){
  // 前面都是对认证config进行参数设置,这里才是真正的实例化
  authInfo.Authenticator, openAPIConfig.SecurityDefinitions, err = authenticatorConfig.New()
}
// New这块的代码,我们要抓住核心变量authenticators和tokenAuthenticators,也就是各种认证方法
func (config Config) New() (authenticator.Request, *spec.SecurityDefinitions, error) {
  // 核心变量authenticators和tokenAuthenticators
	var authenticators []authenticator.Request
        var tokenAuthenticators []authenticator.Token
	if config.RequestHeaderConfig != nil {
		// 1. 添加requestHeader
		authenticators = append(authenticators, authenticator.WrapAudienceAgnosticRequest(config.APIAudiences, requestHeaderAuthenticator))
	}
	if config.ClientCAContentProvider != nil {
		// 2. 添加ClientCA
    authenticators = append(authenticators, certAuth)
	}
	if len(config.TokenAuthFile) > 0 {
		// 3. token 添加tokenfile
		tokenAuthenticators = append(tokenAuthenticators, authenticator.WrapAudienceAgnosticToken(config.APIAudiences, tokenAuth))
	}
  // 4. token 添加 service account,分两种来源
	if len(config.ServiceAccountKeyFiles) > 0 {
		tokenAuthenticators = append(tokenAuthenticators, serviceAccountAuth)
	}
	if utilfeature.DefaultFeatureGate.Enabled(features.TokenRequest) && config.ServiceAccountIssuer != "" {
		tokenAuthenticators = append(tokenAuthenticators, serviceAccountAuth)
	}
	if config.BootstrapToken {
		if config.BootstrapTokenAuthenticator != nil {
      // 5. token 添加 bootstrap
			tokenAuthenticators = append(tokenAuthenticators, authenticator.WrapAudienceAgnosticToken(config.APIAudiences, config.BootstrapTokenAuthenticator))
		}
	}
	if len(config.OIDCIssuerURL) > 0 && len(config.OIDCClientID) > 0 {
		// 6. token 添加 oidc
    Authenticators = append(tokenAuthenticators, authenticator.WrapAudienceAgnosticToken(config.APIAudiences, oidcAuth))
	}
	if len(config.WebhookTokenAuthnConfigFile) > 0 {
		// 7. token 添加 webhook
		tokenAuthenticators = append(tokenAuthenticators, webhookTokenAuth)
	}
  // 8. 组合tokenAuthenticators到tokenAuthenticators中
	if len(tokenAuthenticators) > 0 {
		tokenAuth := tokenunion.New(tokenAuthenticators...)
		if config.TokenSuccessCacheTTL > 0 || config.TokenFailureCacheTTL > 0 {
			tokenAuth = tokencache.New(tokenAuth, true, config.TokenSuccessCacheTTL, config.TokenFailureCacheTTL)
		}
		authenticators = append(authenticators, bearertoken.New(tokenAuth), websocket.NewProtocolAuthenticator(tokenAuth))
	}
  // 9. 没有任何认证方式且启用了Anonymous
	if len(authenticators) == 0 {
		if config.Anonymous {
			return anonymous.NewAuthenticator(), &securityDefinitions, nil
		}
		return nil, &securityDefinitions, nil
	}
  // 10. 组合authenticators
	authenticator := union.New(authenticators...)
	return authenticator, &securityDefinitions, nil
}

复杂的Authentication模块的初始化顺序我们看完了,有初步的了解即可,没必要去强制记忆其中的加载顺序。

Authorization

func BuildAuthorizer(){
  // 与上面一致,实例化是在这个New中
  return authorizationConfig.New()
}
// 不得不说,Authorizer这块的阅读体验更好
func (config Config) New() (authorizer.Authorizer, authorizer.RuleResolver, error) {
  // 必须传入一个Authorizer机制
	if len(config.AuthorizationModes) == 0 {
		return nil, nil, fmt.Errorf("at least one authorization mode must be passed")
	}
	var (
		authorizers   []authorizer.Authorizer
		ruleResolvers []authorizer.RuleResolver
	)
	for _, authorizationMode := range config.AuthorizationModes {
		// 具体的mode定义,可以跳转到对应的链接去看,不细讲
		switch authorizationMode {
		case modes.ModeNode:
			authorizers = append(authorizers, nodeAuthorizer)
			ruleResolvers = append(ruleResolvers, nodeAuthorizer)
		case modes.ModeAlwaysAllow:
			authorizers = append(authorizers, alwaysAllowAuthorizer)
			ruleResolvers = append(ruleResolvers, alwaysAllowAuthorizer)
		case modes.ModeAlwaysDeny:
			authorizers = append(authorizers, alwaysDenyAuthorizer)
			ruleResolvers = append(ruleResolvers, alwaysDenyAuthorizer)
		case modes.ModeABAC:
			authorizers = append(authorizers, abacAuthorizer)
			ruleResolvers = append(ruleResolvers, abacAuthorizer)
		case modes.ModeWebhook:
			authorizers = append(authorizers, webhookAuthorizer)
			ruleResolvers = append(ruleResolvers, webhookAuthorizer)
		case modes.ModeRBAC:
			authorizers = append(authorizers, rbacAuthorizer)
			ruleResolvers = append(ruleResolvers, rbacAuthorizer)
		default:
			return nil, nil, fmt.Errorf("unknown authorization mode %s specified", authorizationMode)
		}
	}
	return union.New(authorizers...), union.NewRuleResolvers(ruleResolvers...), nil
}
const (
	// ModeAlwaysAllow is the mode to set all requests as authorized
	ModeAlwaysAllow string = "AlwaysAllow"
	// ModeAlwaysDeny is the mode to set no requests as authorized
	ModeAlwaysDeny string = "AlwaysDeny"
	// ModeABAC is the mode to use Attribute Based Access Control to authorize
	ModeABAC string = "ABAC"
	// ModeWebhook is the mode to make an external webhook call to authorize
	ModeWebhook string = "Webhook"
	// ModeRBAC is the mode to use Role Based Access Control to authorize
	ModeRBAC string = "RBAC"
	// ModeNode is an authorization mode that authorizes API requests made by kubelets.
	ModeNode string = "Node"
)

Admission

// 查看定义
err = s.Admission.ApplyTo()
func (a *AdmissionOptions) ApplyTo(){
  return a.GenericAdmission.ApplyTo()
}
func (ps *Plugins) NewFromPlugins(){
  for _, pluginName := range pluginNames {
		// InitPlugin 为初始化的工作
		plugin, err := ps.InitPlugin(pluginName, pluginConfig, pluginInitializer)
		if err != nil {
			return nil, err
		}
	}
}
func (ps *Plugins) InitPlugin(name string, config io.Reader, pluginInitializer PluginInitializer) (Interface, error){
  // 获取plugin
  plugin, found, err := ps.getPlugin(name, config)
}
// 查看一下Interface的定义,就是对准入机制的控制 抽象化的插件化的接口 服务于Admission Control
// Interface is an abstract, pluggable interface for Admission Control decisions.
type Interface interface {
	Handles(operation Operation) bool
}
// 再去看看获取plugin的地方
func (ps *Plugins) getPlugin(name string, config io.Reader) (Interface, bool, error) {
	ps.lock.Lock()
	defer ps.lock.Unlock()
  // 我们再去研究ps.registry这个参数是在哪里被初始化的
	f, found := ps.registry[name]
}
// 接下来,我们从kube-apiserver启动过程,逐步找到Admission被初始化的地方
// 启动命令
command := app.NewAPIServerCommand()
// server配置
s := options.NewServerRunOptions()
// admission选项
Admission:               kubeoptions.NewAdmissionOptions()
// 注册准入机制
RegisterAllAdmissionPlugins(options.Plugins)
// 准入机制的所有内容
func RegisterAllAdmissionPlugins(plugins *admission.Plugins){
  // 这里有很多plugin的注册
}
// 往上翻,我们能找到所有plugin,也就是准入机制的定义 有三十几种 已经进行了排序的
var AllOrderedPlugins = []string{
admit.PluginName,                        // AlwaysAdmit
	autoprovision.PluginName,                // NamespaceAutoProvision
	lifecycle.PluginName,                    // NamespaceLifecycle
	exists.PluginName,                       // NamespaceExists
	scdeny.PluginName,                       // SecurityContextDeny
	antiaffinity.PluginName,                 // LimitPodHardAntiAffinityTopology
	limitranger.PluginName,                  // LimitRanger
	serviceaccount.PluginName,               // ServiceAccount
	noderestriction.PluginName,              // NodeRestriction
	nodetaint.PluginName,                    // TaintNodesByCondition
	alwayspullimages.PluginName,             // AlwaysPullImages
	imagepolicy.PluginName,                  // ImagePolicyWebhook
	podsecurity.PluginName,                  // PodSecurity
	podnodeselector.PluginName,              // PodNodeSelector
	podpriority.PluginName,                  // Priority
	defaulttolerationseconds.PluginName,     // DefaultTolerationSeconds
	podtolerationrestriction.PluginName,     // PodTolerationRestriction
	eventratelimit.PluginName,               // EventRateLimit
	extendedresourcetoleration.PluginName,   // ExtendedResourceToleration
	label.PluginName,                        // PersistentVolumeLabel
	setdefault.PluginName,                   // DefaultStorageClass
	storageobjectinuseprotection.PluginName, // StorageObjectInUseProtection
	gc.PluginName,                           // OwnerReferencesPermissionEnforcement
	resize.PluginName,                       // PersistentVolumeClaimResize
	runtimeclass.PluginName,                 // RuntimeClass
	certapproval.PluginName,                 // CertificateApproval
	certsigning.PluginName,                  // CertificateSigning
	certsubjectrestriction.PluginName,       // CertificateSubjectRestriction
	defaultingressclass.PluginName,          // DefaultIngressClass
	denyserviceexternalips.PluginName,       // DenyServiceExternalIPs
	// new admission plugins should generally be inserted above here
	// webhook, resourcequota, and deny plugins must go at the end
	mutatingwebhook.PluginName,   // MutatingAdmissionWebhook
	validatingwebhook.PluginName, // ValidatingAdmissionWebhook
	resourcequota.PluginName,     // ResourceQuota
	deny.PluginName,              // AlwaysDeny
}

GenericAPIServer的初始化

理解kube-apiserver是中的管理核心资源的KubeAPIServer是怎么启动的

New

// 先对配置进行complete补全再进行new
func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) {
	kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
	if err != nil {
		return nil, err
	}
	return kubeAPIServer, nil
}

GenericServer

// 在APIExtensionsServer、KubeAPIServer和AggregatorServer三种Server启动时,我们都能发现这么一个函数
// APIExtensionsServer
genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
// KubeAPIServer
s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
// AggregatorServer
genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget)
// 都通过GenericConfig创建了genericServer,我们先大致浏览下
func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
	// 新建Handler
	apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
	// 实例化一个Server
	s := &GenericAPIServer{
    ...
  }
	// 处理钩子hook操作
	for k, v := range delegationTarget.PostStartHooks() {
		s.postStartHooks[k] = v
	}
	for k, v := range delegationTarget.PreShutdownHooks() {
		s.preShutdownHooks[k] = v
	}
	// 健康监测
	for _, delegateCheck := range delegationTarget.HealthzChecks() {
		skip := false
		for _, existingCheck := range c.HealthzChecks {
			if existingCheck.Name() == delegateCheck.Name() {
				skip = true
				break
			}
		}
		if skip {
			continue
		}
		s.AddHealthChecks(delegateCheck)
	}
  // 安装API相关参数,这个是重点
	installAPI(s, c.Config)
	return s, nil
}

NewAPIServerHandler

func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
	// 采用了 github.com/emicklei/go-restful 这个库作为 RESTful 接口的设计,目前了解即可
	gorestfulContainer := restful.NewContainer()
}

installAPI

一些通用的

func installAPI(s *GenericAPIServer, c *Config) {
  // 添加 /index.html 路由规则
	if c.EnableIndex {
		routes.Index{}.Install(s.listedPathProvider, s.Handler.NonGoRestfulMux)
	}
  // 添加go语言 /pprof 的路由规则,常用于性能分析
	if c.EnableProfiling {
		routes.Profiling{}.Install(s.Handler.NonGoRestfulMux)
		if c.EnableContentionProfiling {
			goruntime.SetBlockProfileRate(1)
		}
		routes.DebugFlags{}.Install(s.Handler.NonGoRestfulMux, "v", routes.StringFlagPutHandler(logs.GlogSetter))
	}
  // 添加监控相关的 /metrics 的指标路由规则
	if c.EnableMetrics {
		if c.EnableProfiling {
			routes.MetricsWithReset{}.Install(s.Handler.NonGoRestfulMux)
		} else {
			routes.DefaultMetrics{}.Install(s.Handler.NonGoRestfulMux)
		}
	}
	// 添加版本 /version 的路由规则
	routes.Version{Version: c.Version}.Install(s.Handler.GoRestfulContainer)
	// 开启服务发现
	if c.EnableDiscovery {
		s.Handler.GoRestfulContainer.Add(s.DiscoveryGroupManager.WebService())
	}
	if feature.DefaultFeatureGate.Enabled(features.APIPriorityAndFairness) {
		c.FlowControl.Install(s.Handler.NonGoRestfulMux)
	}
}

Apiserver

func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) {
	// genericServer的初始化
	s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
	// 核心KubeAPIServer的实例化
	m := &Master{
		GenericAPIServer:          s,
		ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,
	}
	// 注册Legacy API的注册
	if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
		legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{}
		if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
			return nil, err
		}
	}
	// REST接口的存储定义,可以看到很多k8s上的常见定义,比如node节点/storage存储/event事件等等
	restStorageProviders := []RESTStorageProvider{
		authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
		authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
		autoscalingrest.RESTStorageProvider{},
		batchrest.RESTStorageProvider{},
		certificatesrest.RESTStorageProvider{},
		coordinationrest.RESTStorageProvider{},
		discoveryrest.StorageProvider{},
		extensionsrest.RESTStorageProvider{},
		networkingrest.RESTStorageProvider{},
		noderest.RESTStorageProvider{},
		policyrest.RESTStorageProvider{},
		rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
		schedulingrest.RESTStorageProvider{},
		settingsrest.RESTStorageProvider{},
		storagerest.RESTStorageProvider{},
		flowcontrolrest.RESTStorageProvider{},
		// keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
		// See https://github.com/kubernetes/kubernetes/issues/42392
		appsrest.StorageProvider{},
		admissionregistrationrest.RESTStorageProvider{},
		eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
	}
  // 注册API
	if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
		return nil, err
	}
	// 添加Hook
	m.GenericAPIServer.AddPostStartHookOrDie("start-cluster-authentication-info-controller", func(hookContext genericapiserver.PostStartHookContext) error {
	})
	return m, nil
}

注册API的关键在InstallLegacyAPIInstallAPIs,如果你对kubernetes的资源有一定的了解,会知道核心资源都放在Legacy中如pod(如果不了解的话,点击函数看一下,就能有所有了解)

InstallLegacyAPI

// 定义了legacy和非legacy资源的路由前缀
const (
// DefaultLegacyAPIPrefix is where the legacy APIs will be located.
DefaultLegacyAPIPrefix="/api"
// APTGroupPrefix is where non-legacy API group will be located.
APIGroupPrefix ="/apis"
)
func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) error {
  // RESTStorage的初始化
	legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
  // 前缀为 /api,注册上对应的Version和Resource
  // Pod作为核心资源,没有Group的概念
	if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
		return fmt.Errorf("error in registering group versions: %v", err)
	}
	return nil
}
// 我们再细看这个RESTStorage的初始化
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
	// pod 模板
	podTemplateStorage, err := podtemplatestore.NewREST(restOptionsGetter)
	// event事件
	eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))
	// limitRange资源限制
	limitRangeStorage, err := limitrangestore.NewREST(restOptionsGetter)
	// resourceQuota资源配额
	resourceQuotaStorage, resourceQuotaStatusStorage, err := resourcequotastore.NewREST(restOptionsGetter)
	// secret加密
	secretStorage, err := secretstore.NewREST(restOptionsGetter)
	// PV 存储
	persistentVolumeStorage, persistentVolumeStatusStorage, err := pvstore.NewREST(restOptionsGetter)
	// PVC 存储
	persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage, err := pvcstore.NewREST(restOptionsGetter)
	// ConfigMap 配置
	configMapStorage, err := configmapstore.NewREST(restOptionsGetter)
	// 等等核心资源,暂不一一列举
  // pod模板,我们的示例nginx-pod属于这个类型的资源
  podStorage, err := podstore.NewStorage()
  // 保存storage的对应关系
  restStorageMap := map[string]rest.Storage{
		"pods":             podStorage.Pod,
		"pods/attach":      podStorage.Attach,
		"pods/status":      podStorage.Status,
		"pods/log":         podStorage.Log,
		"pods/exec":        podStorage.Exec,
		"pods/portforward": podStorage.PortForward,
		"pods/proxy":       podStorage.Proxy,
		"pods/binding":     podStorage.Binding,
		"bindings":         podStorage.LegacyBinding,
    ...
  }
}

Create Pod

// 查看Pod初始化 上一步的podStorage
func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) (PodStorage, error) {
	store := &genericregistry.Store{
		NewFunc:                  func() runtime.Object { return &api.Pod{} },
		NewListFunc:              func() runtime.Object { return &api.PodList{} },
		PredicateFunc:            registrypod.MatchPod,
		DefaultQualifiedResource: api.Resource("pods"),
		// 增改删的策略
		CreateStrategy:      registrypod.Strategy,
		UpdateStrategy:      registrypod.Strategy,
		DeleteStrategy:      registrypod.Strategy,
		ReturnDeletedObject: true,
		TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
	}
}
// 查看 Strategy 的初始化 是一个全局变量  进行实例化 调用了Scheme,核心资源的schme,legacyscheme
var Strategy = podStrategy{legacyscheme.Scheme, names.SimpleNameGenerator}
// 又查询到Scheme的初始化。Schema可以理解为Kubernetes的注册表,即所有的资源类型必须先注册进Schema才可使用 注册里有资源的增删改的策略
var Scheme = runtime.NewScheme()

Pod数据的保存

理解Pod发送到kube-apiserver后是怎么保存的

RESTCreateStrategy

// podStrategy 是封装了 Pod 的各类动作,这里我们先关注create这个操作
type podStrategy struct {
	runtime.ObjectTyper
	names.NameGenerator
}
// podStrategy 的接口
type RESTCreateStrategy interface {
	runtime.ObjectTyper
	names.NameGenerator
  // 是否属于当前的 namespace
	NamespaceScoped() bool
  // 准备创建前的检查
	PrepareForCreate(ctx context.Context, obj runtime.Object)
  // 验证资源对象
	Validate(ctx context.Context, obj runtime.Object) field.ErrorList
  // 规范化
	Canonicalize(obj runtime.Object)
}
// 完成了检查,我们就要保存数据了

Storage

// PodStorage 是 Pod 存储的实现,里面包含了多个存储的定义
type PodStorage struct {
  // REST implements a RESTStorage for pods
	Pod                 *REST
  // BindingREST implements the REST endpoint for binding pods to nodes when etcd is in use.
	Binding             *BindingREST
  // LegacyBindingREST implements the REST endpoint for binding pods to nodes when etcd is in use.
	LegacyBinding       *LegacyBindingREST
	Eviction            *EvictionREST
  // StatusREST implements the REST endpoint for changing the status of a pod.
	Status              *StatusREST
  // EphemeralContainersREST implements the REST endpoint for adding EphemeralContainers
	EphemeralContainers *EphemeralContainersREST
	Log                 *podrest.LogREST
	Proxy               *podrest.ProxyREST
	Exec                *podrest.ExecREST
	Attach              *podrest.AttachREST
	PortForward         *podrest.PortForwardREST
}
/*
从上一节的map关系中,保存在REST中
restStorageMap := map[string]rest.Storage{
		"pods":             podStorage.Pod,
}
*/
type REST struct {
	*genericregistry.Store
        // 代理传输层 大概率是和网络相关的先不看
	proxyTransport http.RoundTripper
}
// Store是一个通用的数据结构
type Store struct {
	// Storage定义
        ...
	Storage DryRunnableStorage
}
// DryRunnableStorage中的Storage是一个Interface
type DryRunnableStorage struct {
	Storage storage.Interface
        // 和编解码相关的codec
	Codec   runtime.Codec
}
func (s *DryRunnableStorage) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64, dryRun bool) error {
	if dryRun {
		if err := s.Storage.Get(ctx, key, storage.GetOptions{}, out); err == nil {
			return storage.NewKeyExistsError(key, 0)
		}
		return s.copyInto(obj, out)
	}
  // 这里,就是Create的真正调用
	return s.Storage.Create(ctx, key, obj, out, ttl)
}

Storage Implement

// Storage Interface 的定义,包括基本的增删改查,以及watch等等进阶操作
type Interface interface {
	Versioner() Versioner
	Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error
	Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions, validateDeletion ValidateObjectFunc) error
	Watch(ctx context.Context, key string, opts ListOptions) (watch.Interface, error)
	WatchList(ctx context.Context, key string, opts ListOptions) (watch.Interface, error)
	Get(ctx context.Context, key string, opts GetOptions, objPtr runtime.Object) error
	GetToList(ctx context.Context, key string, opts ListOptions, listObj runtime.Object) error
	List(ctx context.Context, key string, opts ListOptions, listObj runtime.Object) error
	GuaranteedUpdate(
		ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
		precondtions *Preconditions, tryUpdate UpdateFunc, suggestion ...runtime.Object) error
	Count(key string) (int64, error)
}
// 去找Storage的初始化
func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc, error) {
	return factory.Create(*config)
}
func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
	switch c.Type {
  // 已经不支持etcd2
	case "etcd2":
		return nil, nil, fmt.Errorf("%v is no longer a supported storage backend", c.Type)
  // 默认为etcd3版本
	case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
		return newETCD3Storage(c)
	default:
		return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
	}
}

Summary

  • kube-apiserver 包含三个apiserverAPIExtensionsServerKubeAPIServerAggregatorServer三个APIServer底层均依赖通用的GenericServer,使用go-restful对外提供RESTful风格的API服务,三个server,都有两类配置一类是专有的一个通用的genericServer,通用的配置中有三种Authentication/Authorization/Admission,控制权限的方式,
  • kube-apiserver 对请求进行 AuthenticationAuthorizationAdmission三层验证,Admission是插件化的,可以通过webhook来拓展
  • 完成验证后,请求会根据路由规则,触发到对应资源的handler,主要包括数据的预处理保存,pod的底层是podStorage的对象,使用到注册表schme
  • kube-apiserver 的底层存储为etcd v3,它被抽象为一种RESTStorage,使网络请求和底层存储操作一一对应

以上就是Kubernetes ApiServer三大server权限与数据存储解析的详细内容,更多关于Kubernetes ApiServer权限存储的资料请关注我们其它相关文章!

(0)

相关推荐

  • Visitor设计模式及发送pod创建请求实现详解

    目录 确立目标 visitor design pattern Visitor Chained VisitorList EagerVisitorList DecoratedVisitor ContinueOnErrorVisitor FlattenListVisitor FilteredVisitor Implements StreamVisitor FileVisitor URLVisitor KustomizeVisitor 发送创建Pod请求的实现细节 send request RESTfu

  • Kubernetes 权限管理认证鉴权详解

    目录 正文 认证 认证用户 Normal Users Service Accounts 认证策略 客户端证书 不记名令牌 Static Token File Service Account Tokens OpenID Connect Tokens 鉴权 鉴权流程 鉴权模块 RBAC Role 和 ClusterRole RoleBinding 和 ClusterRoleBinding Service Account 最后 正文 Kubernetes 主要通过 API Server 对外提供服务,

  • Kubernetes kubectl中Pod创建流程源码解析

    目录 确立目标 先写一个Pod的Yaml 部署Pod 查询Pod kubectl create 的调用逻辑 Main Match Command Create RunCreate Summary 确立目标 从创建pod的全流程入手,了解各组件的工作内容,组件主要包括以下 kubectl kube-apiserver kube-scheduler kube-controller kubelet 理解各个组件之间的相互协作,目前是kubectl 先写一个Pod的Yaml apiVersion: v1

  • Kubernetes应用服务质量管理详解

    目录 服务质量管理 QoS模型 Guaranteed Burstable BestEffort cpuset LimitRange 服务可用性管理 高可用 可用性 PDB 总结 服务质量管理 在Kubernetes中,Pod是最小的调度单元,所以跟资源和调度相关的属性都是Pod对象的字段,而其中最重要的就是CPU和内存. 如下所示: --- apiVersion: v1 kind: Pod metadata: name: pod-demo spec: containers: - name: my

  • Kubernetes存储系统数据持久化管理详解

    目录 引言 安装存储系统 PV PVC StorageClass 安装NFS Provisioner 使用StorageClass 总结 引言 Kubernetes为了能更好的支持有状态应用的数据存储问题,除了基本的HostPath和EmptyDir提供的数据持久化方案之外,还提供了PV,PVC和StorageClass资源对象来对存储进行管理. PV的全称是Persistent Volume(持久化卷),是对底层数据存储的抽象,PV由管理员创建.维护以及配置,它和底层的数据存储实现方法有关,比

  • Kubernetes ApiServer三大server权限与数据存储解析

    目录 确立目标 Run Three Servers KubeAPIServer GenericConfig Authentication Authorization Admission GenericAPIServer的初始化 New GenericServer NewAPIServerHandler installAPI Apiserver InstallLegacyAPI Create Pod Pod数据的保存 RESTCreateStrategy Storage Storage Imple

  • 使用.NET 6开发TodoList应用之引入数据存储的思路详解

    需求 作为后端CRUD程序员(bushi,数据存储是开发后端服务一个非常重要的组件.对我们的TodoList项目来说,自然也需要配置数据存储.目前的需求很简单: 需要能持久化TodoList对象并对其进行操作: 需要能持久化TodoItem对象并对其进行操作: 问题是,我们打算如何存储数据? 存储组件的选择非常多:以MSSQL Server/Postgres/MySql/SQLite等为代表的关系型数据库,以MongoDB/ElasticSearch等为代表的非关系型数据库,除此之外,我们还可以

  • .NET 6开发TodoList应用引入数据存储

    目录 一.需求 二.目标 三.原理和思路 四.实现 1. 引入Nuget包并进行配置 2. 添加DBContext对象并进行配置# 3. 配置文件修改 4. 主程序配置 5. 本地运行MSSQL Server容器及数据持久化 五.验证 一.需求 作为后端CRUD程序员(bushi,数据存储是开发后端服务一个非常重要的组件.对我们的TodoList项目来说,自然也需要配置数据存储. 目前的需求很简单: 需要能持久化TodoList对象并对其进行操作: 需要能持久化TodoItem对象并对其进行操作

  • Kubernetes Informer数据存储Index与Pod分配流程解析

    目录 确立目标 Process 查看消费的过程 Index 掌握Index数据结构 distribute 信息的分发distribute 理解一个pod的被调度的大致流程 Scheduler SchedulingQueue scheduleOne ScheduleResult 调度计算结果 Assume 初步推算 Bind 实际绑定 Update To Etcd Summary 确立目标 理解Informer的数据存储方式 大致理解Pod的分配流程 理解Informer的数据存储方式 代码在k8

  • MySQL如何更改数据库数据存储目录详解

    前言 MySQL数据库默认的数据库文件位于/var/lib/mysql下,有时候由于存储规划等原因,需要更改MySQL数据库的数据存储目录.下文总结整理了实践过程的操作步骤.话不多说了,一起来看看吧 方法如下: 1:确认MySQL数据库存储目录 [root@DB-Server tmp]# mysqladmin -u root -p variables | grep datadir Enter password: | datadir | /var/lib/mysql/ 2:关闭MySQL服务 在更

  • Centos7 移动mysql5.7.19 数据存储位置的操作方法

    场景:随着数据量的增加,mysql所在的磁盘已占满,需要将data移动到空间较大的盘上. 方法: 1. 关闭mysql服务 service mysqld stop 2. 将data目录移动到空间较大的盘上 cp -a /usr/local/mysql/data/ /home/mysqldata/ -a :相当于 -pdr 的意思(参数pdr分别为:保留权限,复制软链接本身,递归复制): 3 . 修改配置文件my.cnf ... sql_mode=NO_ENGINE_SUBSTITUTION,ST

  • 在android开发中进行数据存储与访问的多种方式介绍

    数据存储与访问 很多时候我们的软件需要对处理后的数据进行存储或再次访问.Android为数据存储提供了多种方式,分别有如下几种: 文件 SharedPreferences SQLite数据库 内容提供者(Content provider) 网络 使用文件进行数据存储 首先给大家介绍使用文件如何对数据进行存储,Activity提供了openFileOutput()方法可以用于把数据输出到文件中,具体的实现过程与在J2SE环境中保存数据到文件中是一样的. 复制代码 代码如下: public clas

  • ASP.NET MVC5 网站开发框架模型、数据存储、业务逻辑(三)

    前面项目的层次和调用关系都说明了,关系如下图 采用三层架构的时候,研究过BLL层的必要性,觉得业务逻辑完全可以在controller里实现,没有必要单独做一个项目,另一个分层多了会影响性能.后来我还是把业务逻辑独立出来,原因如下: 业务逻辑写进controller里代码看着比较混乱,时间久了代码容易理不清. 在controller里直接写逻辑重复代码会不较多,开发效率低. 分项目有利于代码重用,有时候可以直接拿到其他项目中稍作修改就可以用. 对于性能我觉得分层多了肯定会有影响,但是不会很大.现在

  • 5种Android数据存储方式汇总

    本文介绍Android中的5种数据存储方式. 数据存储在开发中是使用最频繁的,在这里主要介绍Android平台中实现数据存储的5种方式,分别是: 1 使用SharedPreferences存储数据 2 文件存储数据 3 SQLite数据库存储数据 4 使用ContentProvider存储数据 5 网络存储数据 下面将为大家一一详细介绍. 第一种:使用SharedPreferences存储数据 SharedPreferences是Android平台上一个轻量级的存储类,主要是保存一些常用的配置比

  • 四种Android数据存储方式

    Android提供以下四种存储方式: SharePreference SQLite File ContentProvider Android系统中数据基本都是私有的,一般存放在"data/data/程序包名"目录下.如果要实现数据共享,正确的方式是使用ContentProvider. SharedPreference SharedPreference是一种轻型的数据存储方式,实际上是基于XML文件存储的"key-value"键值对数据.通常用来存储程序的一些配置信息

随机推荐