mirror of
https://github.com/getcompanion-ai/computer-host.git
synced 2026-04-15 06:04:38 +00:00
host daemon (#2)
* feat: host daemon api scaffold * fix: use sparse writes * fix: unix socket length (<108 bytes)
This commit is contained in:
parent
4028bb5a1d
commit
e2f9e54970
21 changed files with 2111 additions and 372 deletions
413
internal/store/file_store.go
Normal file
413
internal/store/file_store.go
Normal file
|
|
@ -0,0 +1,413 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"github.com/getcompanion-ai/computer-host/internal/model"
|
||||
contracthost "github.com/getcompanion-ai/computer-host/contract"
|
||||
)
|
||||
|
||||
type FileStore struct {
|
||||
mu sync.Mutex
|
||||
statePath string
|
||||
operationsPath string
|
||||
}
|
||||
|
||||
type persistedOperations struct {
|
||||
Operations []model.OperationRecord `json:"operations"`
|
||||
}
|
||||
|
||||
type persistedState struct {
|
||||
Artifacts []model.ArtifactRecord `json:"artifacts"`
|
||||
Machines []model.MachineRecord `json:"machines"`
|
||||
Volumes []model.VolumeRecord `json:"volumes"`
|
||||
}
|
||||
|
||||
func NewFileStore(statePath string, operationsPath string) (*FileStore, error) {
|
||||
store := &FileStore{
|
||||
statePath: filepath.Clean(statePath),
|
||||
operationsPath: filepath.Clean(operationsPath),
|
||||
}
|
||||
if err := initializeJSONFile(store.statePath, emptyPersistedState()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := initializeJSONFile(store.operationsPath, emptyPersistedOperations()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return store, nil
|
||||
}
|
||||
|
||||
func (s *FileStore) PutArtifact(_ context.Context, record model.ArtifactRecord) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
return s.updateState(func(state *persistedState) error {
|
||||
for i := range state.Artifacts {
|
||||
if state.Artifacts[i].Ref == record.Ref {
|
||||
state.Artifacts[i] = record
|
||||
return nil
|
||||
}
|
||||
}
|
||||
state.Artifacts = append(state.Artifacts, record)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (s *FileStore) GetArtifact(_ context.Context, ref contracthost.ArtifactRef) (*model.ArtifactRecord, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
state, err := s.readState()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i := range state.Artifacts {
|
||||
if state.Artifacts[i].Ref == ref {
|
||||
record := state.Artifacts[i]
|
||||
return &record, nil
|
||||
}
|
||||
}
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
|
||||
func (s *FileStore) ListArtifacts(_ context.Context) ([]model.ArtifactRecord, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
state, err := s.readState()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return append([]model.ArtifactRecord(nil), state.Artifacts...), nil
|
||||
}
|
||||
|
||||
func (s *FileStore) CreateMachine(_ context.Context, record model.MachineRecord) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
return s.updateState(func(state *persistedState) error {
|
||||
for _, machine := range state.Machines {
|
||||
if machine.ID == record.ID {
|
||||
return fmt.Errorf("store: machine %q already exists", record.ID)
|
||||
}
|
||||
}
|
||||
state.Machines = append(state.Machines, record)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (s *FileStore) GetMachine(_ context.Context, id contracthost.MachineID) (*model.MachineRecord, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
state, err := s.readState()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i := range state.Machines {
|
||||
if state.Machines[i].ID == id {
|
||||
record := state.Machines[i]
|
||||
return &record, nil
|
||||
}
|
||||
}
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
|
||||
func (s *FileStore) ListMachines(_ context.Context) ([]model.MachineRecord, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
state, err := s.readState()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return append([]model.MachineRecord(nil), state.Machines...), nil
|
||||
}
|
||||
|
||||
func (s *FileStore) UpdateMachine(_ context.Context, record model.MachineRecord) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
return s.updateState(func(state *persistedState) error {
|
||||
for i := range state.Machines {
|
||||
if state.Machines[i].ID == record.ID {
|
||||
state.Machines[i] = record
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return ErrNotFound
|
||||
})
|
||||
}
|
||||
|
||||
func (s *FileStore) DeleteMachine(_ context.Context, id contracthost.MachineID) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
return s.updateState(func(state *persistedState) error {
|
||||
for i := range state.Machines {
|
||||
if state.Machines[i].ID == id {
|
||||
state.Machines = append(state.Machines[:i], state.Machines[i+1:]...)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return ErrNotFound
|
||||
})
|
||||
}
|
||||
|
||||
func (s *FileStore) CreateVolume(_ context.Context, record model.VolumeRecord) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
return s.updateState(func(state *persistedState) error {
|
||||
for _, volume := range state.Volumes {
|
||||
if volume.ID == record.ID {
|
||||
return fmt.Errorf("store: volume %q already exists", record.ID)
|
||||
}
|
||||
}
|
||||
state.Volumes = append(state.Volumes, record)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (s *FileStore) GetVolume(_ context.Context, id contracthost.VolumeID) (*model.VolumeRecord, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
state, err := s.readState()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i := range state.Volumes {
|
||||
if state.Volumes[i].ID == id {
|
||||
record := state.Volumes[i]
|
||||
return &record, nil
|
||||
}
|
||||
}
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
|
||||
func (s *FileStore) ListVolumes(_ context.Context) ([]model.VolumeRecord, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
state, err := s.readState()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return append([]model.VolumeRecord(nil), state.Volumes...), nil
|
||||
}
|
||||
|
||||
func (s *FileStore) UpdateVolume(_ context.Context, record model.VolumeRecord) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
return s.updateState(func(state *persistedState) error {
|
||||
for i := range state.Volumes {
|
||||
if state.Volumes[i].ID == record.ID {
|
||||
state.Volumes[i] = record
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return ErrNotFound
|
||||
})
|
||||
}
|
||||
|
||||
func (s *FileStore) DeleteVolume(_ context.Context, id contracthost.VolumeID) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
return s.updateState(func(state *persistedState) error {
|
||||
for i := range state.Volumes {
|
||||
if state.Volumes[i].ID == id {
|
||||
state.Volumes = append(state.Volumes[:i], state.Volumes[i+1:]...)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return ErrNotFound
|
||||
})
|
||||
}
|
||||
|
||||
func (s *FileStore) UpsertOperation(_ context.Context, record model.OperationRecord) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
return s.updateOperations(func(operations *persistedOperations) error {
|
||||
for i := range operations.Operations {
|
||||
if operations.Operations[i].MachineID == record.MachineID {
|
||||
operations.Operations[i] = record
|
||||
return nil
|
||||
}
|
||||
}
|
||||
operations.Operations = append(operations.Operations, record)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (s *FileStore) ListOperations(_ context.Context) ([]model.OperationRecord, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
operations, err := s.readOperations()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return append([]model.OperationRecord(nil), operations.Operations...), nil
|
||||
}
|
||||
|
||||
func (s *FileStore) DeleteOperation(_ context.Context, machineID contracthost.MachineID) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
return s.updateOperations(func(operations *persistedOperations) error {
|
||||
for i := range operations.Operations {
|
||||
if operations.Operations[i].MachineID == machineID {
|
||||
operations.Operations = append(operations.Operations[:i], operations.Operations[i+1:]...)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (s *FileStore) readOperations() (*persistedOperations, error) {
|
||||
var operations persistedOperations
|
||||
if err := readJSONFile(s.operationsPath, &operations); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
normalizeOperations(&operations)
|
||||
return &operations, nil
|
||||
}
|
||||
|
||||
func (s *FileStore) readState() (*persistedState, error) {
|
||||
var state persistedState
|
||||
if err := readJSONFile(s.statePath, &state); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
normalizeState(&state)
|
||||
return &state, nil
|
||||
}
|
||||
|
||||
func (s *FileStore) updateOperations(update func(*persistedOperations) error) error {
|
||||
operations, err := s.readOperations()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := update(operations); err != nil {
|
||||
return err
|
||||
}
|
||||
return writeJSONFileAtomically(s.operationsPath, operations)
|
||||
}
|
||||
|
||||
func (s *FileStore) updateState(update func(*persistedState) error) error {
|
||||
state, err := s.readState()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := update(state); err != nil {
|
||||
return err
|
||||
}
|
||||
return writeJSONFileAtomically(s.statePath, state)
|
||||
}
|
||||
|
||||
func initializeJSONFile(path string, value any) error {
|
||||
if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
|
||||
return fmt.Errorf("create store dir for %q: %w", path, err)
|
||||
}
|
||||
if _, err := os.Stat(path); err == nil {
|
||||
return nil
|
||||
} else if !os.IsNotExist(err) {
|
||||
return fmt.Errorf("stat store file %q: %w", path, err)
|
||||
}
|
||||
return writeJSONFileAtomically(path, value)
|
||||
}
|
||||
|
||||
func readJSONFile(path string, value any) error {
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("read store file %q: %w", path, err)
|
||||
}
|
||||
if err := json.Unmarshal(data, value); err != nil {
|
||||
return fmt.Errorf("decode store file %q: %w", path, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func writeJSONFileAtomically(path string, value any) error {
|
||||
payload, err := json.MarshalIndent(value, "", " ")
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal store file %q: %w", path, err)
|
||||
}
|
||||
payload = append(payload, '\n')
|
||||
|
||||
if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
|
||||
return fmt.Errorf("create store dir for %q: %w", path, err)
|
||||
}
|
||||
|
||||
tmpPath := path + ".tmp"
|
||||
file, err := os.OpenFile(tmpPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o644)
|
||||
if err != nil {
|
||||
return fmt.Errorf("open temp store file %q: %w", tmpPath, err)
|
||||
}
|
||||
if _, err := file.Write(payload); err != nil {
|
||||
file.Close()
|
||||
return fmt.Errorf("write temp store file %q: %w", tmpPath, err)
|
||||
}
|
||||
if err := file.Sync(); err != nil {
|
||||
file.Close()
|
||||
return fmt.Errorf("sync temp store file %q: %w", tmpPath, err)
|
||||
}
|
||||
if err := file.Close(); err != nil {
|
||||
return fmt.Errorf("close temp store file %q: %w", tmpPath, err)
|
||||
}
|
||||
if err := os.Rename(tmpPath, path); err != nil {
|
||||
return fmt.Errorf("rename temp store file %q to %q: %w", tmpPath, path, err)
|
||||
}
|
||||
|
||||
dir, err := os.Open(filepath.Dir(path))
|
||||
if err != nil {
|
||||
return fmt.Errorf("open store dir for %q: %w", path, err)
|
||||
}
|
||||
if err := dir.Sync(); err != nil {
|
||||
dir.Close()
|
||||
return fmt.Errorf("sync store dir for %q: %w", path, err)
|
||||
}
|
||||
if err := dir.Close(); err != nil {
|
||||
return fmt.Errorf("close store dir for %q: %w", path, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func emptyPersistedState() persistedState {
|
||||
return persistedState{
|
||||
Artifacts: []model.ArtifactRecord{},
|
||||
Machines: []model.MachineRecord{},
|
||||
Volumes: []model.VolumeRecord{},
|
||||
}
|
||||
}
|
||||
|
||||
func emptyPersistedOperations() persistedOperations {
|
||||
return persistedOperations{Operations: []model.OperationRecord{}}
|
||||
}
|
||||
|
||||
func normalizeState(state *persistedState) {
|
||||
if state.Artifacts == nil {
|
||||
state.Artifacts = []model.ArtifactRecord{}
|
||||
}
|
||||
if state.Machines == nil {
|
||||
state.Machines = []model.MachineRecord{}
|
||||
}
|
||||
if state.Volumes == nil {
|
||||
state.Volumes = []model.VolumeRecord{}
|
||||
}
|
||||
}
|
||||
|
||||
func normalizeOperations(operations *persistedOperations) {
|
||||
if operations.Operations == nil {
|
||||
operations.Operations = []model.OperationRecord{}
|
||||
}
|
||||
}
|
||||
130
internal/store/file_store_test.go
Normal file
130
internal/store/file_store_test.go
Normal file
|
|
@ -0,0 +1,130 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/getcompanion-ai/computer-host/internal/model"
|
||||
contracthost "github.com/getcompanion-ai/computer-host/contract"
|
||||
)
|
||||
|
||||
func TestFileStorePersistsStateAndOperations(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
root := t.TempDir()
|
||||
statePath := filepath.Join(root, "state", "state.json")
|
||||
opsPath := filepath.Join(root, "state", "ops.json")
|
||||
|
||||
ctx := context.Background()
|
||||
first, err := NewFileStore(statePath, opsPath)
|
||||
if err != nil {
|
||||
t.Fatalf("create file store: %v", err)
|
||||
}
|
||||
|
||||
artifact := model.ArtifactRecord{
|
||||
Ref: contracthost.ArtifactRef{
|
||||
KernelImageURL: "https://example.com/kernel",
|
||||
RootFSURL: "https://example.com/rootfs",
|
||||
},
|
||||
LocalKey: "artifact-key",
|
||||
LocalDir: filepath.Join(root, "artifacts", "artifact-key"),
|
||||
KernelImagePath: filepath.Join(root, "artifacts", "artifact-key", "kernel"),
|
||||
RootFSPath: filepath.Join(root, "artifacts", "artifact-key", "rootfs"),
|
||||
CreatedAt: time.Unix(1700000000, 0).UTC(),
|
||||
}
|
||||
if err := first.PutArtifact(ctx, artifact); err != nil {
|
||||
t.Fatalf("put artifact: %v", err)
|
||||
}
|
||||
|
||||
machine := model.MachineRecord{
|
||||
ID: "vm-1",
|
||||
Artifact: artifact.Ref,
|
||||
SystemVolumeID: "vm-1-system",
|
||||
RuntimeHost: "172.16.0.2",
|
||||
TapDevice: "fctap0",
|
||||
Ports: []contracthost.MachinePort{
|
||||
{Name: contracthost.MachinePortNameSSH, Port: 22, Protocol: contracthost.PortProtocolTCP},
|
||||
},
|
||||
Phase: contracthost.MachinePhaseRunning,
|
||||
PID: 1234,
|
||||
SocketPath: filepath.Join(root, "runtime", "machine.sock"),
|
||||
CreatedAt: time.Unix(1700000001, 0).UTC(),
|
||||
StartedAt: timePtr(time.Unix(1700000002, 0).UTC()),
|
||||
}
|
||||
if err := first.CreateMachine(ctx, machine); err != nil {
|
||||
t.Fatalf("create machine: %v", err)
|
||||
}
|
||||
|
||||
volume := model.VolumeRecord{
|
||||
ID: "vm-1-system",
|
||||
Kind: contracthost.VolumeKindSystem,
|
||||
AttachedMachineID: machineIDPtr("vm-1"),
|
||||
Path: filepath.Join(root, "machine-disks", "vm-1", "system.img"),
|
||||
CreatedAt: time.Unix(1700000003, 0).UTC(),
|
||||
}
|
||||
if err := first.CreateVolume(ctx, volume); err != nil {
|
||||
t.Fatalf("create volume: %v", err)
|
||||
}
|
||||
|
||||
operation := model.OperationRecord{
|
||||
MachineID: "vm-1",
|
||||
Type: model.MachineOperationCreate,
|
||||
StartedAt: time.Unix(1700000004, 0).UTC(),
|
||||
}
|
||||
if err := first.UpsertOperation(ctx, operation); err != nil {
|
||||
t.Fatalf("upsert operation: %v", err)
|
||||
}
|
||||
|
||||
second, err := NewFileStore(statePath, opsPath)
|
||||
if err != nil {
|
||||
t.Fatalf("reopen file store: %v", err)
|
||||
}
|
||||
|
||||
gotArtifact, err := second.GetArtifact(ctx, artifact.Ref)
|
||||
if err != nil {
|
||||
t.Fatalf("get artifact after reopen: %v", err)
|
||||
}
|
||||
if gotArtifact.LocalKey != artifact.LocalKey {
|
||||
t.Fatalf("artifact local key mismatch: got %q want %q", gotArtifact.LocalKey, artifact.LocalKey)
|
||||
}
|
||||
|
||||
gotMachine, err := second.GetMachine(ctx, machine.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("get machine after reopen: %v", err)
|
||||
}
|
||||
if gotMachine.Phase != contracthost.MachinePhaseRunning {
|
||||
t.Fatalf("machine phase mismatch: got %q", gotMachine.Phase)
|
||||
}
|
||||
if gotMachine.RuntimeHost != machine.RuntimeHost {
|
||||
t.Fatalf("runtime host mismatch: got %q want %q", gotMachine.RuntimeHost, machine.RuntimeHost)
|
||||
}
|
||||
|
||||
gotVolume, err := second.GetVolume(ctx, volume.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("get volume after reopen: %v", err)
|
||||
}
|
||||
if gotVolume.AttachedMachineID == nil || *gotVolume.AttachedMachineID != "vm-1" {
|
||||
t.Fatalf("attached machine mismatch: got %#v", gotVolume.AttachedMachineID)
|
||||
}
|
||||
|
||||
operations, err := second.ListOperations(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("list operations after reopen: %v", err)
|
||||
}
|
||||
if len(operations) != 1 {
|
||||
t.Fatalf("operation count mismatch: got %d want 1", len(operations))
|
||||
}
|
||||
if operations[0].Type != model.MachineOperationCreate {
|
||||
t.Fatalf("operation type mismatch: got %q", operations[0].Type)
|
||||
}
|
||||
}
|
||||
|
||||
func timePtr(value time.Time) *time.Time {
|
||||
return &value
|
||||
}
|
||||
|
||||
func machineIDPtr(value contracthost.MachineID) *contracthost.MachineID {
|
||||
return &value
|
||||
}
|
||||
|
|
@ -2,11 +2,14 @@ package store
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"github.com/getcompanion-ai/computer-host/internal/model"
|
||||
contracthost "github.com/getcompanion-ai/computer-host/contract"
|
||||
)
|
||||
|
||||
var ErrNotFound = errors.New("store: not found")
|
||||
|
||||
type Store interface {
|
||||
PutArtifact(context.Context, model.ArtifactRecord) error
|
||||
GetArtifact(context.Context, contracthost.ArtifactRef) (*model.ArtifactRecord, error)
|
||||
|
|
@ -21,4 +24,7 @@ type Store interface {
|
|||
ListVolumes(context.Context) ([]model.VolumeRecord, error)
|
||||
UpdateVolume(context.Context, model.VolumeRecord) error
|
||||
DeleteVolume(context.Context, contracthost.VolumeID) error
|
||||
UpsertOperation(context.Context, model.OperationRecord) error
|
||||
ListOperations(context.Context) ([]model.OperationRecord, error)
|
||||
DeleteOperation(context.Context, contracthost.MachineID) error
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue