watchCache的结构如下所示:
- type watchCache struct {
- sync.RWMutex //同步锁
- cond *sync.Cond //条件变量
- capacity int//历史滑动窗口容量
- keyFunc func(runtime.Object) (string, error)//从storage中获取键值
- getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, bool, error)//获取一个对象的field和label信息
- cache []watchCacheElement//循环队列缓存
- startIndex int//循环队列的起始下标
- endIndex int//循环队列的结束下标
- store cache.Store//
- resourceVersion uint64
- onReplace func()
- onEvent func(*watchCacheEvent)//在每次缓存中的数据发生Add/Update/Delete后都会调用该函数,来获取对象的之前版本的值
- clock clock.Clock
- versioner storage.Versioner
- }
cache里面存放的是所有操作事件,而store中存放的是当前最新的事件。
4 cacheWatcher从watchCache中拿到从某个resourceVersion以来的所有数据,即initEvents,然后将数据放到input这个channel里面去,通过filter然后输出到result这个channel里面,返回数据到某个client。
- type cacheWatcher struct {
- sync.Mutex//同步锁
- input chan *watchCacheEvent//输入管道,Apiserver都事件发生时都会通过广播的形式向input管道进行发送
- result chan watch.Event//输出管道,输出到update管道中去
- done chan struct{}
- filter filterWithAttrsFunc//过滤器
- stopped bool
- forget func(bool)
- versioner storage.Versioner
- }
从一个pod创建过程看k8s组件通信
我们再回到上面的Pod创建流程图。从图中我们可以看出以下信息:
1 首先各组件也会在初始化时向Apiserver发送watch请求,即在图中标0的指令。Apiserver在创建kubeApiserver并注册各API路由信息时,获取Watch请求的路由信息
2 从Kubectl向Apiserver发送创建pod请求起,每一步创建、更新操作,都会存储到etcd中。
3 各组件向Apiserver发送watch请求,Apiserver从etcd获取最新数据并返回。
注意:当事件发生时,Apiserver会给这些watcher中的通道推送,每个watcher都有自己的Filter过滤,找到自己想要监听的事件则通过管道的方式将该数据发送到相应的组件。 【编辑推荐】 - 为什么说Kubernetes的崛起预示着云原生时代到来?
- Kubernetes研究:图解Kubernetes网络
- Kubernetes研究:网络原理及方案(网络原理基础经典版)
- 学习Kubernetes,这些负载均衡知识点得知道!
- 十大Kubernetes开源监控工具
【责任编辑:武晓燕 TEL:(010)68476606】
点赞 0 (编辑:晋中站长网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|