diff --git a/internal/daemon/daemon.go b/internal/daemon/daemon.go
index 117af30..a420ef4 100644
--- a/internal/daemon/daemon.go
+++ b/internal/daemon/daemon.go
@@ -47,6 +47,7 @@ type Daemon struct {
 	machineLocks  map[contracthost.MachineID]*sync.Mutex
 	artifactLocks map[string]*sync.Mutex
 
+	publishedPortAllocMu   sync.Mutex
 	publishedPortsMu       sync.Mutex
 	publishedPortListeners map[contracthost.PublishedPortID]net.Listener
 }
diff --git a/internal/daemon/lifecycle.go b/internal/daemon/lifecycle.go
index 883f19a..ba1d7b7 100644
--- a/internal/daemon/lifecycle.go
+++ b/internal/daemon/lifecycle.go
@@ -2,6 +2,7 @@ package daemon
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"os"
 	"path/filepath"
@@ -464,6 +465,9 @@ func (d *Daemon) reconcileSnapshot(ctx context.Context, operation model.Operatio
 		// Snapshot completed successfully, just clear the journal
 		return d.store.DeleteOperation(ctx, operation.MachineID)
 	}
+	if !errors.Is(err, store.ErrNotFound) {
+		return fmt.Errorf("get snapshot %q during reconciliation: %w", *operation.SnapshotID, err)
+	}
 	// Snapshot did not complete: clean up partial snapshot directory and resume the machine
 	snapshotDir := filepath.Join(d.config.SnapshotsDir, string(*operation.SnapshotID))
 	_ = os.RemoveAll(snapshotDir)
@@ -482,6 +486,9 @@ func (d *Daemon) reconcileRestore(ctx context.Context, operation model.Operation
 		// Restore completed, clear journal
 		return d.store.DeleteOperation(ctx, operation.MachineID)
 	}
+	if !errors.Is(err, store.ErrNotFound) {
+		return fmt.Errorf("get machine %q during restore reconciliation: %w", operation.MachineID, err)
+	}
 	// Restore did not complete: clean up partial machine directory and disk
 	_ = os.RemoveAll(filepath.Dir(d.systemVolumePath(operation.MachineID)))
 	_ = os.RemoveAll(d.machineRuntimeBaseDir(operation.MachineID))
diff --git a/internal/daemon/published_ports.go b/internal/daemon/published_ports.go
index d55bec5..f91e71b 100644
--- a/internal/daemon/published_ports.go
+++ b/internal/daemon/published_ports.go
@@ -49,6 +49,9 @@ func (d *Daemon) CreatePublishedPort(ctx context.Context, machineID contracthost
 		return nil, err
 	}
 
+	d.publishedPortAllocMu.Lock()
+	defer d.publishedPortAllocMu.Unlock()
+
 	hostPort, err := d.allocatePublishedHostPort(ctx)
 	if err != nil {
 		return nil, err
diff --git a/internal/daemon/review_regressions_test.go b/internal/daemon/review_regressions_test.go
new file mode 100644
index 0000000..22cbd1d
--- /dev/null
+++ b/internal/daemon/review_regressions_test.go
@@ -0,0 +1,292 @@
+package daemon
+
+import (
+	"context"
+	"errors"
+	"os"
+	"path/filepath"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+
+	contracthost "github.com/getcompanion-ai/computer-host/contract"
+	"github.com/getcompanion-ai/computer-host/internal/model"
+	hoststore "github.com/getcompanion-ai/computer-host/internal/store"
+)
+
+type blockingPublishedPortStore struct {
+	hoststore.Store
+	createEntered chan struct{}
+	releaseCreate chan struct{}
+	firstCreate   sync.Once
+}
+
+func (s *blockingPublishedPortStore) CreatePublishedPort(ctx context.Context, record model.PublishedPortRecord) error {
+	shouldBlock := false
+	s.firstCreate.Do(func() {
+		shouldBlock = true
+		close(s.createEntered)
+	})
+	if shouldBlock {
+		<-s.releaseCreate
+	}
+	return s.Store.CreatePublishedPort(ctx, record)
+}
+
+type snapshotLookupErrorStore struct {
+	hoststore.Store
+	err error
+}
+
+func (s snapshotLookupErrorStore) GetSnapshot(context.Context, contracthost.SnapshotID) (*model.SnapshotRecord, error) {
+	return nil, s.err
+}
+
+type machineLookupErrorStore struct {
+	hoststore.Store
+	err error
+}
+
+func (s machineLookupErrorStore) GetMachine(context.Context, contracthost.MachineID) (*model.MachineRecord, error) {
+	return nil, s.err
+}
+
+type publishedPortResult struct {
+	response *contracthost.CreatePublishedPortResponse
+	err      error
+}
+
+func TestCreatePublishedPortSerializesHostPortAllocationAcrossMachines(t *testing.T) {
+	root := t.TempDir()
+	cfg := testConfig(root)
+	baseStore, err := hoststore.NewFileStore(cfg.StatePath, cfg.OperationsPath)
+	if err != nil {
+		t.Fatalf("create file store: %v", err)
+	}
+	wrappedStore := &blockingPublishedPortStore{
+		Store:         baseStore,
+		createEntered: make(chan struct{}),
+		releaseCreate: make(chan struct{}),
+	}
+
+	hostDaemon, err := New(cfg, wrappedStore, &fakeRuntime{})
+	if err != nil {
+		t.Fatalf("create daemon: %v", err)
+	}
+
+	for _, machineID := range []contracthost.MachineID{"vm-1", "vm-2"} {
+		if err := baseStore.CreateMachine(context.Background(), model.MachineRecord{
+			ID:          machineID,
+			RuntimeHost: "127.0.0.1",
+			Phase:       contracthost.MachinePhaseRunning,
+			CreatedAt:   time.Now().UTC(),
+		}); err != nil {
+			t.Fatalf("create machine %q: %v", machineID, err)
+		}
+	}
+
+	resultCh1 := make(chan publishedPortResult, 1)
+	go func() {
+		response, err := hostDaemon.CreatePublishedPort(context.Background(), "vm-1", contracthost.CreatePublishedPortRequest{
+			PublishedPortID: "port-1",
+			Port:            8080,
+		})
+		resultCh1 <- publishedPortResult{response: response, err: err}
+	}()
+
+	<-wrappedStore.createEntered
+
+	resultCh2 := make(chan publishedPortResult, 1)
+	go func() {
+		response, err := hostDaemon.CreatePublishedPort(context.Background(), "vm-2", contracthost.CreatePublishedPortRequest{
+			PublishedPortID: "port-2",
+			Port:            9090,
+		})
+		resultCh2 <- publishedPortResult{response: response, err: err}
+	}()
+
+	close(wrappedStore.releaseCreate)
+
+	first := waitPublishedPortResult(t, resultCh1)
+	second := waitPublishedPortResult(t, resultCh2)
+
+	if first.err != nil {
+		t.Fatalf("first CreatePublishedPort returned error: %v", first.err)
+	}
+	if second.err != nil {
+		t.Fatalf("second CreatePublishedPort returned error: %v", second.err)
+	}
+	if first.response.Port.HostPort == second.response.Port.HostPort {
+		t.Fatalf("host ports collided: both requests received %d", first.response.Port.HostPort)
+	}
+
+	t.Cleanup(func() {
+		hostDaemon.stopPublishedPortProxy("port-1")
+		hostDaemon.stopPublishedPortProxy("port-2")
+	})
+}
+
+func TestGetStorageReportHandlesSparseSnapshotPathsAndIncludesPublishedPortPool(t *testing.T) {
+	root := t.TempDir()
+	cfg := testConfig(root)
+	fileStore, err := hoststore.NewFileStore(cfg.StatePath, cfg.OperationsPath)
+	if err != nil {
+		t.Fatalf("create file store: %v", err)
+	}
+
+	if err := fileStore.CreateSnapshot(context.Background(), model.SnapshotRecord{
+		ID:        "snap-1",
+		MachineID: "vm-1",
+		DiskPaths: []string{""},
+		CreatedAt: time.Now().UTC(),
+	}); err != nil {
+		t.Fatalf("create snapshot: %v", err)
+	}
+	if err := fileStore.CreatePublishedPort(context.Background(), model.PublishedPortRecord{
+		ID:        "port-1",
+		MachineID: "vm-1",
+		Port:      8080,
+		HostPort:  minPublishedHostPort,
+		Protocol:  contracthost.PortProtocolTCP,
+		CreatedAt: time.Now().UTC(),
+	}); err != nil {
+		t.Fatalf("create published port: %v", err)
+	}
+
+	hostDaemon, err := New(cfg, fileStore, &fakeRuntime{})
+	if err != nil {
+		t.Fatalf("create daemon: %v", err)
+	}
+
+	response, err := hostDaemon.GetStorageReport(context.Background())
+	if err != nil {
+		t.Fatalf("GetStorageReport returned error: %v", err)
+	}
+	if response.Report.PublishedPorts != 1 {
+		t.Fatalf("published port count = %d, want 1", response.Report.PublishedPorts)
+	}
+	if len(response.Report.Snapshots) != 1 || response.Report.Snapshots[0].Bytes != 0 {
+		t.Fatalf("snapshot usage = %#v, want zero-byte sparse snapshot", response.Report.Snapshots)
+	}
+
+	foundPool := false
+	for _, pool := range response.Report.Pools {
+		if pool.Pool == contracthost.StoragePoolPublishedPort {
+			foundPool = true
+			if pool.Bytes != 0 {
+				t.Fatalf("published port pool bytes = %d, want 0", pool.Bytes)
+			}
+		}
+	}
+	if !foundPool {
+		t.Fatal("storage report missing published ports pool")
+	}
+}
+
+func TestReconcileSnapshotPreservesArtifactsOnUnexpectedStoreError(t *testing.T) {
+	root := t.TempDir()
+	cfg := testConfig(root)
+	baseStore, err := hoststore.NewFileStore(cfg.StatePath, cfg.OperationsPath)
+	if err != nil {
+		t.Fatalf("create file store: %v", err)
+	}
+
+	lookupErr := errors.New("snapshot backend unavailable")
+	hostDaemon, err := New(cfg, snapshotLookupErrorStore{Store: baseStore, err: lookupErr}, &fakeRuntime{})
+	if err != nil {
+		t.Fatalf("create daemon: %v", err)
+	}
+
+	snapshotID := contracthost.SnapshotID("snap-1")
+	operation := model.OperationRecord{
+		MachineID:  "vm-1",
+		Type:       model.MachineOperationSnapshot,
+		StartedAt:  time.Now().UTC(),
+		SnapshotID: &snapshotID,
+	}
+	if err := baseStore.UpsertOperation(context.Background(), operation); err != nil {
+		t.Fatalf("upsert operation: %v", err)
+	}
+
+	snapshotDir := filepath.Join(cfg.SnapshotsDir, string(snapshotID))
+	if err := os.MkdirAll(snapshotDir, 0o755); err != nil {
+		t.Fatalf("create snapshot dir: %v", err)
+	}
+
+	err = hostDaemon.reconcileSnapshot(context.Background(), operation)
+	if err == nil || !strings.Contains(err.Error(), "snapshot backend unavailable") {
+		t.Fatalf("reconcileSnapshot error = %v, want wrapped lookup error", err)
+	}
+	if _, statErr := os.Stat(snapshotDir); statErr != nil {
+		t.Fatalf("snapshot dir should be preserved, stat error: %v", statErr)
+	}
+	assertOperationCount(t, baseStore, 1)
+}
+
+func TestReconcileRestorePreservesArtifactsOnUnexpectedStoreError(t *testing.T) {
+	root := t.TempDir()
+	cfg := testConfig(root)
+	baseStore, err := hoststore.NewFileStore(cfg.StatePath, cfg.OperationsPath)
+	if err != nil {
+		t.Fatalf("create file store: %v", err)
+	}
+
+	lookupErr := errors.New("machine backend unavailable")
+	hostDaemon, err := New(cfg, machineLookupErrorStore{Store: baseStore, err: lookupErr}, &fakeRuntime{})
+	if err != nil {
+		t.Fatalf("create daemon: %v", err)
+	}
+
+	operation := model.OperationRecord{
+		MachineID: "vm-1",
+		Type:      model.MachineOperationRestore,
+		StartedAt: time.Now().UTC(),
+	}
+	if err := baseStore.UpsertOperation(context.Background(), operation); err != nil {
+		t.Fatalf("upsert operation: %v", err)
+	}
+
+	systemVolumeDir := filepath.Dir(hostDaemon.systemVolumePath("vm-1"))
+	runtimeDir := hostDaemon.machineRuntimeBaseDir("vm-1")
+	for _, dir := range []string{systemVolumeDir, runtimeDir} {
+		if err := os.MkdirAll(dir, 0o755); err != nil {
+			t.Fatalf("create dir %q: %v", dir, err)
+		}
+	}
+
+	err = hostDaemon.reconcileRestore(context.Background(), operation)
+	if err == nil || !strings.Contains(err.Error(), "machine backend unavailable") {
+		t.Fatalf("reconcileRestore error = %v, want wrapped lookup error", err)
+	}
+	for _, dir := range []string{systemVolumeDir, runtimeDir} {
+		if _, statErr := os.Stat(dir); statErr != nil {
+			t.Fatalf("directory %q should be preserved, stat error: %v", dir, statErr)
+		}
+	}
+	assertOperationCount(t, baseStore, 1)
+}
+
+func waitPublishedPortResult(t *testing.T, ch <-chan publishedPortResult) publishedPortResult {
+	t.Helper()
+
+	select {
+	case result := <-ch:
+		return result
+	case <-time.After(5 * time.Second):
+		t.Fatal("timed out waiting for CreatePublishedPort result")
+		return publishedPortResult{}
+	}
+}
+
+func assertOperationCount(t *testing.T, store hoststore.Store, want int) {
+	t.Helper()
+
+	operations, err := store.ListOperations(context.Background())
+	if err != nil {
+		t.Fatalf("list operations: %v", err)
+	}
+	if len(operations) != want {
+		t.Fatalf("operation count = %d, want %d", len(operations), want)
+	}
+}
diff --git a/internal/daemon/storage_report.go b/internal/daemon/storage_report.go
index c1ad1b9..bdf5817 100644
--- a/internal/daemon/storage_report.go
+++ b/internal/daemon/storage_report.go
@@ -43,6 +43,7 @@ func (d *Daemon) GetStorageReport(ctx context.Context) (*contracthost.GetStorage
 	}{
 		{name: contracthost.StoragePoolArtifacts, path: d.config.ArtifactsDir},
 		{name: contracthost.StoragePoolMachineDisks, path: d.config.MachineDisksDir},
+		{name: contracthost.StoragePoolPublishedPort, path: ""},
 		{name: contracthost.StoragePoolSnapshots, path: d.config.SnapshotsDir},
 		{name: contracthost.StoragePoolState, path: filepath.Dir(d.config.StatePath)},
 	} {
@@ -133,8 +134,14 @@ func directorySize(root string) (int64, error) {
 }
 
 func fileSize(path string) (int64, error) {
+	if path == "" {
+		return 0, nil
+	}
 	info, err := os.Stat(path)
 	if err != nil {
+		if os.IsNotExist(err) {
+			return 0, nil
+		}
 		return 0, fmt.Errorf("stat %q: %w", path, err)
 	}
 	return info.Size(), nil