From e7b9ac68545d137bdef9bd86fcc46d00faf2f9e6 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Mon, 16 Mar 2026 21:26:13 -0700 Subject: [PATCH] fix(foundry): move Better Auth operations from queues to actions to fix production auth timeout The org actor's workflow queue is shared with GitHub sync, webhooks, task mutations, and billing (20+ queue names processed sequentially). During OAuth callback, auth operations would time out waiting behind long-running queue handlers, causing Better Auth's parseState to redirect to ?error=please_restart_the_process. Auth operations are simple SQLite reads/writes with no cross-actor side effects, so they are safe to run as actions that execute immediately without competing in the queue. Co-Authored-By: Claude Opus 4.6 --- foundry/packages/backend/CLAUDE.md | 10 + .../organization/actions/better-auth.ts | 111 ++++++--- .../backend/src/actors/organization/queues.ts | 11 - .../src/actors/organization/workflow.ts | 38 --- .../src/actors/user/actions/better-auth.ts | 44 ++++ .../backend/src/actors/user/workflow.ts | 13 -- .../backend/src/services/better-auth.ts | 217 ++++++------------ 7 files changed, 211 insertions(+), 233 deletions(-) diff --git a/foundry/packages/backend/CLAUDE.md b/foundry/packages/backend/CLAUDE.md index 8824682..4bcda77 100644 --- a/foundry/packages/backend/CLAUDE.md +++ b/foundry/packages/backend/CLAUDE.md @@ -272,6 +272,16 @@ Each user has independent unread state. The existing `userTaskState` table track These two unread systems must stay in sync via the `user.mark_read` queue command. +## Better Auth: Actions, Not Queues + +All Better Auth adapter operations (verification CRUD, session/email/account index mutations, and user-actor auth record mutations) are exposed as **actions**, not queue commands. This is an intentional exception to the normal pattern of using queues for mutations. + +**Why:** The org actor's workflow queue is shared with GitHub sync, webhook processing, task mutations, and billing — 20+ queue names processed sequentially. During the OAuth callback, Better Auth needs to read/write verification records and upsert session/account indexes. If any long-running queue handler (e.g., a GitHub sync step) is ahead in the queue, auth operations time out (10s), `expectQueueResponse` throws a regular `Error`, and Better Auth's `parseState` catches it as a non-`StateError` → redirects to `?error=please_restart_the_process`. + +**Why it's safe:** Auth operations are simple SQLite reads/writes scoped to a single actor instance with no cross-actor side effects. They don't need workflow replay semantics or sequential ordering guarantees relative to other queue commands. + +**Rule:** Never move Better Auth operations back to queue commands. If new auth-related mutations are added, expose them as actions on the relevant actor. + ## Maintenance - Keep this file up to date whenever actor ownership, hierarchy, or lifecycle responsibilities change. diff --git a/foundry/packages/backend/src/actors/organization/actions/better-auth.ts b/foundry/packages/backend/src/actors/organization/actions/better-auth.ts index 37f34b4..ff2b5af 100644 --- a/foundry/packages/backend/src/actors/organization/actions/better-auth.ts +++ b/foundry/packages/backend/src/actors/organization/actions/better-auth.ts @@ -1,21 +1,4 @@ -import { - and, - asc, - count as sqlCount, - desc, - eq, - gt, - gte, - inArray, - isNotNull, - isNull, - like, - lt, - lte, - ne, - notInArray, - or, -} from "drizzle-orm"; +import { and, asc, count as sqlCount, desc, eq, gt, gte, inArray, isNotNull, isNull, like, lt, lte, ne, notInArray, or } from "drizzle-orm"; import { authAccountIndex, authEmailIndex, authSessionIndex, authVerification } from "../db/schema.js"; import { APP_SHELL_ORGANIZATION_ID } from "../constants.js"; @@ -151,10 +134,7 @@ export async function betterAuthDeleteEmailIndexMutation(c: any, input: { email: await c.db.delete(authEmailIndex).where(eq(authEmailIndex.email, input.email)).run(); } -export async function betterAuthUpsertAccountIndexMutation( - c: any, - input: { id: string; providerId: string; accountId: string; userId: string }, -) { +export async function betterAuthUpsertAccountIndexMutation(c: any, input: { id: string; providerId: string; accountId: string; userId: string }) { assertAppOrganization(c); const now = Date.now(); @@ -198,8 +178,15 @@ export async function betterAuthDeleteAccountIndexMutation(c: any, input: { id?: export async function betterAuthCreateVerificationMutation(c: any, input: { data: Record }) { assertAppOrganization(c); - await c.db.insert(authVerification).values(input.data as any).run(); - return await c.db.select().from(authVerification).where(eq(authVerification.id, input.data.id as string)).get(); + await c.db + .insert(authVerification) + .values(input.data as any) + .run(); + return await c.db + .select() + .from(authVerification) + .where(eq(authVerification.id, input.data.id as string)) + .get(); } export async function betterAuthUpdateVerificationMutation(c: any, input: { where: any[]; update: Record }) { @@ -209,7 +196,11 @@ export async function betterAuthUpdateVerificationMutation(c: any, input: { wher if (!predicate) { return null; } - await c.db.update(authVerification).set(input.update as any).where(predicate).run(); + await c.db + .update(authVerification) + .set(input.update as any) + .where(predicate) + .run(); return await c.db.select().from(authVerification).where(predicate).get(); } @@ -220,7 +211,11 @@ export async function betterAuthUpdateManyVerificationMutation(c: any, input: { if (!predicate) { return 0; } - await c.db.update(authVerification).set(input.update as any).where(predicate).run(); + await c.db + .update(authVerification) + .set(input.update as any) + .where(predicate) + .run(); const row = await c.db.select({ value: sqlCount() }).from(authVerification).where(predicate).get(); return row?.value ?? 0; } @@ -247,7 +242,71 @@ export async function betterAuthDeleteManyVerificationMutation(c: any, input: { return rows.length; } +/** + * Better Auth adapter actions — exposed as actions (not queue commands) so they + * execute immediately without competing in the organization workflow queue. + * + * The org actor's workflow queue is shared with GitHub sync, webhook processing, + * task mutations, and billing operations. When the queue is busy, auth operations + * would time out (10s), causing Better Auth's parseState to throw a non-StateError + * which redirects to ?error=please_restart_the_process. + * + * Auth operations are safe to run as actions because they are simple SQLite + * reads/writes scoped to this actor instance with no cross-actor side effects. + */ export const organizationBetterAuthActions = { + // --- Mutation actions (formerly queue commands) --- + + async betterAuthCreateVerification(c: any, input: { data: Record }) { + return await betterAuthCreateVerificationMutation(c, input); + }, + + async betterAuthUpdateVerification(c: any, input: { where: any[]; update: Record }) { + return await betterAuthUpdateVerificationMutation(c, input); + }, + + async betterAuthUpdateManyVerification(c: any, input: { where: any[]; update: Record }) { + return await betterAuthUpdateManyVerificationMutation(c, input); + }, + + async betterAuthDeleteVerification(c: any, input: { where: any[] }) { + await betterAuthDeleteVerificationMutation(c, input); + return { ok: true }; + }, + + async betterAuthDeleteManyVerification(c: any, input: { where: any[] }) { + return await betterAuthDeleteManyVerificationMutation(c, input); + }, + + async betterAuthUpsertSessionIndex(c: any, input: { sessionId: string; sessionToken: string; userId: string }) { + return await betterAuthUpsertSessionIndexMutation(c, input); + }, + + async betterAuthDeleteSessionIndex(c: any, input: { sessionId?: string; sessionToken?: string }) { + await betterAuthDeleteSessionIndexMutation(c, input); + return { ok: true }; + }, + + async betterAuthUpsertEmailIndex(c: any, input: { email: string; userId: string }) { + return await betterAuthUpsertEmailIndexMutation(c, input); + }, + + async betterAuthDeleteEmailIndex(c: any, input: { email: string }) { + await betterAuthDeleteEmailIndexMutation(c, input); + return { ok: true }; + }, + + async betterAuthUpsertAccountIndex(c: any, input: { id: string; providerId: string; accountId: string; userId: string }) { + return await betterAuthUpsertAccountIndexMutation(c, input); + }, + + async betterAuthDeleteAccountIndex(c: any, input: { id?: string; providerId?: string; accountId?: string }) { + await betterAuthDeleteAccountIndexMutation(c, input); + return { ok: true }; + }, + + // --- Read actions --- + async betterAuthFindSessionIndex(c: any, input: { sessionId?: string; sessionToken?: string }) { assertAppOrganization(c); diff --git a/foundry/packages/backend/src/actors/organization/queues.ts b/foundry/packages/backend/src/actors/organization/queues.ts index f84e818..bd13ec1 100644 --- a/foundry/packages/backend/src/actors/organization/queues.ts +++ b/foundry/packages/backend/src/actors/organization/queues.ts @@ -7,17 +7,6 @@ export const ORGANIZATION_QUEUE_NAMES = [ "organization.command.refreshTaskSummaryForBranch", "organization.command.snapshot.broadcast", "organization.command.syncGithubSession", - "organization.command.better_auth.session_index.upsert", - "organization.command.better_auth.session_index.delete", - "organization.command.better_auth.email_index.upsert", - "organization.command.better_auth.email_index.delete", - "organization.command.better_auth.account_index.upsert", - "organization.command.better_auth.account_index.delete", - "organization.command.better_auth.verification.create", - "organization.command.better_auth.verification.update", - "organization.command.better_auth.verification.update_many", - "organization.command.better_auth.verification.delete", - "organization.command.better_auth.verification.delete_many", "organization.command.github.sync_progress.apply", "organization.command.github.webhook_receipt.record", "organization.command.github.organization_shell.sync_from_github", diff --git a/foundry/packages/backend/src/actors/organization/workflow.ts b/foundry/packages/backend/src/actors/organization/workflow.ts index b64c997..ad7b2a1 100644 --- a/foundry/packages/backend/src/actors/organization/workflow.ts +++ b/foundry/packages/backend/src/actors/organization/workflow.ts @@ -20,19 +20,6 @@ import { registerTaskBranchMutation, removeTaskSummaryMutation, } from "./actions/task-mutations.js"; -import { - betterAuthCreateVerificationMutation, - betterAuthDeleteAccountIndexMutation, - betterAuthDeleteEmailIndexMutation, - betterAuthDeleteManyVerificationMutation, - betterAuthDeleteSessionIndexMutation, - betterAuthDeleteVerificationMutation, - betterAuthUpdateManyVerificationMutation, - betterAuthUpdateVerificationMutation, - betterAuthUpsertAccountIndexMutation, - betterAuthUpsertEmailIndexMutation, - betterAuthUpsertSessionIndexMutation, -} from "./actions/better-auth.js"; import { applyOrganizationFreePlanMutation, applyOrganizationStripeCustomerMutation, @@ -85,31 +72,6 @@ const COMMAND_HANDLERS: Record = { return { ok: true }; }, - // Better Auth index mutations - "organization.command.better_auth.session_index.upsert": async (c, body) => betterAuthUpsertSessionIndexMutation(c, body), - "organization.command.better_auth.session_index.delete": async (c, body) => { - await betterAuthDeleteSessionIndexMutation(c, body); - return { ok: true }; - }, - "organization.command.better_auth.email_index.upsert": async (c, body) => betterAuthUpsertEmailIndexMutation(c, body), - "organization.command.better_auth.email_index.delete": async (c, body) => { - await betterAuthDeleteEmailIndexMutation(c, body); - return { ok: true }; - }, - "organization.command.better_auth.account_index.upsert": async (c, body) => betterAuthUpsertAccountIndexMutation(c, body), - "organization.command.better_auth.account_index.delete": async (c, body) => { - await betterAuthDeleteAccountIndexMutation(c, body); - return { ok: true }; - }, - "organization.command.better_auth.verification.create": async (c, body) => betterAuthCreateVerificationMutation(c, body), - "organization.command.better_auth.verification.update": async (c, body) => betterAuthUpdateVerificationMutation(c, body), - "organization.command.better_auth.verification.update_many": async (c, body) => betterAuthUpdateManyVerificationMutation(c, body), - "organization.command.better_auth.verification.delete": async (c, body) => { - await betterAuthDeleteVerificationMutation(c, body); - return { ok: true }; - }, - "organization.command.better_auth.verification.delete_many": async (c, body) => betterAuthDeleteManyVerificationMutation(c, body), - // GitHub sync mutations "organization.command.github.sync_progress.apply": async (c, body) => { await applyGithubSyncProgressMutation(c, body); diff --git a/foundry/packages/backend/src/actors/user/actions/better-auth.ts b/foundry/packages/backend/src/actors/user/actions/better-auth.ts index 0fd950e..14e40bf 100644 --- a/foundry/packages/backend/src/actors/user/actions/better-auth.ts +++ b/foundry/packages/backend/src/actors/user/actions/better-auth.ts @@ -1,7 +1,51 @@ import { asc, count as sqlCount, desc } from "drizzle-orm"; import { applyJoinToRow, applyJoinToRows, buildWhere, columnFor, tableFor } from "../query-helpers.js"; +import { + createAuthRecordMutation, + updateAuthRecordMutation, + updateManyAuthRecordsMutation, + deleteAuthRecordMutation, + deleteManyAuthRecordsMutation, +} from "../workflow.js"; +/** + * Better Auth adapter actions — exposed as actions (not queue commands) so they + * execute immediately without competing in the user workflow queue. + * + * The user actor's workflow queue is shared with profile upserts, session state, + * and task state operations. When the queue is busy, auth operations would time + * out (10s), causing Better Auth's parseState to throw a non-StateError which + * redirects to ?error=please_restart_the_process. + * + * Auth operations are safe to run as actions because they are simple SQLite + * reads/writes scoped to this actor instance with no cross-actor side effects. + */ export const betterAuthActions = { + // --- Mutation actions (formerly queue commands) --- + + async betterAuthCreateRecord(c: any, input: { model: string; data: Record }) { + return await createAuthRecordMutation(c, input); + }, + + async betterAuthUpdateRecord(c: any, input: { model: string; where: any[]; update: Record }) { + return await updateAuthRecordMutation(c, input); + }, + + async betterAuthUpdateManyRecords(c: any, input: { model: string; where: any[]; update: Record }) { + return await updateManyAuthRecordsMutation(c, input); + }, + + async betterAuthDeleteRecord(c: any, input: { model: string; where: any[] }) { + await deleteAuthRecordMutation(c, input); + return { ok: true }; + }, + + async betterAuthDeleteManyRecords(c: any, input: { model: string; where: any[] }) { + return await deleteManyAuthRecordsMutation(c, input); + }, + + // --- Read actions --- + // Better Auth adapter action — called by the Better Auth adapter in better-auth.ts. // Schema and behavior are constrained by Better Auth. async betterAuthFindOneRecord(c, input: { model: string; where: any[]; join?: any }) { diff --git a/foundry/packages/backend/src/actors/user/workflow.ts b/foundry/packages/backend/src/actors/user/workflow.ts index 3bd3118..616633f 100644 --- a/foundry/packages/backend/src/actors/user/workflow.ts +++ b/foundry/packages/backend/src/actors/user/workflow.ts @@ -19,11 +19,6 @@ import { buildWhere, columnFor, materializeRow, persistInput, persistPatch, tabl // --------------------------------------------------------------------------- export const USER_QUEUE_NAMES = [ - "user.command.auth.create", - "user.command.auth.update", - "user.command.auth.update_many", - "user.command.auth.delete", - "user.command.auth.delete_many", "user.command.profile.upsert", "user.command.session_state.upsert", "user.command.task_state.upsert", @@ -240,14 +235,6 @@ export async function deleteTaskStateMutation(c: any, input: { taskId: string; s type WorkflowHandler = (loopCtx: any, body: any) => Promise; const COMMAND_HANDLERS: Record = { - "user.command.auth.create": async (c, body) => createAuthRecordMutation(c, body), - "user.command.auth.update": async (c, body) => updateAuthRecordMutation(c, body), - "user.command.auth.update_many": async (c, body) => updateManyAuthRecordsMutation(c, body), - "user.command.auth.delete": async (c, body) => { - await deleteAuthRecordMutation(c, body); - return { ok: true }; - }, - "user.command.auth.delete_many": async (c, body) => deleteManyAuthRecordsMutation(c, body), "user.command.profile.upsert": async (c, body) => upsertUserProfileMutation(c, body), "user.command.session_state.upsert": async (c, body) => upsertSessionStateMutation(c, body), "user.command.task_state.upsert": async (c, body) => upsertTaskStateMutation(c, body), diff --git a/foundry/packages/backend/src/services/better-auth.ts b/foundry/packages/backend/src/services/better-auth.ts index 1f1ae4c..b30034d 100644 --- a/foundry/packages/backend/src/services/better-auth.ts +++ b/foundry/packages/backend/src/services/better-auth.ts @@ -5,7 +5,6 @@ import { organizationKey, userKey } from "../actors/keys.js"; import { logger } from "../logging.js"; import { expectQueueResponse } from "./queue.js"; import { userWorkflowQueueName } from "../actors/user/workflow.js"; -import { organizationWorkflowQueueName } from "../actors/organization/queues.js"; const AUTH_BASE_PATH = "/v1/auth"; const SESSION_COOKIE = "better-auth.session_token"; @@ -160,11 +159,6 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin return null; }; - const ensureOrganizationVerification = async (queueName: string, payload: Record) => { - const organization = await appOrganization(); - return expectQueueResponse(await organization.send(organizationWorkflowQueueName(queueName as any), payload, { wait: true, timeout: 10_000 })); - }; - return { options: { useDatabaseGeneratedIds: false, @@ -173,7 +167,8 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin create: async ({ model, data }) => { const transformed = await transformInput(data, model, "create", true); if (model === "verification") { - return await ensureOrganizationVerification("organization.command.better_auth.verification.create", { data: transformed }); + const organization = await appOrganization(); + return await organization.betterAuthCreateVerification({ data: transformed }); } const userId = await resolveUserIdForQuery(model, undefined, transformed); @@ -182,51 +177,31 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin } const userActor = await getUser(userId); - const created = expectQueueResponse( - await userActor.send(userWorkflowQueueName("user.command.auth.create"), { model, data: transformed }, { wait: true, timeout: 10_000 }), - ); + const created = await userActor.betterAuthCreateRecord({ model, data: transformed }); const organization = await appOrganization(); if (model === "user" && typeof transformed.email === "string" && transformed.email.length > 0) { - expectQueueResponse( - await organization.send( - organizationWorkflowQueueName("organization.command.better_auth.email_index.upsert"), - { - email: transformed.email.toLowerCase(), - userId, - }, - { wait: true, timeout: 10_000 }, - ), - ); + await organization.betterAuthUpsertEmailIndex({ + email: transformed.email.toLowerCase(), + userId, + }); } if (model === "session") { - expectQueueResponse( - await organization.send( - organizationWorkflowQueueName("organization.command.better_auth.session_index.upsert"), - { - sessionId: String(created.id), - sessionToken: String(created.token), - userId, - }, - { wait: true, timeout: 10_000 }, - ), - ); + await organization.betterAuthUpsertSessionIndex({ + sessionId: String(created.id), + sessionToken: String(created.token), + userId, + }); } if (model === "account") { - expectQueueResponse( - await organization.send( - organizationWorkflowQueueName("organization.command.better_auth.account_index.upsert"), - { - id: String(created.id), - providerId: String(created.providerId), - accountId: String(created.accountId), - userId, - }, - { wait: true, timeout: 10_000 }, - ), - ); + await organization.betterAuthUpsertAccountIndex({ + id: String(created.id), + providerId: String(created.providerId), + accountId: String(created.accountId), + userId, + }); } return (await transformOutput(created, model)) as any; @@ -309,7 +284,8 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin const transformedWhere = transformWhereClause({ model, where, action: "update" }); const transformedUpdate = (await transformInput(update as Record, model, "update", true)) as Record; if (model === "verification") { - return await ensureOrganizationVerification("organization.command.better_auth.verification.update", { + const organization = await appOrganization(); + return await organization.betterAuthUpdateVerification({ where: transformedWhere, update: transformedUpdate, }); @@ -329,66 +305,42 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin : model === "session" ? await userActor.betterAuthFindOneRecord({ model, where: transformedWhere }) : null; - const updated = expectQueueResponse( - await userActor.send( - userWorkflowQueueName("user.command.auth.update"), - { model, where: transformedWhere, update: transformedUpdate }, - { wait: true, timeout: 10_000 }, - ), - ); + const updated = await userActor.betterAuthUpdateRecord({ + model, + where: transformedWhere, + update: transformedUpdate, + }); const organization = await appOrganization(); if (model === "user" && updated) { if (before?.email && before.email !== updated.email) { - await organization.send( - organizationWorkflowQueueName("organization.command.better_auth.email_index.delete"), - { - email: before.email.toLowerCase(), - }, - { wait: true, timeout: 10_000 }, - ); + await organization.betterAuthDeleteEmailIndex({ + email: before.email.toLowerCase(), + }); } if (updated.email) { - expectQueueResponse( - await organization.send( - organizationWorkflowQueueName("organization.command.better_auth.email_index.upsert"), - { - email: updated.email.toLowerCase(), - userId, - }, - { wait: true, timeout: 10_000 }, - ), - ); + await organization.betterAuthUpsertEmailIndex({ + email: updated.email.toLowerCase(), + userId, + }); } } if (model === "session" && updated) { - expectQueueResponse( - await organization.send( - organizationWorkflowQueueName("organization.command.better_auth.session_index.upsert"), - { - sessionId: String(updated.id), - sessionToken: String(updated.token), - userId, - }, - { wait: true, timeout: 10_000 }, - ), - ); + await organization.betterAuthUpsertSessionIndex({ + sessionId: String(updated.id), + sessionToken: String(updated.token), + userId, + }); } if (model === "account" && updated) { - expectQueueResponse( - await organization.send( - organizationWorkflowQueueName("organization.command.better_auth.account_index.upsert"), - { - id: String(updated.id), - providerId: String(updated.providerId), - accountId: String(updated.accountId), - userId, - }, - { wait: true, timeout: 10_000 }, - ), - ); + await organization.betterAuthUpsertAccountIndex({ + id: String(updated.id), + providerId: String(updated.providerId), + accountId: String(updated.accountId), + userId, + }); } return updated ? ((await transformOutput(updated, model)) as any) : null; @@ -398,7 +350,8 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin const transformedWhere = transformWhereClause({ model, where, action: "updateMany" }); const transformedUpdate = (await transformInput(update as Record, model, "update", true)) as Record; if (model === "verification") { - return await ensureOrganizationVerification("organization.command.better_auth.verification.update_many", { + const organization = await appOrganization(); + return await organization.betterAuthUpdateManyVerification({ where: transformedWhere, update: transformedUpdate, }); @@ -410,24 +363,18 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin } const userActor = await getUser(userId); - return expectQueueResponse( - await userActor.send( - userWorkflowQueueName("user.command.auth.update_many"), - { model, where: transformedWhere, update: transformedUpdate }, - { wait: true, timeout: 10_000 }, - ), - ); + return await userActor.betterAuthUpdateManyRecords({ + model, + where: transformedWhere, + update: transformedUpdate, + }); }, delete: async ({ model, where }) => { const transformedWhere = transformWhereClause({ model, where, action: "delete" }); if (model === "verification") { const organization = await appOrganization(); - await organization.send( - organizationWorkflowQueueName("organization.command.better_auth.verification.delete"), - { where: transformedWhere }, - { wait: true, timeout: 10_000 }, - ); + await organization.betterAuthDeleteVerification({ where: transformedWhere }); return; } @@ -439,46 +386,35 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin const userActor = await getUser(userId); const organization = await appOrganization(); const before = await userActor.betterAuthFindOneRecord({ model, where: transformedWhere }); - await userActor.send(userWorkflowQueueName("user.command.auth.delete"), { model, where: transformedWhere }, { wait: true, timeout: 10_000 }); + await userActor.betterAuthDeleteRecord({ model, where: transformedWhere }); if (model === "session" && before) { - await organization.send( - organizationWorkflowQueueName("organization.command.better_auth.session_index.delete"), - { - sessionId: before.id, - sessionToken: before.token, - }, - { wait: true, timeout: 10_000 }, - ); + await organization.betterAuthDeleteSessionIndex({ + sessionId: before.id, + sessionToken: before.token, + }); } if (model === "account" && before) { - await organization.send( - organizationWorkflowQueueName("organization.command.better_auth.account_index.delete"), - { - id: before.id, - providerId: before.providerId, - accountId: before.accountId, - }, - { wait: true, timeout: 10_000 }, - ); + await organization.betterAuthDeleteAccountIndex({ + id: before.id, + providerId: before.providerId, + accountId: before.accountId, + }); } if (model === "user" && before?.email) { - await organization.send( - organizationWorkflowQueueName("organization.command.better_auth.email_index.delete"), - { - email: before.email.toLowerCase(), - }, - { wait: true, timeout: 10_000 }, - ); + await organization.betterAuthDeleteEmailIndex({ + email: before.email.toLowerCase(), + }); } }, deleteMany: async ({ model, where }) => { const transformedWhere = transformWhereClause({ model, where, action: "deleteMany" }); if (model === "verification") { - return await ensureOrganizationVerification("organization.command.better_auth.verification.delete_many", { where: transformedWhere }); + const organization = await appOrganization(); + return await organization.betterAuthDeleteManyVerification({ where: transformedWhere }); } if (model === "session") { @@ -489,18 +425,12 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin const userActor = await getUser(userId); const organization = await appOrganization(); const sessions = await userActor.betterAuthFindManyRecords({ model, where: transformedWhere, limit: 5000 }); - const deleted = expectQueueResponse( - await userActor.send(userWorkflowQueueName("user.command.auth.delete_many"), { model, where: transformedWhere }, { wait: true, timeout: 10_000 }), - ); + const deleted = await userActor.betterAuthDeleteManyRecords({ model, where: transformedWhere }); for (const session of sessions) { - await organization.send( - organizationWorkflowQueueName("organization.command.better_auth.session_index.delete"), - { - sessionId: session.id, - sessionToken: session.token, - }, - { wait: true, timeout: 10_000 }, - ); + await organization.betterAuthDeleteSessionIndex({ + sessionId: session.id, + sessionToken: session.token, + }); } return deleted; } @@ -511,10 +441,7 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin } const userActor = await getUser(userId); - const deleted = expectQueueResponse( - await userActor.send(userWorkflowQueueName("user.command.auth.delete_many"), { model, where: transformedWhere }, { wait: true, timeout: 10_000 }), - ); - return deleted; + return await userActor.betterAuthDeleteManyRecords({ model, where: transformedWhere }); }, count: async ({ model, where }) => {