意见箱
恒创运营部门将仔细参阅您的意见和建议,必要时将通过预留邮箱与您保持联络。感谢您的支持!
意见/建议
提交建议

如何分析SuperEdge拓扑算法

来源:恒创科技 编辑:恒创科技编辑部
2024-01-31 23:08:59

这篇文章主要为大家分析了如何分析SuperEdge 拓扑算法的相关知识点,内容详细易懂,操作细节合理,具有一定参考价值。如果感兴趣的话,不妨跟着跟随小编一起来看看,下面跟着小编一起深入学习“如何分析SuperEdge 拓扑算法”的知识吧。


前言

SuperEdge 介绍

SuperEdge 是基于原生 Kubernetes 的边缘容器管理系统。该系统把云原生能力扩展到边缘侧,很好的实现了云端对边缘端的管理和控制。同时 superedge 自研了 service group 实现了基于边缘计算的服务访问控制,极大简化了应用从云端部署到边缘端的过程。

SuperEdgeservicegroup拓扑感知特性

SuperEdge service group 利用 application-grid-wrapper 实现拓扑感知,完成了同一个 nodeunit 内服务的闭环访问。


如何分析SuperEdge拓扑算法

在深入分析 application-grid-wrapper 之前,这里先简单介绍一下社区 Kubernetes 原生支持的拓扑感知特性[1]

Kubernetes service topology awareness 特性于v1.17发布 alpha 版本,用于实现路由拓扑以及就近访问特性。用户需要在service 中添加 topologyKeys 字段标示拓扑key类型,只有具有相同拓扑域的 endpoint 会被访问到,目前有三种 topologyKeys 可供选择:

"kubernetes.io/hostname":访问本节点内( kubernetes.io/hostname label value相同)的endpoint,如果没有则service访问失败
"topology.kubernetes.io/zone":访问相同zone域内( topology.kubernetes.io/zone label value相同)的endpoint,如果没有则service访问失败
"topology.kubernetes.io/region":访问相同region域内( topology.kubernetes.io/region label value 相同)的 endpoint,如果没有则 service 访问失败

除了单独填写如上某一个拓扑key之外,还可以将这些key构造成列表进行填写,例如:["kubernetes.io/hostname", "topology.kubernetes.io/zone", "topology.kubernetes.io/region"],这表示:优先访问本节点内的 endpoint;如果不存在,则访问同一个 zone 内的 endpoint;如果再不存在,则访问同一个 region 内的 endpoint,如果都不存在则访问失败。

另外,还可以在列表最后(只能最后一项)添加"*"表示:如果前面拓扑域都失败,则访问任何有效的 endpoint,也即没有限制拓扑了,示例如下:
#AServicethatprefersnodelocal,zonal,thenregionalendpointsbutfallsbacktoclusterwideendpoints.
apiVersion:v1
kind:Service
metadata:
name:my-service
spec:
selector:
app:my-app
ports:
-protocol:TCP
port:80
targetPort:9376
topologyKeys:
-"kubernetes.io/hostname"
-"topology.kubernetes.io/zone"
-"topology.kubernetes.io/region"
-"*"

而service group 实现的拓扑感知和社区对比,有如下区别:

service group 拓扑 key 可以自定义,也即为 gridUniqKey,使用起来更加灵活;而社区实现目前只有三种选择:"kubernetes.io/hostname","topology.kubernetes.io/zone"以及"topology.kubernetes.io/region"。
service group 只能填写一个拓扑 key,也即只能访问本拓扑域内有效的 endpoint,无法访问其它拓扑域的 endpoint;而社区可以通过 topologyKey 列表以及"*"实现其它备选拓扑域 endpoint 的访问。

service group 实现的拓扑感知,service 配置如下:

#AServicethatonlyprefersnodezone1alendpoints.
apiVersion:v1
kind:Service
metadata:
annotations:
topologyKeys:'["zone1"]'
labels:
superedge.io/grid-selector:servicegrid-demo
name:servicegrid-demo-svc
spec:
ports:
-port:80
protocol:TCP
targetPort:8080
selector:
appGrid:echo

在介绍完 service group 实现的拓扑感知后,我们深入到源码分析实现细节。同样的,这里以一个使用示例开始分析:

#step1:labelsedgenodes
$kubectlgetnodes
NAMESTATUSROLESAGEVERSIO
Nnode0Ready<none>16dv1.16.7
node1 Ready<none>16dv1.16.7
node2 Ready<none>16dv1.16.7
#nodeunit1(nodegroupandservicegroupzone1)
$kubectl--kubeconfigconfiglabelnodesnode0zone1=nodeunit1
#nodeunit2(nodegroupandservicegroupzone1)
$kubectl--kubeconfigconfiglabelnodesnode1zone1=nodeunit2
$kubectl--kubeconfigconfiglabelnodesnode2zone1=nodeunit2

...

#step3:deployechoServiceGrid
$cat<<EOF|kubectl--kubeconfigconfigapply-f-
apiVersion:superedge.io/v1
kind:ServiceGrid
metadata:
name:servicegrid-demo
namespace:default
spec:
gridUniqKey:zone1
template:
selector:
appGrid:echo
ports:
-protocol:TCP
port:80
targetPort:8080
EOF
servicegrid.superedge.io/servicegrid-democreated
#notethatthereisonlyonerelevantservicegenerated
$kubectlgetsvc
NAME TYPECLUSTER-IPEXTERNAL-IPPORT(S)AGE
kubernetesClusterIP192.168.0.1<none>443/TCP16d
servicegrid-demo-svcClusterIP192.168.6.139<none>80/TCP10m

#step4:accessservicegrid-demo-svc(servicetopologyandclosed-looped)
#executeonnode0
$curl192.168.6.139|grep"nodename"nodename:node0
#executeonnode1andnode2
$curl192.168.6.139|grep"nodename"
nodename:node2
$curl192.168.6.139|grep"nodename"
nodename:node1

在创建完 ServiceGrid CR 后,ServiceGrid Controller 负责根据 ServiceGrid产生对应的 service (包含由 serviceGrid.Spec.GridUniqKey 构成的 topologyKeys annotations);而 application-grid-wrapper 根据 service 实现拓扑感知,下面依次分析。

ServiceGrid Controller 分析

ServiceGrid Controller 逻辑和 DeploymentGrid Controller 整体一致,如下:

1、创建并维护 service group 需要的若干 CRDs(包括:ServiceGrid)
2、监听 ServiceGrid event,并填充 ServiceGrid 到工作队列中;循环从队列中取出 ServiceGrid 进行解析,创建并且维护对应的 service
3、监听 service event,并将相关的 ServiceGrid 塞到工作队列中进行上述处理,协助上述逻辑达到整体 reconcile 逻辑

注意这里区别于 DeploymentGrid Controller:

一个 ServiceGrid 对象只产生一个 service
只需额外监听 service event,无需监听 node 事件。因为 node 的CRUD与 ServiceGrid 无关
ServiceGrid 对应产生的 service,命名为: {ServiceGrid}-svc
func(sgc*ServiceGridController)syncServiceGrid(keystring)error{
startTime:=time.Now()
klog.V(4).Infof("Startedsyncingservicegrid%q(%v)",key,startTime)
deferfunc(){
klog.V(4).Infof("Finishedsyncingservicegrid%q(%v)",key,time.Since(startTime))
}()

namespace,name,err:=cache.SplitMetaNamespaceKey(key)
iferr!=nil{
returnerr
}

sg,err:=sgc.svcGridLister.ServiceGrids(namespace).Get(name)
iferrors.IsNotFound(err){
klog.V(2).Infof("servicegrid%vhasbeendeleted",key)
returnnil
}
iferr!=nil{
returnerr
}

ifsg.Spec.GridUniqKey==""{
sgc.eventRecorder.Eventf(sg,corev1.EventTypeWarning,"Empty","Thisservicegridhasanemptygridkey")
returnnil
}

//getserviceworkloadlistofthisgrid
svcList,err:=sgc.getServiceForGrid(sg)
iferr!=nil{
returnerr
}

ifsg.DeletionTimestamp!=nil{
returnnil
}

//syncservicegridrelevantservicesworkload
returnsgc.reconcile(sg,svcList)
}

func(sgc*ServiceGridController)getServiceForGrid(sg*crdv1.ServiceGrid)([]*corev1.Service,error){
svcList,err:=sgc.svcLister.Services(sg.Namespace).List(labels.Everything())
iferr!=nil{
returnnil,err
}

labelSelector,err:=common.GetDefaultSelector(sg.Name)
iferr!=nil{
returnnil,err
}

canAdoptFunc:=controller.RecheckDeletionTimestamp(func()(metav1.Object,error)
{
fresh,err:=
sgc.crdClient.SuperedgeV1().ServiceGrids(sg.Namespace).Get(context.TODO(),sg.Name,metav1.GetOptions{})
iferr!=nil{
returnnil,err
}
iffresh.UID!=sg.UID{
returnnil,fmt.Errorf("orignalservicegrid%v/%visgone:gotuid%v,wanted%v",sg.Namespace,
sg.Name,fresh.UID,sg.UID)
}
returnfresh,nil
})

cm:=controller.NewServiceControllerRefManager(sgc.svcClient,sg,labelSelector,util.ControllerKind,canAdoptFunc)
returncm.ClaimService(svcList)
}

func(sgc*ServiceGridController)reconcile(g*crdv1.ServiceGrid,svcList
[]*corev1.Service)error{
var(
adds[]*corev1.Service
updates[]*corev1.Service
deletes[]*corev1.Service
)

sgTargetSvcName:=util.GetServiceName(g)
isExistingSvc:=false
for_,svc:=rangesvcList{
ifsvc.Name==sgTargetSvcName{
isExistingSvc=true
template:=util.KeepConsistence(g,svc)
if!apiequality.Semantic.DeepEqual(template,svc){
updates=append(updates,template)
}
}else{
deletes=append(deletes,svc)
}
}

if!isExistingSvc{
adds=append(adds,util.CreateService(g))
}

returnsgc.syncService(adds,updates,deletes)
}

funcCreateService(sg*crdv1.ServiceGrid)*corev1.Service{
svc:=&corev1.Service{
ObjectMeta:metav1.ObjectMeta{
Name:GetServiceName(sg),
Namespace:sg.Namespace,
//AppendexistedServiceGridlabelstoservicetobecreated
Labels:func()map[string]string{
ifsg.Labels!=nil{
newLabels:=sg.Labels
newLabels[common.GridSelectorName]=sg.Name
newLabels[common.GridSelectorUniqKeyName]=sg.Spec.GridUniqKey
returnnewLabels
}else{
returnmap[string]string{
common.GridSelectorName:sg.Name,
common.GridSelectorUniqKeyName:sg.Spec.GridUniqKey,
}
}
}(),
Annotations:make(map[string]string),
},
Spec:sg.Spec.Template,
}

keys:=make([]string,1)
keys[0]=sg.Spec.GridUniqKey
keyData,_:=json.Marshal(keys)
svc.Annotations[common.TopologyAnnotationsKey]=string(keyData)

returnsvc
}

由于逻辑与 DeploymentGrid 类似,这里不展开细节,重点关注 application-grid-wrapper 部分。

application-grid-wrapper 分析

在 ServiceGrid Controller 创建完 service 之后,application-grid-wrapper 的作用就开始启动了:

apiVersion:v1
kind:Service
metadata:
annotations:
topologyKeys:'["zone1"]'
creationTimestamp:"2021-03-03T07:33:30Z"
labels:
superedge.io/grid-selector:servicegrid-demo
name:servicegrid-demo-svc
namespace:default
ownerReferences:
-apiVersion:superedge.io/v1
blockOwnerDeletion:true
controller:true
kind:ServiceGrid
name:servicegrid-demo
uid:78c74d3c-72ac-4e68-8c79-f1396af5a581
resourceVersion:"127987090"
selfLink:/api/v1/namespaces/default/services/servicegrid-demo-svc
uid:8130ba7b-c27e-4c3a-8ceb-4f6dd0178dfc
spec:
clusterIP:192.168.161.1
ports:
-port:80
protocol:TCP
targetPort:8080
selector:
appGrid:echo
sessionAffinity:None
type:ClusterIP
status:
loadBalancer:{}

为了实现 Kubernetes 零侵入,需要在 kube-proxy 与 apiserver 通信之间添加一层 wrapper,架构如下:

调用链路如下:

kube-proxy->application-grid-wrapper->lite-apiserver->kube-apiserver

因此 application-grid-wrapper 会起服务,接受来自 kube-proxy 的请求,如下:

func(s*interceptorServer)Run(debugbool,bindAddressstring,insecurebool,caFile,certFile,keyFilestring)error{
...
klog.Infof("Starttoruninterceptorserver")
/*filter
*/
server:=&http.Server{Addr:bindAddress,Handler:s.buildFilterChains(debug)}

ifinsecure{
returnserver.ListenAndServe()
}
...
server.TLSConfig=tlsConfig
returnserver.ListenAndServeTLS("","")
}

func(s*interceptorServer)buildFilterChains(debugbool)http.Handler{
handler:=http.Handler(http.NewServeMux())

handler=s.interceptEndpointsRequest(handler)
handler=s.interceptServiceRequest(handler)
handler=s.interceptEventRequest(handler)
handler=s.interceptNodeRequest(handler)
handler=s.logger(handler)

ifdebug{
handler=s.debugger(handler)
}

returnhandler
}
这里会首先创建 interceptorServer,然后注册处理函数,由外到内依次如下:

debug:接受 debug 请求,返回 wrapper pprof 运行信息

logger:打印请求日志

node:接受 kube-proxy node GET(/api/v1/nodes/{node})请求,并返回 node信息

event:接受 kube-proxy events POST(/events)请求,并将请求转发给 lite-apiserver

func(s*interceptorServer)interceptEventRequest(handlerhttp.Handler)http.Handler{
returnhttp.HandlerFunc(func(whttp.ResponseWriter,r*http.Request){ ifr.Method!=http.MethodPost||!strings.HasSuffix(r.URL.Path,"/events"){
handler.ServeHTTP(w,r)
return
}

targetURL,_:=url.Parse(s.restConfig.Host)
reverseProxy:=httputil.NewSingleHostReverseProxy(targetURL)
reverseProxy.Transport,_=rest.TransportFor(s.restConfig)
reverseProxy.ServeHTTP(w,r)
})
}

service:接受 kube-proxy service List&Watch(/api/v1/services)请求,并根据 storageCache 内容返回(GetServices)

endpoint:接受 kube-proxy endpoint List&Watch (/api/v1/endpoints)请求,并根据 storageCache 内容返回(GetEndpoints)

下面先重点分析 cache 部分的逻辑,然后再回过头来分析具体的 http handler List&Watch 处理逻辑。

wrapper 为了实现拓扑感知,自己维护了一个 cache,包括:node,service,endpoint。可以看到在 setupInformers 中注册了这三类资源的处理函数:

typestorageCachestruct{
//hostNameisthenodeNameofnodewhichapplication-grid-wrapperdeployson
hostNamestring
wrapperInClusterbool

//mulockprotectthefollowingmapstructure
musync.RWMutex
servicesMapmap[types.NamespacedName]*serviceContainer
endpointsMapmap[types.NamespacedName]*endpointsContainer
nodesMapmap[types.NamespacedName]*nodeContainer

//servicewatchchannel
serviceChanchan<-watch.Event
//endpointswatchchannel
endpointsChanchan<-watch.Event
}
...
funcNewStorageCache(hostNamestring,wrapperInClusterbool,serviceNotifier,endpointsNotifierchanwatch.Event)*storageCache{
msc:=&storageCache{
hostName:hostName,
wrapperInCluster:wrapperInCluster,
servicesMap:make(map[types.NamespacedName]*serviceContainer),
endpointsMap:make(map[types.NamespacedName]*endpointsContainer),
nodesMap:make(map[types.NamespacedName]*nodeContainer),
serviceChan:serviceNotifier,
endpointsChan:endpointsNotifier,
}

returnmsc
}
...
func(s*interceptorServer)Run(debugbool,bindAddressstring,insecurebool,caFile,certFile,keyFilestring)error{
...
iferr:=s.setupInformers(ctx.Done());err!=nil{
returnerr
}

klog.Infof("Starttoruninterceptorserver")
/*filter
*/
server:=&http.Server{Addr:bindAddress,Handler:s.buildFilterChains(debug)}
...
returnserver.ListenAndServeTLS("","")
}

func(s*interceptorServer)setupInformers(stop<-chanstruct{})error{
klog.Infof("Starttorunserviceandendpointsinformers")
noProxyName,err:=labels.NewRequirement(apis.LabelServiceProxyName,selection.DoesNotExist,nil)
iferr!=nil{
klog.Errorf("can'tparseproxylabel,%v",err)
returnerr
}

noHeadlessEndpoints,err:=labels.NewRequirement(v1.IsHeadlessService,selection.DoesNotExist,nil)
iferr!=nil{
klog.Errorf("can'tparseheadlesslabel,%v",err)
returnerr
}

labelSelector:=labels.NewSelector()
labelSelector=labelSelector.Add(*noProxyName,*noHeadlessEndpoints)

resyncPeriod:=time.Minute*5
client:=kubernetes.NewForConfigOrDie(s.restConfig)
nodeInformerFactory:=informers.NewSharedInformerFactory(client,resyncPeriod)
informerFactory:=informers.NewSharedInformerFactoryWithOptions(client,resyncPeriod,
informers.WithTweakListOptions(func(options*metav1.ListOptions){
options.LabelSelector=labelSelector.String()
}))

nodeInformer:=nodeInformerFactory.Core().V1().Nodes().Informer()
serviceInformer:=informerFactory.Core().V1().Services().Informer()
endpointsInformer:=informerFactory.Core().V1().Endpoints().Informer()

/*
*/
nodeInformer.AddEventHandlerWithResyncPeriod(s.cache.NodeEventHandler(),resyncPeriod)
serviceInformer.AddEventHandlerWithResyncPeriod(s.cache.ServiceEventHandler(),resyncPeriod)

endpointsInformer.AddEventHandlerWithResyncPeriod(s.cache.EndpointsEventHandler(),resyncPeriod)

gonodeInformer.Run(stop)
goserviceInformer.Run(stop)
goendpointsInformer.Run(stop)

if!cache.WaitForNamedCacheSync("node",stop,
nodeInformer.HasSynced,
serviceInformer.HasSynced,
endpointsInformer.HasSynced){
returnfmt.Errorf("can'tsyncinformers")
}

returnnil
}

func(sc*storageCache)NodeEventHandler()cache.ResourceEventHandler{
return&nodeHandler{cache:sc}
}

func(sc*storageCache)ServiceEventHandler()cache.ResourceEventHandler{
return&serviceHandler{cache:sc}

}
func(sc*storageCache)EndpointsEventHandler()cache.ResourceEventHandler{
return&endpointsHandler{cache:sc}
}
这里依次分析 NodeEventHandler,ServiceEventHandler 以及 EndpointsEventHandler,如下:

1、NodeEventHandler

NodeEventHandler 负责监听 node 资源相关 event,并将 node 以及 node Labels 添加到 storageCache.nodesMap 中(key为nodeName,value为node以及node labels)。

func(nh*nodeHandler)add(node*v1.Node){
sc:=nh.cache

sc.mu.Lock()

nodeKey:=types.NamespacedName{Namespace:node.Namespace,Name:node.Name}
klog.Infof("Addingnode%v",nodeKey)
sc.nodesMap[nodeKey]=&nodeContainer{
node:node,
labels:node.Labels,
}
//updateendpoints
changedEps:=sc.rebuildEndpointsMap()

sc.mu.Unlock()

for_,eps:=rangechangedEps{
sc.endpointsChan<-eps
}
}

func(nh*nodeHandler)update(node*v1.Node){
sc:=nh.cache

sc.mu.Lock()
nodeKey:=types.NamespacedName{Namespace:node.Namespace,Name:node.Name}
klog.Infof("Updatingnode%v",nodeKey)
nodeContainer,found:=sc.nodesMap[nodeKey]
if!found{
sc.mu.Unlock()
klog.Errorf("Updatingnon-existednode%v",nodeKey)
return
}

nodeContainer.node=node
//returndirectlywhenlabelsofnodestayunchanged
ifreflect.DeepEqual(node.Labels,nodeContainer.labels){
sc.mu.Unlock()
return
}
nodeContainer.labels=node.Labels

//updateendpoints
changedEps:=sc.rebuildEndpointsMap()

sc.mu.Unlock()

for_,eps:=rangechangedEps{
sc.endpointsChan<-eps
}
}
...

同时由于 node 的改变会影响 endpoint,因此会调用 rebuildEndpointsMap 刷新 storageCache.endpointsMap。

//rebuildEndpointsMapupdatesallendpointsstoredinstorageCache.endpointsMapdynamicallyandconstructsrelevantmodifiedevents
func(sc*storageCache)rebuildEndpointsMap()[]watch.Event{
evts:=make([]watch.Event,0)
forname,endpointsContainer:=rangesc.endpointsMap{
newEps:=pruneEndpoints(sc.hostName,sc.nodesMap,sc.servicesMap,endpointsContainer.endpoints,sc.wrapperInCluster)
ifapiequality.Semantic.DeepEqual(newEps,endpointsContainer.modified){
continue
}
sc.endpointsMap[name].modified=newEps
evts=append(evts,watch.Event{
Type:watch.Modified,
Object:newEps,
})
}
returnevts
}

rebuildEndpointsMap 是 cache 的核心函数,同时也是拓扑感知的算法实现:

//pruneEndpointsfiltersendpointsusingserviceTopologyrulescombinedbyservicestopologyKeysandnodelabels
funcpruneEndpoints(hostNamestring,
nodesmap[types.NamespacedName]*nodeContainer,
servicesmap[types.NamespacedName]*serviceContainer,
eps*v1.Endpoints,wrapperInClusterbool)*v1.Endpoints{

epsKey:=types.NamespacedName{Namespace:eps.Namespace,Name:eps.Name}

ifwrapperInCluster{
eps=genLocalEndpoints(eps)
}

//danglingendpoints
svc,ok:=services[epsKey]
if!ok{
klog.V(4).Infof("Danglingendpoints%s,%+#v",eps.Name,eps.Subsets)
returneps
}

//normalservice
iflen(svc.keys)==0{
klog.V(4).Infof("Normalendpoints%s,%+#v",eps.Name,eps.Subsets)
returneps
}

//topologyendpoints
newEps:=eps.DeepCopy()
forsi:=rangenewEps.Subsets{
subnet:=&newEps.Subsets[si]
subnet.Addresses=filterConcernedAddresses(svc.keys,hostName,nodes,subnet.Addresses)
subnet.NotReadyAddresses=filterConcernedAddresses(svc.keys,hostName,nodes,subnet.NotReadyAddresses)
}
klog.V(4).Infof("Topologyendpoints%s:subnetsfrom%+#vto%+#v",eps.Name,eps.Subsets,newEps.Subsets)

returnnewEps
}

//filterConcernedAddressesaimstofilteroutendpointsaddresseswithinthesamenodeunit
funcfilterConcernedAddresses(topologyKeys[]string,hostNamestring,nodes
map[types.NamespacedName]*nodeContainer,
addresses[]v1.EndpointAddress)[]v1.EndpointAddress{
hostNode,found:=nodes[types.NamespacedName{Name:hostName}]
if!found{
returnnil
}

filteredEndpointAddresses:=make([]v1.EndpointAddress,0)
fori:=rangeaddresses{
addr:=addresses[i]
ifnodeName:=addr.NodeName;nodeName!=nil{
epsNode,found:=nodes[types.NamespacedName{Name:*nodeName}]
if!found{
continue
}
ifhasIntersectionLabel(topologyKeys,hostNode.labels,epsNode.labels){
filteredEndpointAddresses=append(filteredEndpointAddresses,addr)
}
}
}

returnfilteredEndpointAddresses
}

funchasIntersectionLabel(keys[]string,n1,n2map[string]string)bool{
ifn1==nil||n2==nil{
returnfalse
}

for_,key:=rangekeys{
val1,v1found:=n1[key]
val2,v2found:=n2[key]

ifv1found&&v2found&&val1==val2{
returntrue
}
}

returnfalse
}

算法逻辑如下:

判断 endpoint 是否为 default kubernetes service,如果是,则将该 endpoint 转化为 wrapper 所在边缘节点的 lite-apiserver 地址(127.0.0.1)和端口(5100 3)。
apiVersion:v1
kind:Endpoints
metadata:
annotations:
superedge.io/local-endpoint:127.0.0.1
superedge.io/local-port:"51003"
name:kubernetes
namespace:default
subsets:
-addresses:
-ip:172.31.0.60
ports:
-name:https
port:xxx
protocol:TCP
funcgenLocalEndpoints(eps*v1.Endpoints)*v1.Endpoints{
ifeps.Namespace!=metav1.NamespaceDefault||eps.Name!=MasterEndpointName{
returneps
}

klog.V(4).Infof("begintogenlocalep%v",eps)
ipAddress,e:=eps.Annotations[EdgeLocalEndpoint]
if!e{
returneps
}

portStr,e:=eps.Annotations[EdgeLocalPort]
if!e{
returneps
}

klog.V(4).Infof("getlocalendpoint%s:%s",ipAddress,portStr)
port,err:=strconv.ParseInt(portStr,10,32)
iferr!=nil{
klog.Errorf("parseint%serr%v",portStr,err)
returneps
}

ip:=net.ParseIP(ipAddress)
ifip==nil{
klog.Warningf("parseip%snil",ipAddress)
returneps
}

nep:=eps.DeepCopy()
nep.Subsets=[]v1.EndpointSubset{
{
Addresses:[]v1.EndpointAddress{
{
IP:ipAddress,
},
},
Ports:[]v1.EndpointPort{
{
Protocol:v1.ProtocolTCP,
Port:int32(port),
Name:"https",
},
},
},
}

klog.V(4).Infof("gennewendpointcomplete%v",nep)
returnnep
}
这样做的目的是使边缘节点上的服务采用集群内 (InCluster) 方式访问的 apiserver 为本地的 lite-apiserver,而不是云端的 apiserver。

从 storageCache.servicesMap cache 中根据 endpoint 名称(namespace/name) 取出对应 service,如果该 service 没有 topologyKeys 则无需做拓扑转化(非 service group)。

funcgetTopologyKeys(objectMeta*metav1.ObjectMeta)[]string{
if!hasTopologyKey(objectMeta){
returnnil
}

varkeys[]string
keyData:=objectMeta.Annotations[TopologyAnnotationsKey]
iferr:=json.Unmarshal([]byte(keyData),&keys);err!=nil{
klog.Errorf("can'tparsetopologykeys%s,%v",keyData,err)
returnnil
}

returnkeys
}

调用 filterConcernedAddresses 过滤 endpoint.Subsets Addresses 以及 NotReadyAddresses,只保留同一个 service topologyKeys 中的 endpoint。

//filterConcernedAddressesaimstofilteroutendpointsaddresseswithinthesamenodeunit
funcfilterConcernedAddresses(topologyKeys[]string,hostNamestring,nodes
map[types.NamespacedName]*nodeContainer,
addresses[]v1.EndpointAddress)[]v1.EndpointAddress{
hostNode,found:=nodes[types.NamespacedName{Name:hostName}]
if!found{
returnnil
}

filteredEndpointAddresses:=make([]v1.EndpointAddress,0)
fori:=rangeaddresses{
addr:=addresses[i]
ifnodeName:=addr.NodeName;nodeName!=nil{
epsNode,found:=nodes[types.NamespacedName{Name:*nodeName}]
if!found{
continue
}
ifhasIntersectionLabel(topologyKeys,hostNode.labels,epsNode.labels){
filteredEndpointAddresses=append(filteredEndpointAddresses,addr)
}
}
}

returnfilteredEndpointAddresses
}
funchasIntersectionLabel(keys[]string,n1,n2map[string]string)bool{
ifn1==nil||n2==nil{
returnfalse
}

for_,key:=rangekeys{
val1,v1found:=n1[key]
val2,v2found:=n2[key]

ifv1found&&v2found&&val1==val2{
returntrue
}
}

returnfalse
}
注意:如果 wrapper 所在边缘节点没有 service topologyKeys 标签,则也无法访问该 service。
回到 rebuildEndpointsMap,在调用 pruneEndpoints 刷新了同一个拓扑域内的 endpoint 后,会将修改后的 endpoints 赋值给 storageCache .endpointsMap [endpoint]. modified (该字段记录了拓扑感知后修改的endpoints)。
func(nh*nodeHandler)add(node*v1.Node){
sc:=nh.cache

sc.mu.Lock()

nodeKey:=types.NamespacedName{Namespace:node.Namespace,Name:node.Name}
klog.Infof("Addingnode%v",nodeKey)
sc.nodesMap[nodeKey]=&nodeContainer{
node:node,
labels:node.Labels,
}
//updateendpoints
changedEps:=sc.rebuildEndpointsMap()

sc.mu.Unlock()

for_,eps:=rangechangedEps{
sc.endpointsChan<-eps
}
}

//rebuildEndpointsMapupdatesallendpointsstoredinstorageCache.endpointsMapdynamicallyandconstructsrelevantmodifiedevents
func(sc*storageCache)rebuildEndpointsMap()[]watch.Event{
evts:=make([]watch.Event,0)
forname,endpointsContainer:=rangesc.endpointsMap{
newEps:=pruneEndpoints(sc.hostName,sc.nodesMap,sc.servicesMap,endpointsContainer.endpoints,sc.wrapperInCluster)
ifapiequality.Semantic.DeepEqual(newEps,endpointsContainer.modified){
continue
}
sc.endpointsMap[name].modified=newEps
evts=append(evts,watch.Event{
Type:watch.Modified,
Object:newEps,
})
}
returnevts
}
另外,如果 endpoints (拓扑感知后修改的 endpoints)发生改变,会构建 watch event,传递给 endpoints handler (interceptEndpointsRequest)处理。

2、ServiceEventHandler

storageCache.servicesMap 结构体 key 为 service 名称(namespace/name),value 为 serviceContainer,包含如下数据:

svc:service对象
keys:service topologyKeys
对于 service 资源的改动,这里用 Update event 说明:
func(sh*serviceHandler)update(service*v1.Service){
sc:=sh.cache

sc.mu.Lock()
serviceKey:=types.NamespacedName{Namespace:service.Namespace,Name:service.Name}
klog.Infof("Updatingservice%v",serviceKey)
newTopologyKeys:=getTopologyKeys(&service.ObjectMeta)
serviceContainer,found:=sc.servicesMap[serviceKey]
if!found{
sc.mu.Unlock()
klog.Errorf("updatenon-existedservice,%v",serviceKey)
return
}

sc.serviceChan<-watch.Event{
Type:watch.Modified,
Object:service,
}

serviceContainer.svc=service
//returndirectlywhentopologyKeysofservicestayunchanged
ifreflect.DeepEqual(serviceContainer.keys,newTopologyKeys){
sc.mu.Unlock()
return
}

serviceContainer.keys=newTopologyKeys

//updateendpoints
changedEps:=sc.rebuildEndpointsMap()
sc.mu.Unlock()

for_,eps:=rangechangedEps{
sc.endpointsChan<-eps
}
}
逻辑如下:
获取 service topologyKeys
构建 service event.Modified event
比较 service topologyKeys 与已经存在的是否有差异
如果有差异则更新 topologyKeys,且调用 rebuildEndpointsMap 刷新该 service 对应的endpoints,如果 endpoints 发生变化,则构建 endpoints watch event,传递给 endpoints handler (interceptEndpointsRequest)处理。

3、EndpointsEventHandler

storageCache.endpointsMap 结构体 key 为 endpoints 名称(namespace/name),value 为 endpointsContainer,包含如下数据:

endpoints:拓扑修改前的 endpoints
modified:拓扑修改后的 endpoints
对于 endpoints 资源的改动,这里用 Update event 说 明:
func(eh*endpointsHandler)update(endpoints*v1.Endpoints){
sc:=eh.cache

sc.mu.Lock()
endpointsKey:=types.NamespacedName{Namespace:endpoints.Namespace,Name:endpoints.Name}
klog.Infof("Updatingendpoints%v",endpointsKey)

endpointsContainer,found:=sc.endpointsMap[endpointsKey]
if!found{
sc.mu.Unlock()
klog.Errorf("Updatingnon-existedendpoints%v",endpointsKey)
return
}
endpointsContainer.endpoints=endpoints
newEps:=pruneEndpoints(sc.hostName,sc.nodesMap,sc.servicesMap,endpoints,sc.wrapperInCluster)
changed:=!apiequality.Semantic.DeepEqual(endpointsContainer.modified,newEps)
ifchanged{
endpointsContainer.modified=newEps
}
sc.mu.Unlock()

ifchanged{
sc.endpointsChan<-watch.Event{
Type:watch.Modified,
Object:newEps,
}
}
}
逻辑如下:
更新 endpointsContainer.endpoint 为新的 endpoints 对象
调用 pruneEndpoints 获取拓扑刷新后的 endpoints
比较 endpointsContainer.modified 与新刷新后的 endpoints
如果有差异则更新 endpointsContainer.modified,则构建 endpoints watch event,传递给 endpoints handler (interceptEndpointsRequest)处理。
在分析完 NodeEventHandler,ServiceEventHandler 以及 EndpointsEventHandler 之后,我们回到具体的 http handler List&Watch 处理逻辑上,这里以 endpoints 为例:
func(s*interceptorServer)interceptEndpointsRequest(handlerhttp.Handler)http.Handler{
returnhttp.HandlerFunc(func(whttp.ResponseWriter,r*http.Request){
ifr.Method!=http.MethodGet||
!strings.HasPrefix(r.URL.Path,"/api/v1/endpoints"){
handler.ServeHTTP(w,r)
return
}

queries:=r.URL.Query()
acceptType:=r.Header.Get("Accept")
info,found:=s.parseAccept(acceptType,s.mediaSerializer)
if!found{
klog.Errorf("can'tfind%sserializer",acceptType)
w.WriteHeader(http.StatusBadRequest)
return
}

encoder:=
scheme.Codecs.EncoderForVersion(info.Serializer,v1.SchemeGroupVersion)
//listrequest
ifqueries.Get("watch")==""{
w.Header().Set("Content-Type",info.MediaType)
allEndpoints:=s.cache.GetEndpoints()
epsItems:=make([]v1.Endpoints,0,len(allEndpoints))
for_,eps:=rangeallEndpoints{
epsItems=append(epsItems,*eps)
}

epsList:=&v1.EndpointsList{
Items:epsItems,
}

err:=encoder.Encode(epsList,w)
iferr!=nil{
klog.Errorf("can'tmarshalendpointslist,%v",err)
w.WriteHeader(http.StatusInternalServerError)
return
}

return
}

//watchrequest
timeoutSecondsStr:=r.URL.Query().Get("timeoutSeconds")
timeout:=time.Minute
iftimeoutSecondsStr!=""{
timeout,_=time.ParseDuration(fmt.Sprintf("%ss",timeoutSecondsStr))
}

timer:=time.NewTimer(timeout)
defertimer.Stop()

flusher,ok:=w.(http.Flusher)
if!ok{
klog.Errorf("unabletostartwatch-can'tgethttp.Flusher:%#v",w)
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

e:=restclientwatch.NewEncoder(
streaming.NewEncoder(info.StreamSerializer.Framer.NewFrameWriter(w),
scheme.Codecs.EncoderForVersion(info.StreamSerializer,v1.SchemeGroupVersion)),
encoder)
ifinfo.MediaType==runtime.ContentTypeProtobuf{
w.Header().Set("Content-Type",
runtime.ContentTypeProtobuf+";stream=watch")
}else{
w.Header().Set("Content-Type",runtime.ContentTypeJSON)
}
w.Header().Set("Transfer-Encoding","chunked")
w.WriteHeader(http.StatusOK)
flusher.Flush()
for{
select{
case<-r.Context().Done():
return
case<-timer.C:
return
caseevt:=<-s.endpointsWatchCh:
klog.V(4).Infof("Sendendpointwatchevent:%+#v",evt)
err:=e.Encode(&evt)
iferr!=nil{
klog.Errorf("can'tencodewatchevent,%v",err)
return
}

iflen(s.endpointsWatchCh)==0{
flusher.Flush()
}
}
}
})
}

逻辑如下:

如果为 List请求,则调用 GetEndpoints 获取拓扑修改后的 endpoints 列表,并返回

func(sc*storageCache)GetEndpoints()[]*v1.Endpoints{
sc.mu.RLock()
defersc.mu.RUnlock()

epList:=make([]*v1.Endpoints,0,
len(sc.endpointsMap))
for_,v:=rangesc.endpointsMap{
epList=append(epList,v.modified)
}
returnepList
}

如果为 Watch 请求,则不断从 storageCache.endpointsWatchCh 管道中接受 watch event,并返回interceptServiceRequest 逻辑与 interceptEndpointsRequest 一致,这里不再赘述 。

总结

SuperEdge service group 利用 application-grid-wrapper 实现拓扑感知,完成了同一个 nodeunit 内服务的闭环访问
service group 实现的拓扑感知和 Kubernetes 社区原生实现对比,有如下区别:
service group 拓扑 key 可以自定义,也即为 gridUniqKey,使用起来更加灵活;而社区实现目前只有三种选择:"kubernetes.io/hostname","topology.kubernetes.io/zone"以及"topology.kubernetes.io/region"
service group 只能填写一个拓扑 key,也即只能访问本拓扑域内有效的 endpoint,无法访问其它拓扑域的 endpoint;而社区可以通过 topologyKey 列表以及"*"实现其它备选拓扑域 endpoint的访问
ServiceGrid Controller 负责根据 ServiceGrid 产生对应的 service(包含由serviceGrid.Spec.GridUniqKey 构成的 topologyKeys annotations),逻辑和 DeploymentGrid Controller 整体一致,如下:
创建并维护 service group 需要的若干CRDs(包括:ServiceGrid)
监听 ServiceGrid event,并填充 ServiceGrid到工作队列中;循环从队列中取出 ServiceGrid 进行解析,创建并且维护对应的 service
监听 service event,并将相关的 ServiceGrid 塞到工作队列中进行上述处理,协助上述逻辑达到整体 reconcile 逻辑
为了实现 Kubernetes 零侵入,需要在 kube-proxy 与 apiserver 通信之间添加一层 wrapper,调用链路如下: kube-proxy -> application-grid-wrapper -> lite-apiserver -> kube-apiserver
application-grid-wrapper 是一个 http server,接受来自 kube-proxy 的请求,同时维护一个资源缓存,处理函数由外到内依次如下:
debug:接受 debug 请求,返回 wrapper pprof 运行信息
logger:打印请求日志
node:接受 kube-proxy node GET (/api/v1/nodes/{node}) 请求,并返回 node 信息
event:接受 kube-proxy events POST (/events) 请求,并将请求转发给 lite-apiserver
service:接受 kube-proxy service List&Watch (/api/v1/services) 请求,并根据 storageCache 内容返回 (GetServices)
endpoint:接受 kube-proxy endpoint List&Watch (/api/v1/endpoints) 请求,并根据 storageCache 内容返回(GetEndpoints)
wrapper 为了实现拓扑感知,维护了一个资源 cache,包括:node,service,endpoint,同时注册了相关 event 处理函数。核心拓扑算法逻辑为:调用 filterConcernedAddresses 过滤 endpoint.Subsets Addresses 以及 NotReadyAddresses,只保留同一个 service topologyKeys 中的 endpoint。另外,如果 wrapper 所在边缘节点没有 service topologyKeys 标签,则也无法访问该service
wrapper 接受来自 kube-proxy 对 endpoints 以及 service 的 List&Watch 请求,以endpoints 为例:如果为List 请求,则调用 GetEndpoints 获取拓扑修改后的 endpoints 列表,并返回;如果为 Watch 请求,则不断从storageCache.endpointsWatchCh 管道中接受 watch event,并返回。service 逻辑与 endpoints 一致

关于“如何分析SuperEdge 拓扑算法”就介绍到这了,更多相关内容可以搜索恒创以前的文章,希望能够帮助大家答疑解惑,请多多支持恒创网站!

上一篇: web开发中基数排序是什么意思 下一篇: 手机怎么远程登录云服务器?