381 lines
12 KiB
Go
381 lines
12 KiB
Go
/*
|
|
Copyright 2022 DigitalOcean
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package kube
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
|
|
csitypes "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
|
|
csitypesbeta "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1beta1"
|
|
csi "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned"
|
|
"golang.org/x/sync/errgroup"
|
|
arv1 "k8s.io/api/admissionregistration/v1"
|
|
batchv1 "k8s.io/api/batch/v1"
|
|
corev1 "k8s.io/api/core/v1"
|
|
v1 "k8s.io/api/core/v1"
|
|
st "k8s.io/api/storage/v1"
|
|
kerrors "k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
utilnet "k8s.io/apimachinery/pkg/util/net"
|
|
"k8s.io/client-go/kubernetes"
|
|
"k8s.io/client-go/rest"
|
|
"k8s.io/client-go/tools/clientcmd"
|
|
|
|
// Load client-go authentication plugins
|
|
_ "k8s.io/client-go/plugin/pkg/client/auth"
|
|
)
|
|
|
|
// Identifier is used to identify a specific namespace-scoped object.
type Identifier struct {
	// Name is the object's name.
	Name string
	// Namespace is the namespace the object belongs to.
	Namespace string
}
|
|
|
|
// Objects encapsulates all the objects from a Kubernetes cluster.
|
|
type Objects struct {
|
|
Nodes *corev1.NodeList
|
|
PersistentVolumes *corev1.PersistentVolumeList
|
|
SystemNamespace *corev1.Namespace
|
|
Pods *corev1.PodList
|
|
PodTemplates *corev1.PodTemplateList
|
|
PersistentVolumeClaims *corev1.PersistentVolumeClaimList
|
|
ConfigMaps *corev1.ConfigMapList
|
|
Services *corev1.ServiceList
|
|
Secrets *corev1.SecretList
|
|
ServiceAccounts *corev1.ServiceAccountList
|
|
ResourceQuotas *corev1.ResourceQuotaList
|
|
LimitRanges *corev1.LimitRangeList
|
|
VolumeSnapshotsV1 *csitypes.VolumeSnapshotList
|
|
VolumeSnapshotsV1Content *csitypes.VolumeSnapshotContentList
|
|
VolumeSnapshotsBeta *csitypesbeta.VolumeSnapshotList
|
|
VolumeSnapshotsBetaContent *csitypesbeta.VolumeSnapshotContentList
|
|
StorageClasses *st.StorageClassList
|
|
DefaultStorageClass *st.StorageClass
|
|
MutatingWebhookConfigurations *arv1.MutatingWebhookConfigurationList
|
|
ValidatingWebhookConfigurations *arv1.ValidatingWebhookConfigurationList
|
|
Namespaces *corev1.NamespaceList
|
|
CronJobs *batchv1.CronJobList
|
|
}
|
|
|
|
// Client encapsulates a client for a Kubernetes cluster.
|
|
type Client struct {
|
|
KubeClient kubernetes.Interface
|
|
CSIClient csi.Interface
|
|
httpClient *http.Client
|
|
}
|
|
|
|
func (c *Client) Close() {
|
|
if c.httpClient != nil {
|
|
utilnet.CloseIdleConnectionsFor(c.httpClient.Transport)
|
|
}
|
|
}
|
|
|
|
// FetchObjects returns the objects from a Kubernetes cluster.
|
|
// ctx is currently unused during API calls. More info: https://github.com/kubernetes/community/pull/1166
|
|
func (c *Client) FetchObjects(ctx context.Context, filter ObjectFilter) (*Objects, error) {
|
|
client := c.KubeClient.CoreV1()
|
|
admissionControllerClient := c.KubeClient.AdmissionregistrationV1()
|
|
batchClient := c.KubeClient.BatchV1()
|
|
storageClient := c.KubeClient.StorageV1()
|
|
csiClient := c.CSIClient.SnapshotV1()
|
|
csiBetaClient := c.CSIClient.SnapshotV1beta1()
|
|
opts := metav1.ListOptions{}
|
|
objects := &Objects{}
|
|
|
|
g, gCtx := errgroup.WithContext(ctx)
|
|
g.Go(func() (err error) {
|
|
objects.Nodes, err = client.Nodes().List(gCtx, opts)
|
|
return
|
|
})
|
|
g.Go(func() (err error) {
|
|
objects.StorageClasses, err = storageClient.StorageClasses().List(gCtx, opts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, s := range objects.StorageClasses.Items {
|
|
if v, _ := s.Annotations["storageclass.kubernetes.io/is-default-class"]; v == "true" {
|
|
defaultStorageClass := s
|
|
objects.DefaultStorageClass = &defaultStorageClass
|
|
}
|
|
}
|
|
return
|
|
})
|
|
g.Go(func() (err error) {
|
|
objects.PersistentVolumes, err = client.PersistentVolumes().List(gCtx, opts)
|
|
err = annotateFetchError("PersistentVolumes", err)
|
|
return
|
|
})
|
|
g.Go(func() (err error) {
|
|
objects.Pods, err = client.Pods(corev1.NamespaceAll).List(gCtx, filter.NamespaceOptions(opts))
|
|
err = annotateFetchError("Pods", err)
|
|
return
|
|
})
|
|
g.Go(func() (err error) {
|
|
objects.PodTemplates, err = client.PodTemplates(corev1.NamespaceAll).List(gCtx, filter.NamespaceOptions(opts))
|
|
err = annotateFetchError("PodTemplates", err)
|
|
return
|
|
})
|
|
g.Go(func() (err error) {
|
|
objects.PersistentVolumeClaims, err = client.PersistentVolumeClaims(corev1.NamespaceAll).List(gCtx, filter.NamespaceOptions(opts))
|
|
err = annotateFetchError("PersistentVolumeClaims", err)
|
|
return
|
|
})
|
|
g.Go(func() (err error) {
|
|
objects.ConfigMaps, err = client.ConfigMaps(corev1.NamespaceAll).List(gCtx, filter.NamespaceOptions(opts))
|
|
err = annotateFetchError("ConfigMaps", err)
|
|
return
|
|
})
|
|
g.Go(func() (err error) {
|
|
objects.Secrets, err = client.Secrets(corev1.NamespaceAll).List(gCtx, filter.NamespaceOptions(opts))
|
|
err = annotateFetchError("Secrets", err)
|
|
return
|
|
})
|
|
g.Go(func() (err error) {
|
|
objects.Services, err = client.Services(corev1.NamespaceAll).List(gCtx, filter.NamespaceOptions(opts))
|
|
err = annotateFetchError("Services", err)
|
|
return
|
|
})
|
|
g.Go(func() (err error) {
|
|
objects.ServiceAccounts, err = client.ServiceAccounts(corev1.NamespaceAll).List(gCtx, filter.NamespaceOptions(opts))
|
|
err = annotateFetchError("ServiceAccounts", err)
|
|
return
|
|
})
|
|
g.Go(func() (err error) {
|
|
objects.ResourceQuotas, err = client.ResourceQuotas(corev1.NamespaceAll).List(gCtx, filter.NamespaceOptions(opts))
|
|
err = annotateFetchError("ResourceQuotas", err)
|
|
return
|
|
})
|
|
g.Go(func() (err error) {
|
|
objects.LimitRanges, err = client.LimitRanges(corev1.NamespaceAll).List(gCtx, filter.NamespaceOptions(opts))
|
|
err = annotateFetchError("LimitRanges", err)
|
|
return
|
|
})
|
|
g.Go(func() (err error) {
|
|
objects.SystemNamespace, err = client.Namespaces().Get(gCtx, metav1.NamespaceSystem, metav1.GetOptions{})
|
|
if err != nil {
|
|
err = fmt.Errorf("failed to fetch namespace %q: %s", metav1.NamespaceSystem, err)
|
|
}
|
|
return
|
|
})
|
|
g.Go(func() (err error) {
|
|
objects.MutatingWebhookConfigurations, err = admissionControllerClient.MutatingWebhookConfigurations().List(gCtx, opts)
|
|
err = annotateFetchError("MutatingWebhookConfigurations (v1)", err)
|
|
return
|
|
})
|
|
g.Go(func() (err error) {
|
|
objects.ValidatingWebhookConfigurations, err = admissionControllerClient.ValidatingWebhookConfigurations().List(gCtx, opts)
|
|
err = annotateFetchError("ValidatingWebhookConfigurations (v1)", err)
|
|
return
|
|
})
|
|
g.Go(func() (err error) {
|
|
objects.Namespaces, err = client.Namespaces().List(gCtx, opts)
|
|
err = annotateFetchError("Namespaces", err)
|
|
return
|
|
})
|
|
g.Go(func() (err error) {
|
|
objects.CronJobs, err = batchClient.CronJobs(corev1.NamespaceAll).List(gCtx, filter.NamespaceOptions(opts))
|
|
err = annotateFetchError("CronJobs", err)
|
|
return
|
|
})
|
|
g.Go(func() (err error) {
|
|
objects.VolumeSnapshotsV1, err = csiClient.VolumeSnapshots(corev1.NamespaceAll).List(ctx, filter.NamespaceOptions(opts))
|
|
err = annotateFetchError("VolumeSnapshotsV1", err)
|
|
return
|
|
})
|
|
g.Go(func() (err error) {
|
|
objects.VolumeSnapshotsV1Content, err = csiClient.VolumeSnapshotContents().List(ctx, filter.NamespaceOptions(opts))
|
|
err = annotateFetchError("VolumeSnapshotsV1Contents", err)
|
|
return
|
|
})
|
|
g.Go(func() (err error) {
|
|
objects.VolumeSnapshotsBeta, err = csiBetaClient.VolumeSnapshots(corev1.NamespaceAll).List(ctx, filter.NamespaceOptions(opts))
|
|
err = annotateFetchError("VolumeSnapshotsBeta", err)
|
|
return
|
|
})
|
|
g.Go(func() (err error) {
|
|
objects.VolumeSnapshotsBetaContent, err = csiBetaClient.VolumeSnapshotContents().List(ctx, filter.NamespaceOptions(opts))
|
|
err = annotateFetchError("VolumeSnapshotsBetaContents", err)
|
|
return
|
|
})
|
|
err := g.Wait()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return objectsWithoutNils(objects), nil
|
|
}
|
|
|
|
func annotateFetchError(kind string, err error) error {
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
if kerrors.IsNotFound(err) {
|
|
// Resource doesn't exist in this cluster's version, so there aren't any
|
|
// objects to list and check.
|
|
return nil
|
|
}
|
|
|
|
return fmt.Errorf("failed to fetch %s: %s", kind, err)
|
|
}
|
|
|
|
func objectsWithoutNils(objects *Objects) *Objects {
|
|
if objects.Nodes == nil {
|
|
objects.Nodes = &v1.NodeList{}
|
|
}
|
|
if objects.PersistentVolumes == nil {
|
|
objects.PersistentVolumes = &v1.PersistentVolumeList{}
|
|
}
|
|
if objects.Pods == nil {
|
|
objects.Pods = &v1.PodList{}
|
|
}
|
|
if objects.PodTemplates == nil {
|
|
objects.PodTemplates = &v1.PodTemplateList{}
|
|
}
|
|
if objects.PersistentVolumeClaims == nil {
|
|
objects.PersistentVolumeClaims = &v1.PersistentVolumeClaimList{}
|
|
}
|
|
if objects.ConfigMaps == nil {
|
|
objects.ConfigMaps = &v1.ConfigMapList{}
|
|
}
|
|
if objects.Services == nil {
|
|
objects.Services = &v1.ServiceList{}
|
|
}
|
|
if objects.Secrets == nil {
|
|
objects.Secrets = &v1.SecretList{}
|
|
}
|
|
if objects.ServiceAccounts == nil {
|
|
objects.ServiceAccounts = &v1.ServiceAccountList{}
|
|
}
|
|
if objects.ResourceQuotas == nil {
|
|
objects.ResourceQuotas = &v1.ResourceQuotaList{}
|
|
}
|
|
if objects.LimitRanges == nil {
|
|
objects.LimitRanges = &v1.LimitRangeList{}
|
|
}
|
|
if objects.StorageClasses == nil {
|
|
objects.StorageClasses = &st.StorageClassList{}
|
|
}
|
|
if objects.MutatingWebhookConfigurations == nil {
|
|
objects.MutatingWebhookConfigurations = &arv1.MutatingWebhookConfigurationList{}
|
|
}
|
|
if objects.ValidatingWebhookConfigurations == nil {
|
|
objects.ValidatingWebhookConfigurations = &arv1.ValidatingWebhookConfigurationList{}
|
|
}
|
|
if objects.Namespaces == nil {
|
|
objects.Namespaces = &v1.NamespaceList{}
|
|
}
|
|
if objects.CronJobs == nil {
|
|
objects.CronJobs = &batchv1.CronJobList{}
|
|
}
|
|
if objects.VolumeSnapshotsV1 == nil {
|
|
objects.VolumeSnapshotsV1 = &csitypes.VolumeSnapshotList{}
|
|
}
|
|
if objects.VolumeSnapshotsV1Content == nil {
|
|
objects.VolumeSnapshotsV1Content = &csitypes.VolumeSnapshotContentList{}
|
|
}
|
|
if objects.VolumeSnapshotsBeta == nil {
|
|
objects.VolumeSnapshotsBeta = &csitypesbeta.VolumeSnapshotList{}
|
|
}
|
|
if objects.VolumeSnapshotsBetaContent == nil {
|
|
objects.VolumeSnapshotsBetaContent = &csitypesbeta.VolumeSnapshotContentList{}
|
|
}
|
|
return objects
|
|
}
|
|
|
|
func NewClientConfigFromKubeConfig(opts *options) (*rest.Config, error) {
|
|
var err error
|
|
config := &rest.Config{}
|
|
|
|
if opts.yaml != nil {
|
|
config, err = clientcmd.RESTConfigFromKubeConfig(opts.yaml)
|
|
} else {
|
|
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
|
|
if len(opts.paths) != 0 {
|
|
loadingRules.Precedence = opts.paths
|
|
}
|
|
configOverrides := &clientcmd.ConfigOverrides{}
|
|
if opts.kubeContext != "" {
|
|
configOverrides.CurrentContext = opts.kubeContext
|
|
}
|
|
config, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, configOverrides).ClientConfig()
|
|
}
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return config, nil
|
|
}
|
|
|
|
// NewClient builds a kubernetes client to interact with the live cluster.
|
|
// The kube config file path or the kubeconfig yaml must be specified
|
|
// If not specified, defaults are assumed - configPath: ~/.kube/config, configContext: current context
|
|
func NewClient(opts ...Option) (*Client, error) {
|
|
defOpts := &options{}
|
|
|
|
for _, opt := range opts {
|
|
if err := opt(defOpts); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
var config *rest.Config
|
|
var err error
|
|
|
|
err = defOpts.validate()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if defOpts.inCluster {
|
|
config, err = rest.InClusterConfig()
|
|
} else {
|
|
config, err = NewClientConfigFromKubeConfig(defOpts)
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
config.Timeout = defOpts.timeout
|
|
if defOpts.transportWrapper != nil {
|
|
config.Wrap(defOpts.transportWrapper)
|
|
}
|
|
|
|
// disable transport caching
|
|
// see: https://github.com/kubernetes/kubernetes/issues/109289
|
|
config.Proxy = http.ProxyFromEnvironment
|
|
httpClient, err := rest.HTTPClientFor(config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
client, err := kubernetes.NewForConfigAndClient(config, httpClient)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
csiClient, err := csi.NewForConfig(config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &Client{
|
|
KubeClient: client,
|
|
CSIClient: csiClient,
|
|
httpClient: httpClient,
|
|
}, nil
|
|
}
|