Watch CustomResourceDefinitions (CRD) with client-go

I have added a new CRD, ApiGateway, to Kubernetes and I want to watch for new or changed resources of that kind.

This works with a simple REST client, as shown in the example below.

But I'd like to watch for these resources with k8s.io/client-go/kubernetes.

While it is simple to get the standard resources, as in the client-go example below, I can't get anything working for CRDs. Is it possible to do this with client-go?

client-go example for standard resources

import (
    ....
    "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func handleNewServices(clientset *kubernetes.Clientset) {
    for {
        serviceStreamWatcher, err := clientset.CoreV1().Services("").Watch(metav1.ListOptions{})
        if err != nil {
            panic(err.Error())
        }
        //fmt.Printf("%T\n", serviceStreamWatcher)
        for {
            select {
            case event := <-serviceStreamWatcher.ResultChan():
                service := event.Object.(*v1.Service)

                for key, value := range service.Labels {
                    fmt.Printf("Key, Value: %s %s\n", key, value)
                }
...

RestClient (working fine)

package main

import (
    "net/http"
    ....

)

func main() {
    for {
        // Url "cw.com" must match the config spec.group in api-gateway-crd.yaml
        // URL "apigateways" must match the config spec.names.plural in api-gateway-crd.yaml
        resp, err := http.Get("http://localhost:8001/apis/cw.com/v1/apigateways?watch=true")
        if err != nil {
            panic(err)
        }
        decoder := json.NewDecoder(resp.Body)
        for {
            var event v1.ApiGatewayWatchEvent
            if err := decoder.Decode(&event); err == io.EOF {
                break
            } else if err != nil {
                log.Fatal(err)
            }
            log.Printf("Received watch event: %s: %s\n", event.Type, event.Object.Metadata.Name)
        }
        // Close explicitly: a defer inside this endless loop would never run.
        resp.Body.Close()
    }

}

CRD

apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: apigateways.cw.com
spec:
  scope: Namespaced
  group: cw.com
  version: v1
  names:
    kind: ApiGateway
    singular: apigateway
    plural: apigateways
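
The group, version, and plural name in this CRD are what determine the REST path used in the watch URL above. A small sketch of that mapping, assuming the usual /apis/<group>/<version>/... layout for custom resources; the watchPath helper is hypothetical and not part of client-go:

```go
package main

import "fmt"

// watchPath builds the API server path for watching a custom resource.
// An empty namespace yields the cluster-wide path across all namespaces.
func watchPath(group, version, namespace, plural string) string {
	if namespace == "" {
		return fmt.Sprintf("/apis/%s/%s/%s?watch=true", group, version, plural)
	}
	return fmt.Sprintf("/apis/%s/%s/namespaces/%s/%s?watch=true", group, version, namespace, plural)
}

func main() {
	// Values taken from the CRD: spec.group, spec.version, spec.names.plural.
	fmt.Println(watchPath("cw.com", "v1", "", "apigateways"))
	fmt.Println(watchPath("cw.com", "v1", "default", "apigateways"))
}
```

The same three values are what a schema.GroupVersionResource carries when the dynamic client is used.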


Solution 1:[1]

If you think about it, client-go knows about built-in resources such as deployments, services, and pods. But it doesn't recognize your CRD ApiGateway.

So client-go's typed clientset can't be used as a client for your custom resources (wait for it) unless you make them recognizable to client-go!

How?!

You have to generate your own client for the CRDs. Kubernetes already has tools to auto-generate the clients; all you need to do is specify the structs of the API. This is known as code generation.

Here is a blog post about code generation by Stefan Schimanski (one of the top contributors to Kubernetes).

Example Controller

Here is a sample-controller example provided by the Kubernetes project itself. The pkg folder contains all the APIs and the client. main.go and controller.go contain the sample code that watches for the CRD and acts on it.

!!Update!!

It's now easier to generate client configs and controllers with kubebuilder (GitHub repo), which is maintained by kubernetes-sigs.

Solution 2:[2]

Using the dynamic package of client-go may be a good choice for working with CRDs.

Basic watch example:

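// An empty rest.Config is only a placeholder; load a real one, for example
// with clientcmd.BuildConfigFromFlags or rest.InClusterConfig.
cliSet, err := dynamic.NewForConfig(&rest.Config{})
if err != nil {
    return err
}

watcher, err := cliSet.Resource(schema.GroupVersionResource{
    // replace these with your CRD's group, version, and plural resource name
    Group:    CRDGroup,
    Version:  CRDVersion,
    Resource: CRDResourceName,
}).Watch(context.Background(), metav1.ListOptions{})
if err != nil {
    return err
}
defer watcher.Stop()

for event := range watcher.ResultChan() {
    // event.Object is normally *unstructured.Unstructured for dynamic watches
    fmt.Printf("%s: %T\n", event.Type, event.Object)
}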

Advanced watch, informer example:

cliSet, err := dynamic.NewForConfig(&rest.Config{})
if err != nil {
    return err
}

fac := dynamicinformer.NewFilteredDynamicSharedInformerFactory(cliSet, 0, metav1.NamespaceAll, nil)
informer := fac.ForResource(schema.GroupVersionResource{
    // replace it with your CRD's corresponding property 
    Group:    CRDGroup,
    Version:  CRDVersion,
    Resource: CRDResourceName,
}).Informer()

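informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: func(obj interface{}) {
        // convert the dynamic object to your CRD struct
        typedObj, ok := obj.(*unstructured.Unstructured)
        if !ok {
            return
        }
        bytes, err := typedObj.MarshalJSON()
        if err != nil {
            return
        }

        var crdObj crd.CRD
        if err := json.Unmarshal(bytes, &crdObj); err != nil {
            return
        }
    },
    UpdateFunc: func(oldObj, newObj interface{}) {
        // handle changed resources here
    },
})

// The handlers fire only once the informer is running, for example via
// fac.Start(stopCh) with a stop channel you own.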
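
The conversion step inside AddFunc can be tried out without a cluster. An unstructured.Unstructured wraps a map[string]interface{}, so the sketch below works on a plain map and round-trips it through JSON into a typed struct. The ApiGateway struct, its Spec.Host field, and the toApiGateway helper are assumptions made for illustration; the question does not show the real type. With apimachinery available, runtime.DefaultUnstructuredConverter.FromUnstructured is an alternative that avoids the JSON round trip.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ApiGateway is a hypothetical typed view of the custom resource.
type ApiGateway struct {
	APIVersion string `json:"apiVersion"`
	Kind       string `json:"kind"`
	Metadata   struct {
		Name      string `json:"name"`
		Namespace string `json:"namespace"`
	} `json:"metadata"`
	Spec struct {
		Host string `json:"host"` // assumed field, for illustration only
	} `json:"spec"`
}

// toApiGateway converts generic map content (the shape held by an
// unstructured object) into the typed struct via a JSON round trip.
func toApiGateway(content map[string]interface{}) (*ApiGateway, error) {
	raw, err := json.Marshal(content)
	if err != nil {
		return nil, fmt.Errorf("marshal unstructured content: %w", err)
	}
	var gw ApiGateway
	if err := json.Unmarshal(raw, &gw); err != nil {
		return nil, fmt.Errorf("unmarshal into ApiGateway: %w", err)
	}
	return &gw, nil
}

func main() {
	content := map[string]interface{}{
		"apiVersion": "cw.com/v1",
		"kind":       "ApiGateway",
		"metadata":   map[string]interface{}{"name": "gw-a", "namespace": "default"},
		"spec":       map[string]interface{}{"host": "example.test"},
	}
	gw, err := toApiGateway(content)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s %s/%s host=%s\n", gw.Kind, gw.Metadata.Namespace, gw.Metadata.Name, gw.Spec.Host)
}
```

Returning the error, rather than discarding it, makes malformed objects visible to the handler.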

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1
Solution 2 AnonymousX