This commit is contained in:
Harivansh Rathi 2026-02-07 13:49:11 -05:00
commit 0595d93c49
28 changed files with 1763 additions and 0 deletions

92
internal/kube/apply.go Normal file
View file

@ -0,0 +1,92 @@
package kube
import (
	"bytes"
	"context"
	"fmt"
	"io"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	yamlserializer "k8s.io/apimachinery/pkg/runtime/serializer/yaml"
	"k8s.io/apimachinery/pkg/types"
	k8syaml "k8s.io/apimachinery/pkg/util/yaml"
	"k8s.io/client-go/discovery"
	"k8s.io/client-go/discovery/cached/memory"
	"k8s.io/client-go/restmapper"
)
// ServerSideApply splits a multi-document YAML stream into individual
// resources and applies each one using server-side apply with the
// "agentikube" field manager.
//
// Scoping is decided from the REST mapping (cluster-scoped vs namespaced),
// not from whether the manifest happens to set metadata.namespace; a
// namespaced resource with no explicit namespace is applied to "default".
// Returns on the first decode, mapping, or apply error, wrapped with the
// offending document's kind and name.
func (c *Client) ServerSideApply(ctx context.Context, manifests []byte) error {
	decoder := k8syaml.NewYAMLOrJSONDecoder(bytes.NewReader(manifests), 4096)

	discoveryClient, ok := c.Clientset().Discovery().(*discovery.DiscoveryClient)
	if !ok {
		return fmt.Errorf("failed to get discovery client")
	}
	// Memory-cached discovery avoids one round-trip per document when
	// resolving GVK -> GVR mappings.
	cachedDiscovery := memory.NewMemCacheClient(discoveryClient)
	mapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedDiscovery)
	deserializer := yamlserializer.NewDecodingSerializer(unstructured.UnstructuredJSONScheme)

	for {
		var rawObj unstructured.Unstructured
		if err := decoder.Decode(&rawObj); err != nil {
			if err == io.EOF {
				break
			}
			return fmt.Errorf("decoding YAML document: %w", err)
		}
		// Skip empty documents (e.g. a trailing "---" separator).
		if len(rawObj.Object) == 0 {
			continue
		}
		// Re-encode to JSON: server-side apply patches carry a JSON body.
		rawJSON, err := rawObj.MarshalJSON()
		if err != nil {
			return fmt.Errorf("marshaling to JSON: %w", err)
		}
		// Decode again through the YAML serializer to recover the GVK.
		obj := &unstructured.Unstructured{}
		_, gvk, err := deserializer.Decode(rawJSON, nil, obj)
		if err != nil {
			return fmt.Errorf("deserializing object: %w", err)
		}
		// Map GVK to GVR (and scope) using the REST mapper.
		mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
		if err != nil {
			return fmt.Errorf("mapping GVK %s to GVR: %w", gvk.String(), err)
		}

		name := obj.GetName()
		// An empty name would make Patch target the collection URL and fail
		// confusingly server-side; reject it here with a clear message.
		if name == "" {
			return fmt.Errorf("%s object is missing metadata.name", gvk.Kind)
		}
		applyOpts := metav1.ApplyOptions{
			FieldManager: "agentikube",
		}

		// FIX: previously a manifest without metadata.namespace was routed to
		// the cluster-scoped branch even for namespaced kinds. Decide from the
		// mapping's scope instead, defaulting the namespace to "default".
		if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
			namespace := obj.GetNamespace()
			if namespace == "" {
				namespace = "default"
			}
			_, err = c.Dynamic().Resource(mapping.Resource).Namespace(namespace).Patch(
				ctx, name, types.ApplyPatchType, rawJSON, applyOpts.ToPatchOptions(),
			)
		} else {
			_, err = c.Dynamic().Resource(mapping.Resource).Patch(
				ctx, name, types.ApplyPatchType, rawJSON, applyOpts.ToPatchOptions(),
			)
		}
		if err != nil {
			return fmt.Errorf("applying %s/%s: %w", gvk.Kind, name, err)
		}
		fmt.Printf("applied %s/%s\n", gvk.Kind, name)
	}
	return nil
}

76
internal/kube/client.go Normal file
View file

@ -0,0 +1,76 @@
package kube
import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
// Client wraps the Kubernetes dynamic client, typed clientset, and REST config.
// It is built by NewClient and shared by the helpers in this package; the
// dynamic client serves unstructured/CRD operations while the typed clientset
// serves well-known core APIs.
type Client struct {
	dynamic    dynamic.Interface    // dynamic client for arbitrary group/version/resources
	clientset  kubernetes.Interface // typed clientset for built-in APIs (e.g. Namespaces)
	restConfig *rest.Config         // REST config both clients were constructed from
}

// Dynamic returns the dynamic client for unstructured resource operations.
func (c *Client) Dynamic() dynamic.Interface { return c.dynamic }

// Clientset returns the typed Kubernetes clientset.
func (c *Client) Clientset() kubernetes.Interface { return c.clientset }

// RestConfig returns the REST config used to build the clients.
func (c *Client) RestConfig() *rest.Config { return c.restConfig }
// NewClient creates a Kubernetes client using the default kubeconfig loading
// rules (KUBECONFIG env var or ~/.kube/config).
func NewClient() (*Client, error) {
	// Resolve the REST config via the standard deferred-loading chain.
	cfg, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
		clientcmd.NewDefaultClientConfigLoadingRules(),
		&clientcmd.ConfigOverrides{},
	).ClientConfig()
	if err != nil {
		return nil, fmt.Errorf("loading kubeconfig: %w", err)
	}

	dyn, err := dynamic.NewForConfig(cfg)
	if err != nil {
		return nil, fmt.Errorf("creating dynamic client: %w", err)
	}
	cs, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		return nil, fmt.Errorf("creating clientset: %w", err)
	}

	return &Client{
		dynamic:    dyn,
		clientset:  cs,
		restConfig: cfg,
	}, nil
}
// EnsureNamespace creates the namespace if it does not already exist.
func (c *Client) EnsureNamespace(ctx context.Context, name string) error {
	// Probe first: an existing namespace means there is nothing to do, and
	// any error other than NotFound is a genuine failure.
	switch _, err := c.clientset.CoreV1().Namespaces().Get(ctx, name, metav1.GetOptions{}); {
	case err == nil:
		return nil
	case !errors.IsNotFound(err):
		return fmt.Errorf("checking namespace %q: %w", name, err)
	}

	ns := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: name}}
	if _, err := c.clientset.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{}); err != nil {
		return fmt.Errorf("creating namespace %q: %w", name, err)
	}
	return nil
}

24
internal/kube/exec.go Normal file
View file

@ -0,0 +1,24 @@
package kube
import (
"os"
"os/exec"
)
// Exec runs kubectl exec to attach an interactive terminal to the specified
// pod. If command is empty, it defaults to /bin/sh.
func Exec(namespace, podName string, command []string) error {
	shell := command
	if len(shell) == 0 {
		shell = []string{"/bin/sh"}
	}
	// Everything after "--" is passed verbatim to the container.
	args := append([]string{"exec", "-it", "-n", namespace, podName, "--"}, shell...)

	cmd := exec.Command("kubectl", args...)
	// Wire the child's streams straight to ours so the session is interactive.
	cmd.Stdin, cmd.Stdout, cmd.Stderr = os.Stdin, os.Stdout, os.Stderr
	return cmd.Run()
}

81
internal/kube/wait.go Normal file
View file

@ -0,0 +1,81 @@
package kube
import (
"context"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
)
// WaitForReady watches a resource in the agentsandbox.dev/v1 group until its
// Ready condition becomes True or the context is cancelled/times out.
// The resource parameter is the plural resource name (e.g. "sandboxclaims",
// "sandboxwarmpools").
//
// Returns an error wrapping ctx.Err() when the context ends, an error carrying
// the server's status object on a watch.Error event, and an explicit error if
// the resource is deleted while being waited on.
func (c *Client) WaitForReady(ctx context.Context, namespace, resource, name string) error {
	gvr := schema.GroupVersionResource{
		Group:    "agentsandbox.dev",
		Version:  "v1",
		Resource: resource,
	}
	// Watch only the single named object via a metadata.name field selector.
	watcher, err := c.Dynamic().Resource(gvr).Namespace(namespace).Watch(ctx, metav1.ListOptions{
		FieldSelector: fmt.Sprintf("metadata.name=%s", name),
	})
	if err != nil {
		return fmt.Errorf("watching %s %s/%s: %w", resource, namespace, name, err)
	}
	defer watcher.Stop()

	for {
		select {
		case <-ctx.Done():
			// FIX: wrap ctx.Err() so callers can distinguish deadline expiry
			// from explicit cancellation (the old message always said "timed out").
			return fmt.Errorf("waiting for %s %s/%s to become ready: %w", resource, namespace, name, ctx.Err())
		case event, ok := <-watcher.ResultChan():
			if !ok {
				return fmt.Errorf("watch channel closed for %s %s/%s", resource, namespace, name)
			}
			switch event.Type {
			case watch.Error:
				// FIX: include the server-supplied status object instead of
				// discarding the failure detail.
				return fmt.Errorf("watch error for %s %s/%s: %v", resource, namespace, name, event.Object)
			case watch.Deleted:
				// FIX: a deleted object can never become ready; previously the
				// loop kept waiting until the context expired.
				return fmt.Errorf("%s %s/%s was deleted while waiting for it to become ready", resource, namespace, name)
			}
			obj, ok := event.Object.(*unstructured.Unstructured)
			if !ok {
				continue
			}
			if isReady(obj) {
				return nil
			}
		}
	}
}
// isReady reports whether the object has a status condition with
// type=Ready and status=True.
func isReady(obj *unstructured.Unstructured) bool {
	// A single multi-segment lookup covers both "no status" and
	// "no conditions" in one step.
	conditions, found, err := unstructured.NestedSlice(obj.Object, "status", "conditions")
	if err != nil || !found {
		return false
	}
	for _, raw := range conditions {
		cond, ok := raw.(map[string]interface{})
		if !ok {
			continue
		}
		condType, _ := cond["type"].(string)
		condStatus, _ := cond["status"].(string)
		if condType == "Ready" && condStatus == "True" {
			return true
		}
	}
	return false
}