Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 54 additions & 91 deletions pkg/multiproject/neg/informerset/informerset.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,28 @@ import (
"fmt"

networkclient "github.com/GoogleCloudPlatform/gke-networking-api/client/network/clientset/versioned"
informernetwork "github.com/GoogleCloudPlatform/gke-networking-api/client/network/informers/externalversions/network/v1"
networkinformers "github.com/GoogleCloudPlatform/gke-networking-api/client/network/informers/externalversions"
nodetopologyclient "github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/clientset/versioned"
informernodetopology "github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/informers/externalversions/nodetopology/v1"
nodetopologyinformers "github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/informers/externalversions"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
informerv1 "k8s.io/client-go/informers/core/v1"
discoveryinformer "k8s.io/client-go/informers/discovery/v1"
informernetworking "k8s.io/client-go/informers/networking/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/ingress-gce/pkg/multiproject/common/filteredinformer"
svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned"
informersvcneg "k8s.io/ingress-gce/pkg/svcneg/client/informers/externalversions/svcneg/v1beta1"
"k8s.io/ingress-gce/pkg/utils"
svcneginformers "k8s.io/ingress-gce/pkg/svcneg/client/informers/externalversions"
"k8s.io/ingress-gce/pkg/utils/endpointslices"
"k8s.io/klog/v2"
)

// InformerSet manages all shared informers used by multiproject controllers.
// It provides centralized initialization and lifecycle management.
type InformerSet struct {
kubeFactory informers.SharedInformerFactory
svcNegFactory svcneginformers.SharedInformerFactory
networkFactory networkinformers.SharedInformerFactory
nodetopologyFactory nodetopologyinformers.SharedInformerFactory

// Core Kubernetes informers (always present)
Ingress cache.SharedIndexInformer
Service cache.SharedIndexInformer
Expand Down Expand Up @@ -51,80 +53,48 @@ func NewInformerSet(
nodeTopologyClient nodetopologyclient.Interface,
resyncPeriod metav1.Duration,
) *InformerSet {
informers := &InformerSet{}

// Create core Kubernetes informers
informers.Ingress = informernetworking.NewIngressInformer(
kubeClient,
metav1.NamespaceAll,
resyncPeriod.Duration,
utils.NewNamespaceIndexer(),
)

informers.Service = informerv1.NewServiceInformer(
kubeClient,
metav1.NamespaceAll,
resyncPeriod.Duration,
utils.NewNamespaceIndexer(),
)
infSet := &InformerSet{}

informers.Pod = informerv1.NewPodInformer(
kubeClient,
metav1.NamespaceAll,
resyncPeriod.Duration,
utils.NewNamespaceIndexer(),
)
infSet.kubeFactory = informers.NewSharedInformerFactory(kubeClient, resyncPeriod.Duration)
if svcNegClient != nil {
infSet.svcNegFactory = svcneginformers.NewSharedInformerFactory(svcNegClient, resyncPeriod.Duration)
}
if networkClient != nil {
infSet.networkFactory = networkinformers.NewSharedInformerFactory(networkClient, resyncPeriod.Duration)
}
if nodeTopologyClient != nil {
infSet.nodetopologyFactory = nodetopologyinformers.NewSharedInformerFactory(nodeTopologyClient, resyncPeriod.Duration)
}

informers.Node = informerv1.NewNodeInformer(
kubeClient,
resyncPeriod.Duration,
utils.NewNamespaceIndexer(),
)
// Create core Kubernetes informers from factory
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for all of the informers we create, set the namespace indexer on them

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are already set because we are using SharedInformerFactory.

infSet.Ingress = infSet.kubeFactory.Networking().V1().Ingresses().Informer()
infSet.Service = infSet.kubeFactory.Core().V1().Services().Informer()
infSet.Pod = infSet.kubeFactory.Core().V1().Pods().Informer()
infSet.Node = infSet.kubeFactory.Core().V1().Nodes().Informer()

// EndpointSlice informer with custom indexers for NEG controller
informers.EndpointSlice = discoveryinformer.NewEndpointSliceInformer(
kubeClient,
metav1.NamespaceAll,
resyncPeriod.Duration,
cache.Indexers{
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
endpointslices.EndpointSlicesByServiceIndex: endpointslices.EndpointSlicesByServiceFunc,
},
)

// Create CRD informers if clients are available
if svcNegClient != nil {
informers.SvcNeg = informersvcneg.NewServiceNetworkEndpointGroupInformer(
svcNegClient,
metav1.NamespaceAll,
resyncPeriod.Duration,
utils.NewNamespaceIndexer(),
)
endpointSliceInformer := infSet.kubeFactory.Discovery().V1().EndpointSlices().Informer()
if err := endpointSliceInformer.AddIndexers(cache.Indexers{
endpointslices.EndpointSlicesByServiceIndex: endpointslices.EndpointSlicesByServiceFunc,
}); err != nil {
klog.Fatalf("failed to add indexers to endpointSlice informer: %v", err)
}
infSet.EndpointSlice = endpointSliceInformer

if networkClient != nil {
informers.Network = informernetwork.NewNetworkInformer(
networkClient,
resyncPeriod.Duration,
utils.NewNamespaceIndexer(),
)

informers.GkeNetworkParams = informernetwork.NewGKENetworkParamSetInformer(
networkClient,
resyncPeriod.Duration,
utils.NewNamespaceIndexer(),
)
// Create CRD informers if factories are available
if infSet.svcNegFactory != nil {
infSet.SvcNeg = infSet.svcNegFactory.Networking().V1beta1().ServiceNetworkEndpointGroups().Informer()
}

if nodeTopologyClient != nil {
informers.NodeTopology = informernodetopology.NewNodeTopologyInformer(
nodeTopologyClient,
resyncPeriod.Duration,
utils.NewNamespaceIndexer(),
)
if infSet.networkFactory != nil {
infSet.Network = infSet.networkFactory.Networking().V1().Networks().Informer()
infSet.GkeNetworkParams = infSet.networkFactory.Networking().V1().GKENetworkParamSets().Informer()
}

return informers
if infSet.nodetopologyFactory != nil {
infSet.NodeTopology = infSet.nodetopologyFactory.Networking().V1().NodeTopologies().Informer()
}
return infSet
}

// Start starts all informers and waits for their caches to sync.
Expand All @@ -147,18 +117,19 @@ func (i *InformerSet) Start(stopCh <-chan struct{}, logger klog.Logger) error {
default:
}

// Start all core informers
startInformer(i.Ingress, stopCh)
startInformer(i.Service, stopCh)
startInformer(i.Pod, stopCh)
startInformer(i.Node, stopCh)
startInformer(i.EndpointSlice, stopCh)

// Start optional informers
startInformer(i.SvcNeg, stopCh)
startInformer(i.Network, stopCh)
startInformer(i.GkeNetworkParams, stopCh)
startInformer(i.NodeTopology, stopCh)
// Start factories
if i.kubeFactory != nil {
i.kubeFactory.Start(stopCh)
}
if i.svcNegFactory != nil {
i.svcNegFactory.Start(stopCh)
}
if i.networkFactory != nil {
i.networkFactory.Start(stopCh)
}
if i.nodetopologyFactory != nil {
i.nodetopologyFactory.Start(stopCh)
}

i.started = true

Expand Down Expand Up @@ -271,11 +242,3 @@ func (i *InformerSet) hasSyncedFuncs() []func() bool {

return funcs
}

// startInformer starts the informer if it is non-nil.
func startInformer(inf cache.SharedIndexInformer, stopCh <-chan struct{}) {
if inf == nil {
return
}
go inf.Run(stopCh)
}
Loading