mirror of
https://github.com/getcompanion-ai/computer-host.git
synced 2026-04-15 03:00:42 +00:00
227 lines
7 KiB
Go
227 lines
7 KiB
Go
package daemon
|
|
|
|
import (
	"context"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"time"

	contracthost "github.com/getcompanion-ai/computer-host/contract"
	"github.com/getcompanion-ai/computer-host/internal/firecracker"
	"github.com/getcompanion-ai/computer-host/internal/model"
	"github.com/getcompanion-ai/computer-host/internal/store"
)
|
|
|
|
func (d *Daemon) CreateMachine(ctx context.Context, req contracthost.CreateMachineRequest) (*contracthost.CreateMachineResponse, error) {
|
|
if err := validateMachineID(req.MachineID); err != nil {
|
|
return nil, err
|
|
}
|
|
if err := validateArtifactRef(req.Artifact); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
unlock := d.lockMachine(req.MachineID)
|
|
defer unlock()
|
|
|
|
if _, err := d.store.GetMachine(ctx, req.MachineID); err == nil {
|
|
return nil, fmt.Errorf("machine %q already exists", req.MachineID)
|
|
} else if err != nil && err != store.ErrNotFound {
|
|
return nil, err
|
|
}
|
|
|
|
if err := d.store.UpsertOperation(ctx, model.OperationRecord{
|
|
MachineID: req.MachineID,
|
|
Type: model.MachineOperationCreate,
|
|
StartedAt: time.Now().UTC(),
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
clearOperation := false
|
|
defer func() {
|
|
if clearOperation {
|
|
_ = d.store.DeleteOperation(context.Background(), req.MachineID)
|
|
}
|
|
}()
|
|
|
|
artifact, err := d.ensureArtifact(ctx, req.Artifact)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
userVolumes, err := d.loadAttachableUserVolumes(ctx, req.MachineID, req.UserVolumeIDs)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
systemVolumePath := d.systemVolumePath(req.MachineID)
|
|
if err := os.MkdirAll(filepath.Dir(systemVolumePath), 0o755); err != nil {
|
|
return nil, fmt.Errorf("create system volume dir for %q: %w", req.MachineID, err)
|
|
}
|
|
if err := cloneFile(artifact.RootFSPath, systemVolumePath); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
spec, err := d.buildMachineSpec(req.MachineID, artifact, userVolumes, systemVolumePath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
usedNetworks, err := d.listRunningNetworks(ctx, req.MachineID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
state, err := d.runtime.Boot(ctx, spec, usedNetworks)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ports := defaultMachinePorts()
|
|
if err := waitForGuestReady(ctx, state.RuntimeHost, ports); err != nil {
|
|
_ = d.runtime.Delete(context.Background(), *state)
|
|
return nil, err
|
|
}
|
|
|
|
now := time.Now().UTC()
|
|
systemVolumeRecord := model.VolumeRecord{
|
|
ID: d.systemVolumeID(req.MachineID),
|
|
Kind: contracthost.VolumeKindSystem,
|
|
AttachedMachineID: machineIDPtr(req.MachineID),
|
|
SourceArtifact: &req.Artifact,
|
|
Pool: model.StoragePoolMachineDisks,
|
|
Path: systemVolumePath,
|
|
CreatedAt: now,
|
|
}
|
|
if err := d.store.CreateVolume(ctx, systemVolumeRecord); err != nil {
|
|
_ = d.runtime.Delete(context.Background(), *state)
|
|
return nil, err
|
|
}
|
|
|
|
attachedUserVolumeIDs := make([]contracthost.VolumeID, 0, len(userVolumes))
|
|
for _, volume := range userVolumes {
|
|
volume.AttachedMachineID = machineIDPtr(req.MachineID)
|
|
if err := d.store.UpdateVolume(ctx, volume); err != nil {
|
|
for _, attachedVolumeID := range attachedUserVolumeIDs {
|
|
attachedVolume, getErr := d.store.GetVolume(context.Background(), attachedVolumeID)
|
|
if getErr == nil {
|
|
attachedVolume.AttachedMachineID = nil
|
|
_ = d.store.UpdateVolume(context.Background(), *attachedVolume)
|
|
}
|
|
}
|
|
_ = d.store.DeleteVolume(context.Background(), systemVolumeRecord.ID)
|
|
_ = d.runtime.Delete(context.Background(), *state)
|
|
return nil, err
|
|
}
|
|
attachedUserVolumeIDs = append(attachedUserVolumeIDs, volume.ID)
|
|
}
|
|
|
|
record := model.MachineRecord{
|
|
ID: req.MachineID,
|
|
Artifact: req.Artifact,
|
|
SystemVolumeID: systemVolumeRecord.ID,
|
|
UserVolumeIDs: append([]contracthost.VolumeID(nil), attachedUserVolumeIDs...),
|
|
RuntimeHost: state.RuntimeHost,
|
|
TapDevice: state.TapName,
|
|
Ports: ports,
|
|
Phase: contracthost.MachinePhaseRunning,
|
|
PID: state.PID,
|
|
SocketPath: state.SocketPath,
|
|
CreatedAt: now,
|
|
StartedAt: state.StartedAt,
|
|
}
|
|
if err := d.store.CreateMachine(ctx, record); err != nil {
|
|
for _, volume := range userVolumes {
|
|
volume.AttachedMachineID = nil
|
|
_ = d.store.UpdateVolume(context.Background(), volume)
|
|
}
|
|
_ = d.store.DeleteVolume(context.Background(), systemVolumeRecord.ID)
|
|
_ = d.runtime.Delete(context.Background(), *state)
|
|
return nil, err
|
|
}
|
|
|
|
clearOperation = true
|
|
return &contracthost.CreateMachineResponse{Machine: machineToContract(record)}, nil
|
|
}
|
|
|
|
func (d *Daemon) buildMachineSpec(machineID contracthost.MachineID, artifact *model.ArtifactRecord, userVolumes []model.VolumeRecord, systemVolumePath string) (firecracker.MachineSpec, error) {
|
|
drives := make([]firecracker.DriveSpec, 0, len(userVolumes))
|
|
for i, volume := range userVolumes {
|
|
drives = append(drives, firecracker.DriveSpec{
|
|
ID: fmt.Sprintf("user-%d", i),
|
|
Path: volume.Path,
|
|
ReadOnly: false,
|
|
})
|
|
}
|
|
|
|
spec := firecracker.MachineSpec{
|
|
ID: firecracker.MachineID(machineID),
|
|
VCPUs: defaultGuestVCPUs,
|
|
MemoryMiB: defaultGuestMemoryMiB,
|
|
KernelImagePath: artifact.KernelImagePath,
|
|
RootFSPath: systemVolumePath,
|
|
KernelArgs: defaultGuestKernelArgs,
|
|
Drives: drives,
|
|
}
|
|
if err := spec.Validate(); err != nil {
|
|
return firecracker.MachineSpec{}, err
|
|
}
|
|
return spec, nil
|
|
}
|
|
|
|
func (d *Daemon) ensureArtifact(ctx context.Context, ref contracthost.ArtifactRef) (*model.ArtifactRecord, error) {
|
|
key := artifactKey(ref)
|
|
unlock := d.lockArtifact(key)
|
|
defer unlock()
|
|
|
|
if artifact, err := d.store.GetArtifact(ctx, ref); err == nil {
|
|
return artifact, nil
|
|
} else if err != store.ErrNotFound {
|
|
return nil, err
|
|
}
|
|
|
|
dir := filepath.Join(d.config.ArtifactsDir, key)
|
|
if err := os.MkdirAll(dir, 0o755); err != nil {
|
|
return nil, fmt.Errorf("create artifact dir %q: %w", dir, err)
|
|
}
|
|
|
|
kernelPath := filepath.Join(dir, "kernel")
|
|
rootFSPath := filepath.Join(dir, "rootfs")
|
|
if err := downloadFile(ctx, ref.KernelImageURL, kernelPath); err != nil {
|
|
return nil, err
|
|
}
|
|
if err := downloadFile(ctx, ref.RootFSURL, rootFSPath); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
artifact := model.ArtifactRecord{
|
|
Ref: ref,
|
|
LocalKey: key,
|
|
LocalDir: dir,
|
|
KernelImagePath: kernelPath,
|
|
RootFSPath: rootFSPath,
|
|
CreatedAt: time.Now().UTC(),
|
|
}
|
|
if err := d.store.PutArtifact(ctx, artifact); err != nil {
|
|
return nil, err
|
|
}
|
|
return &artifact, nil
|
|
}
|
|
|
|
func (d *Daemon) loadAttachableUserVolumes(ctx context.Context, machineID contracthost.MachineID, volumeIDs []contracthost.VolumeID) ([]model.VolumeRecord, error) {
|
|
volumes := make([]model.VolumeRecord, 0, len(volumeIDs))
|
|
for _, volumeID := range volumeIDs {
|
|
volume, err := d.store.GetVolume(ctx, volumeID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if volume.Kind != contracthost.VolumeKindUser {
|
|
return nil, fmt.Errorf("volume %q is not a user volume", volumeID)
|
|
}
|
|
if volume.AttachedMachineID != nil && *volume.AttachedMachineID != machineID {
|
|
return nil, fmt.Errorf("volume %q is already attached to machine %q", volumeID, *volume.AttachedMachineID)
|
|
}
|
|
volumes = append(volumes, *volume)
|
|
}
|
|
return volumes, nil
|
|
}
|