mirror of
https://github.com/harivansh-afk/sandbox-agent.git
synced 2026-04-15 05:02:11 +00:00
fix(foundry): move Better Auth operations from queues to actions to fix production auth timeout
The org actor's workflow queue is shared with GitHub sync, webhooks, task mutations, and billing (20+ queue names processed sequentially). During OAuth callback, auth operations would time out waiting behind long-running queue handlers, causing Better Auth's parseState to redirect to ?error=please_restart_the_process. Auth operations are simple SQLite reads/writes with no cross-actor side effects, so they are safe to run as actions that execute immediately without competing in the queue. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
84a80d59d7
commit
e7b9ac6854
7 changed files with 211 additions and 233 deletions
|
|
@ -272,6 +272,16 @@ Each user has independent unread state. The existing `userTaskState` table track
|
|||
|
||||
These two unread systems must stay in sync via the `user.mark_read` queue command.
|
||||
|
||||
## Better Auth: Actions, Not Queues
|
||||
|
||||
All Better Auth adapter operations (verification CRUD, session/email/account index mutations, and user-actor auth record mutations) are exposed as **actions**, not queue commands. This is an intentional exception to the normal pattern of using queues for mutations.
|
||||
|
||||
**Why:** The org actor's workflow queue is shared with GitHub sync, webhook processing, task mutations, and billing — 20+ queue names processed sequentially. During the OAuth callback, Better Auth needs to read/write verification records and upsert session/account indexes. If any long-running queue handler (e.g., a GitHub sync step) is ahead in the queue, auth operations time out (10s), `expectQueueResponse` throws a regular `Error`, and Better Auth's `parseState` catches it as a non-`StateError` → redirects to `?error=please_restart_the_process`.
|
||||
|
||||
**Why it's safe:** Auth operations are simple SQLite reads/writes scoped to a single actor instance with no cross-actor side effects. They don't need workflow replay semantics or sequential ordering guarantees relative to other queue commands.
|
||||
|
||||
**Rule:** Never move Better Auth operations back to queue commands. If new auth-related mutations are added, expose them as actions on the relevant actor.
|
||||
|
||||
## Maintenance
|
||||
|
||||
- Keep this file up to date whenever actor ownership, hierarchy, or lifecycle responsibilities change.
|
||||
|
|
|
|||
|
|
@ -1,21 +1,4 @@
|
|||
import {
|
||||
and,
|
||||
asc,
|
||||
count as sqlCount,
|
||||
desc,
|
||||
eq,
|
||||
gt,
|
||||
gte,
|
||||
inArray,
|
||||
isNotNull,
|
||||
isNull,
|
||||
like,
|
||||
lt,
|
||||
lte,
|
||||
ne,
|
||||
notInArray,
|
||||
or,
|
||||
} from "drizzle-orm";
|
||||
import { and, asc, count as sqlCount, desc, eq, gt, gte, inArray, isNotNull, isNull, like, lt, lte, ne, notInArray, or } from "drizzle-orm";
|
||||
import { authAccountIndex, authEmailIndex, authSessionIndex, authVerification } from "../db/schema.js";
|
||||
import { APP_SHELL_ORGANIZATION_ID } from "../constants.js";
|
||||
|
||||
|
|
@ -151,10 +134,7 @@ export async function betterAuthDeleteEmailIndexMutation(c: any, input: { email:
|
|||
await c.db.delete(authEmailIndex).where(eq(authEmailIndex.email, input.email)).run();
|
||||
}
|
||||
|
||||
export async function betterAuthUpsertAccountIndexMutation(
|
||||
c: any,
|
||||
input: { id: string; providerId: string; accountId: string; userId: string },
|
||||
) {
|
||||
export async function betterAuthUpsertAccountIndexMutation(c: any, input: { id: string; providerId: string; accountId: string; userId: string }) {
|
||||
assertAppOrganization(c);
|
||||
|
||||
const now = Date.now();
|
||||
|
|
@ -198,8 +178,15 @@ export async function betterAuthDeleteAccountIndexMutation(c: any, input: { id?:
|
|||
export async function betterAuthCreateVerificationMutation(c: any, input: { data: Record<string, unknown> }) {
|
||||
assertAppOrganization(c);
|
||||
|
||||
await c.db.insert(authVerification).values(input.data as any).run();
|
||||
return await c.db.select().from(authVerification).where(eq(authVerification.id, input.data.id as string)).get();
|
||||
await c.db
|
||||
.insert(authVerification)
|
||||
.values(input.data as any)
|
||||
.run();
|
||||
return await c.db
|
||||
.select()
|
||||
.from(authVerification)
|
||||
.where(eq(authVerification.id, input.data.id as string))
|
||||
.get();
|
||||
}
|
||||
|
||||
export async function betterAuthUpdateVerificationMutation(c: any, input: { where: any[]; update: Record<string, unknown> }) {
|
||||
|
|
@ -209,7 +196,11 @@ export async function betterAuthUpdateVerificationMutation(c: any, input: { wher
|
|||
if (!predicate) {
|
||||
return null;
|
||||
}
|
||||
await c.db.update(authVerification).set(input.update as any).where(predicate).run();
|
||||
await c.db
|
||||
.update(authVerification)
|
||||
.set(input.update as any)
|
||||
.where(predicate)
|
||||
.run();
|
||||
return await c.db.select().from(authVerification).where(predicate).get();
|
||||
}
|
||||
|
||||
|
|
@ -220,7 +211,11 @@ export async function betterAuthUpdateManyVerificationMutation(c: any, input: {
|
|||
if (!predicate) {
|
||||
return 0;
|
||||
}
|
||||
await c.db.update(authVerification).set(input.update as any).where(predicate).run();
|
||||
await c.db
|
||||
.update(authVerification)
|
||||
.set(input.update as any)
|
||||
.where(predicate)
|
||||
.run();
|
||||
const row = await c.db.select({ value: sqlCount() }).from(authVerification).where(predicate).get();
|
||||
return row?.value ?? 0;
|
||||
}
|
||||
|
|
@ -247,7 +242,71 @@ export async function betterAuthDeleteManyVerificationMutation(c: any, input: {
|
|||
return rows.length;
|
||||
}
|
||||
|
||||
/**
|
||||
* Better Auth adapter actions — exposed as actions (not queue commands) so they
|
||||
* execute immediately without competing in the organization workflow queue.
|
||||
*
|
||||
* The org actor's workflow queue is shared with GitHub sync, webhook processing,
|
||||
* task mutations, and billing operations. When the queue is busy, auth operations
|
||||
* would time out (10s), causing Better Auth's parseState to throw a non-StateError
|
||||
* which redirects to ?error=please_restart_the_process.
|
||||
*
|
||||
* Auth operations are safe to run as actions because they are simple SQLite
|
||||
* reads/writes scoped to this actor instance with no cross-actor side effects.
|
||||
*/
|
||||
export const organizationBetterAuthActions = {
|
||||
// --- Mutation actions (formerly queue commands) ---
|
||||
|
||||
async betterAuthCreateVerification(c: any, input: { data: Record<string, unknown> }) {
|
||||
return await betterAuthCreateVerificationMutation(c, input);
|
||||
},
|
||||
|
||||
async betterAuthUpdateVerification(c: any, input: { where: any[]; update: Record<string, unknown> }) {
|
||||
return await betterAuthUpdateVerificationMutation(c, input);
|
||||
},
|
||||
|
||||
async betterAuthUpdateManyVerification(c: any, input: { where: any[]; update: Record<string, unknown> }) {
|
||||
return await betterAuthUpdateManyVerificationMutation(c, input);
|
||||
},
|
||||
|
||||
async betterAuthDeleteVerification(c: any, input: { where: any[] }) {
|
||||
await betterAuthDeleteVerificationMutation(c, input);
|
||||
return { ok: true };
|
||||
},
|
||||
|
||||
async betterAuthDeleteManyVerification(c: any, input: { where: any[] }) {
|
||||
return await betterAuthDeleteManyVerificationMutation(c, input);
|
||||
},
|
||||
|
||||
async betterAuthUpsertSessionIndex(c: any, input: { sessionId: string; sessionToken: string; userId: string }) {
|
||||
return await betterAuthUpsertSessionIndexMutation(c, input);
|
||||
},
|
||||
|
||||
async betterAuthDeleteSessionIndex(c: any, input: { sessionId?: string; sessionToken?: string }) {
|
||||
await betterAuthDeleteSessionIndexMutation(c, input);
|
||||
return { ok: true };
|
||||
},
|
||||
|
||||
async betterAuthUpsertEmailIndex(c: any, input: { email: string; userId: string }) {
|
||||
return await betterAuthUpsertEmailIndexMutation(c, input);
|
||||
},
|
||||
|
||||
async betterAuthDeleteEmailIndex(c: any, input: { email: string }) {
|
||||
await betterAuthDeleteEmailIndexMutation(c, input);
|
||||
return { ok: true };
|
||||
},
|
||||
|
||||
async betterAuthUpsertAccountIndex(c: any, input: { id: string; providerId: string; accountId: string; userId: string }) {
|
||||
return await betterAuthUpsertAccountIndexMutation(c, input);
|
||||
},
|
||||
|
||||
async betterAuthDeleteAccountIndex(c: any, input: { id?: string; providerId?: string; accountId?: string }) {
|
||||
await betterAuthDeleteAccountIndexMutation(c, input);
|
||||
return { ok: true };
|
||||
},
|
||||
|
||||
// --- Read actions ---
|
||||
|
||||
async betterAuthFindSessionIndex(c: any, input: { sessionId?: string; sessionToken?: string }) {
|
||||
assertAppOrganization(c);
|
||||
|
||||
|
|
|
|||
|
|
@ -7,17 +7,6 @@ export const ORGANIZATION_QUEUE_NAMES = [
|
|||
"organization.command.refreshTaskSummaryForBranch",
|
||||
"organization.command.snapshot.broadcast",
|
||||
"organization.command.syncGithubSession",
|
||||
"organization.command.better_auth.session_index.upsert",
|
||||
"organization.command.better_auth.session_index.delete",
|
||||
"organization.command.better_auth.email_index.upsert",
|
||||
"organization.command.better_auth.email_index.delete",
|
||||
"organization.command.better_auth.account_index.upsert",
|
||||
"organization.command.better_auth.account_index.delete",
|
||||
"organization.command.better_auth.verification.create",
|
||||
"organization.command.better_auth.verification.update",
|
||||
"organization.command.better_auth.verification.update_many",
|
||||
"organization.command.better_auth.verification.delete",
|
||||
"organization.command.better_auth.verification.delete_many",
|
||||
"organization.command.github.sync_progress.apply",
|
||||
"organization.command.github.webhook_receipt.record",
|
||||
"organization.command.github.organization_shell.sync_from_github",
|
||||
|
|
|
|||
|
|
@ -20,19 +20,6 @@ import {
|
|||
registerTaskBranchMutation,
|
||||
removeTaskSummaryMutation,
|
||||
} from "./actions/task-mutations.js";
|
||||
import {
|
||||
betterAuthCreateVerificationMutation,
|
||||
betterAuthDeleteAccountIndexMutation,
|
||||
betterAuthDeleteEmailIndexMutation,
|
||||
betterAuthDeleteManyVerificationMutation,
|
||||
betterAuthDeleteSessionIndexMutation,
|
||||
betterAuthDeleteVerificationMutation,
|
||||
betterAuthUpdateManyVerificationMutation,
|
||||
betterAuthUpdateVerificationMutation,
|
||||
betterAuthUpsertAccountIndexMutation,
|
||||
betterAuthUpsertEmailIndexMutation,
|
||||
betterAuthUpsertSessionIndexMutation,
|
||||
} from "./actions/better-auth.js";
|
||||
import {
|
||||
applyOrganizationFreePlanMutation,
|
||||
applyOrganizationStripeCustomerMutation,
|
||||
|
|
@ -85,31 +72,6 @@ const COMMAND_HANDLERS: Record<OrganizationQueueName, WorkflowHandler> = {
|
|||
return { ok: true };
|
||||
},
|
||||
|
||||
// Better Auth index mutations
|
||||
"organization.command.better_auth.session_index.upsert": async (c, body) => betterAuthUpsertSessionIndexMutation(c, body),
|
||||
"organization.command.better_auth.session_index.delete": async (c, body) => {
|
||||
await betterAuthDeleteSessionIndexMutation(c, body);
|
||||
return { ok: true };
|
||||
},
|
||||
"organization.command.better_auth.email_index.upsert": async (c, body) => betterAuthUpsertEmailIndexMutation(c, body),
|
||||
"organization.command.better_auth.email_index.delete": async (c, body) => {
|
||||
await betterAuthDeleteEmailIndexMutation(c, body);
|
||||
return { ok: true };
|
||||
},
|
||||
"organization.command.better_auth.account_index.upsert": async (c, body) => betterAuthUpsertAccountIndexMutation(c, body),
|
||||
"organization.command.better_auth.account_index.delete": async (c, body) => {
|
||||
await betterAuthDeleteAccountIndexMutation(c, body);
|
||||
return { ok: true };
|
||||
},
|
||||
"organization.command.better_auth.verification.create": async (c, body) => betterAuthCreateVerificationMutation(c, body),
|
||||
"organization.command.better_auth.verification.update": async (c, body) => betterAuthUpdateVerificationMutation(c, body),
|
||||
"organization.command.better_auth.verification.update_many": async (c, body) => betterAuthUpdateManyVerificationMutation(c, body),
|
||||
"organization.command.better_auth.verification.delete": async (c, body) => {
|
||||
await betterAuthDeleteVerificationMutation(c, body);
|
||||
return { ok: true };
|
||||
},
|
||||
"organization.command.better_auth.verification.delete_many": async (c, body) => betterAuthDeleteManyVerificationMutation(c, body),
|
||||
|
||||
// GitHub sync mutations
|
||||
"organization.command.github.sync_progress.apply": async (c, body) => {
|
||||
await applyGithubSyncProgressMutation(c, body);
|
||||
|
|
|
|||
|
|
@ -1,7 +1,51 @@
|
|||
import { asc, count as sqlCount, desc } from "drizzle-orm";
|
||||
import { applyJoinToRow, applyJoinToRows, buildWhere, columnFor, tableFor } from "../query-helpers.js";
|
||||
import {
|
||||
createAuthRecordMutation,
|
||||
updateAuthRecordMutation,
|
||||
updateManyAuthRecordsMutation,
|
||||
deleteAuthRecordMutation,
|
||||
deleteManyAuthRecordsMutation,
|
||||
} from "../workflow.js";
|
||||
|
||||
/**
|
||||
* Better Auth adapter actions — exposed as actions (not queue commands) so they
|
||||
* execute immediately without competing in the user workflow queue.
|
||||
*
|
||||
* The user actor's workflow queue is shared with profile upserts, session state,
|
||||
* and task state operations. When the queue is busy, auth operations would time
|
||||
* out (10s), causing Better Auth's parseState to throw a non-StateError which
|
||||
* redirects to ?error=please_restart_the_process.
|
||||
*
|
||||
* Auth operations are safe to run as actions because they are simple SQLite
|
||||
* reads/writes scoped to this actor instance with no cross-actor side effects.
|
||||
*/
|
||||
export const betterAuthActions = {
|
||||
// --- Mutation actions (formerly queue commands) ---
|
||||
|
||||
async betterAuthCreateRecord(c: any, input: { model: string; data: Record<string, unknown> }) {
|
||||
return await createAuthRecordMutation(c, input);
|
||||
},
|
||||
|
||||
async betterAuthUpdateRecord(c: any, input: { model: string; where: any[]; update: Record<string, unknown> }) {
|
||||
return await updateAuthRecordMutation(c, input);
|
||||
},
|
||||
|
||||
async betterAuthUpdateManyRecords(c: any, input: { model: string; where: any[]; update: Record<string, unknown> }) {
|
||||
return await updateManyAuthRecordsMutation(c, input);
|
||||
},
|
||||
|
||||
async betterAuthDeleteRecord(c: any, input: { model: string; where: any[] }) {
|
||||
await deleteAuthRecordMutation(c, input);
|
||||
return { ok: true };
|
||||
},
|
||||
|
||||
async betterAuthDeleteManyRecords(c: any, input: { model: string; where: any[] }) {
|
||||
return await deleteManyAuthRecordsMutation(c, input);
|
||||
},
|
||||
|
||||
// --- Read actions ---
|
||||
|
||||
// Better Auth adapter action — called by the Better Auth adapter in better-auth.ts.
|
||||
// Schema and behavior are constrained by Better Auth.
|
||||
async betterAuthFindOneRecord(c, input: { model: string; where: any[]; join?: any }) {
|
||||
|
|
|
|||
|
|
@ -19,11 +19,6 @@ import { buildWhere, columnFor, materializeRow, persistInput, persistPatch, tabl
|
|||
// ---------------------------------------------------------------------------
|
||||
|
||||
export const USER_QUEUE_NAMES = [
|
||||
"user.command.auth.create",
|
||||
"user.command.auth.update",
|
||||
"user.command.auth.update_many",
|
||||
"user.command.auth.delete",
|
||||
"user.command.auth.delete_many",
|
||||
"user.command.profile.upsert",
|
||||
"user.command.session_state.upsert",
|
||||
"user.command.task_state.upsert",
|
||||
|
|
@ -240,14 +235,6 @@ export async function deleteTaskStateMutation(c: any, input: { taskId: string; s
|
|||
type WorkflowHandler = (loopCtx: any, body: any) => Promise<any>;
|
||||
|
||||
const COMMAND_HANDLERS: Record<UserQueueName, WorkflowHandler> = {
|
||||
"user.command.auth.create": async (c, body) => createAuthRecordMutation(c, body),
|
||||
"user.command.auth.update": async (c, body) => updateAuthRecordMutation(c, body),
|
||||
"user.command.auth.update_many": async (c, body) => updateManyAuthRecordsMutation(c, body),
|
||||
"user.command.auth.delete": async (c, body) => {
|
||||
await deleteAuthRecordMutation(c, body);
|
||||
return { ok: true };
|
||||
},
|
||||
"user.command.auth.delete_many": async (c, body) => deleteManyAuthRecordsMutation(c, body),
|
||||
"user.command.profile.upsert": async (c, body) => upsertUserProfileMutation(c, body),
|
||||
"user.command.session_state.upsert": async (c, body) => upsertSessionStateMutation(c, body),
|
||||
"user.command.task_state.upsert": async (c, body) => upsertTaskStateMutation(c, body),
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@ import { organizationKey, userKey } from "../actors/keys.js";
|
|||
import { logger } from "../logging.js";
|
||||
import { expectQueueResponse } from "./queue.js";
|
||||
import { userWorkflowQueueName } from "../actors/user/workflow.js";
|
||||
import { organizationWorkflowQueueName } from "../actors/organization/queues.js";
|
||||
|
||||
const AUTH_BASE_PATH = "/v1/auth";
|
||||
const SESSION_COOKIE = "better-auth.session_token";
|
||||
|
|
@ -160,11 +159,6 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
|
|||
return null;
|
||||
};
|
||||
|
||||
const ensureOrganizationVerification = async (queueName: string, payload: Record<string, unknown>) => {
|
||||
const organization = await appOrganization();
|
||||
return expectQueueResponse(await organization.send(organizationWorkflowQueueName(queueName as any), payload, { wait: true, timeout: 10_000 }));
|
||||
};
|
||||
|
||||
return {
|
||||
options: {
|
||||
useDatabaseGeneratedIds: false,
|
||||
|
|
@ -173,7 +167,8 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
|
|||
create: async ({ model, data }) => {
|
||||
const transformed = await transformInput(data, model, "create", true);
|
||||
if (model === "verification") {
|
||||
return await ensureOrganizationVerification("organization.command.better_auth.verification.create", { data: transformed });
|
||||
const organization = await appOrganization();
|
||||
return await organization.betterAuthCreateVerification({ data: transformed });
|
||||
}
|
||||
|
||||
const userId = await resolveUserIdForQuery(model, undefined, transformed);
|
||||
|
|
@ -182,51 +177,31 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
|
|||
}
|
||||
|
||||
const userActor = await getUser(userId);
|
||||
const created = expectQueueResponse(
|
||||
await userActor.send(userWorkflowQueueName("user.command.auth.create"), { model, data: transformed }, { wait: true, timeout: 10_000 }),
|
||||
);
|
||||
const created = await userActor.betterAuthCreateRecord({ model, data: transformed });
|
||||
const organization = await appOrganization();
|
||||
|
||||
if (model === "user" && typeof transformed.email === "string" && transformed.email.length > 0) {
|
||||
expectQueueResponse(
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.better_auth.email_index.upsert"),
|
||||
{
|
||||
email: transformed.email.toLowerCase(),
|
||||
userId,
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
),
|
||||
);
|
||||
await organization.betterAuthUpsertEmailIndex({
|
||||
email: transformed.email.toLowerCase(),
|
||||
userId,
|
||||
});
|
||||
}
|
||||
|
||||
if (model === "session") {
|
||||
expectQueueResponse(
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.better_auth.session_index.upsert"),
|
||||
{
|
||||
sessionId: String(created.id),
|
||||
sessionToken: String(created.token),
|
||||
userId,
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
),
|
||||
);
|
||||
await organization.betterAuthUpsertSessionIndex({
|
||||
sessionId: String(created.id),
|
||||
sessionToken: String(created.token),
|
||||
userId,
|
||||
});
|
||||
}
|
||||
|
||||
if (model === "account") {
|
||||
expectQueueResponse(
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.better_auth.account_index.upsert"),
|
||||
{
|
||||
id: String(created.id),
|
||||
providerId: String(created.providerId),
|
||||
accountId: String(created.accountId),
|
||||
userId,
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
),
|
||||
);
|
||||
await organization.betterAuthUpsertAccountIndex({
|
||||
id: String(created.id),
|
||||
providerId: String(created.providerId),
|
||||
accountId: String(created.accountId),
|
||||
userId,
|
||||
});
|
||||
}
|
||||
|
||||
return (await transformOutput(created, model)) as any;
|
||||
|
|
@ -309,7 +284,8 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
|
|||
const transformedWhere = transformWhereClause({ model, where, action: "update" });
|
||||
const transformedUpdate = (await transformInput(update as Record<string, unknown>, model, "update", true)) as Record<string, unknown>;
|
||||
if (model === "verification") {
|
||||
return await ensureOrganizationVerification("organization.command.better_auth.verification.update", {
|
||||
const organization = await appOrganization();
|
||||
return await organization.betterAuthUpdateVerification({
|
||||
where: transformedWhere,
|
||||
update: transformedUpdate,
|
||||
});
|
||||
|
|
@ -329,66 +305,42 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
|
|||
: model === "session"
|
||||
? await userActor.betterAuthFindOneRecord({ model, where: transformedWhere })
|
||||
: null;
|
||||
const updated = expectQueueResponse(
|
||||
await userActor.send(
|
||||
userWorkflowQueueName("user.command.auth.update"),
|
||||
{ model, where: transformedWhere, update: transformedUpdate },
|
||||
{ wait: true, timeout: 10_000 },
|
||||
),
|
||||
);
|
||||
const updated = await userActor.betterAuthUpdateRecord({
|
||||
model,
|
||||
where: transformedWhere,
|
||||
update: transformedUpdate,
|
||||
});
|
||||
const organization = await appOrganization();
|
||||
|
||||
if (model === "user" && updated) {
|
||||
if (before?.email && before.email !== updated.email) {
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.better_auth.email_index.delete"),
|
||||
{
|
||||
email: before.email.toLowerCase(),
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
);
|
||||
await organization.betterAuthDeleteEmailIndex({
|
||||
email: before.email.toLowerCase(),
|
||||
});
|
||||
}
|
||||
if (updated.email) {
|
||||
expectQueueResponse(
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.better_auth.email_index.upsert"),
|
||||
{
|
||||
email: updated.email.toLowerCase(),
|
||||
userId,
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
),
|
||||
);
|
||||
await organization.betterAuthUpsertEmailIndex({
|
||||
email: updated.email.toLowerCase(),
|
||||
userId,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (model === "session" && updated) {
|
||||
expectQueueResponse(
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.better_auth.session_index.upsert"),
|
||||
{
|
||||
sessionId: String(updated.id),
|
||||
sessionToken: String(updated.token),
|
||||
userId,
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
),
|
||||
);
|
||||
await organization.betterAuthUpsertSessionIndex({
|
||||
sessionId: String(updated.id),
|
||||
sessionToken: String(updated.token),
|
||||
userId,
|
||||
});
|
||||
}
|
||||
|
||||
if (model === "account" && updated) {
|
||||
expectQueueResponse(
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.better_auth.account_index.upsert"),
|
||||
{
|
||||
id: String(updated.id),
|
||||
providerId: String(updated.providerId),
|
||||
accountId: String(updated.accountId),
|
||||
userId,
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
),
|
||||
);
|
||||
await organization.betterAuthUpsertAccountIndex({
|
||||
id: String(updated.id),
|
||||
providerId: String(updated.providerId),
|
||||
accountId: String(updated.accountId),
|
||||
userId,
|
||||
});
|
||||
}
|
||||
|
||||
return updated ? ((await transformOutput(updated, model)) as any) : null;
|
||||
|
|
@ -398,7 +350,8 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
|
|||
const transformedWhere = transformWhereClause({ model, where, action: "updateMany" });
|
||||
const transformedUpdate = (await transformInput(update as Record<string, unknown>, model, "update", true)) as Record<string, unknown>;
|
||||
if (model === "verification") {
|
||||
return await ensureOrganizationVerification("organization.command.better_auth.verification.update_many", {
|
||||
const organization = await appOrganization();
|
||||
return await organization.betterAuthUpdateManyVerification({
|
||||
where: transformedWhere,
|
||||
update: transformedUpdate,
|
||||
});
|
||||
|
|
@ -410,24 +363,18 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
|
|||
}
|
||||
|
||||
const userActor = await getUser(userId);
|
||||
return expectQueueResponse(
|
||||
await userActor.send(
|
||||
userWorkflowQueueName("user.command.auth.update_many"),
|
||||
{ model, where: transformedWhere, update: transformedUpdate },
|
||||
{ wait: true, timeout: 10_000 },
|
||||
),
|
||||
);
|
||||
return await userActor.betterAuthUpdateManyRecords({
|
||||
model,
|
||||
where: transformedWhere,
|
||||
update: transformedUpdate,
|
||||
});
|
||||
},
|
||||
|
||||
delete: async ({ model, where }) => {
|
||||
const transformedWhere = transformWhereClause({ model, where, action: "delete" });
|
||||
if (model === "verification") {
|
||||
const organization = await appOrganization();
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.better_auth.verification.delete"),
|
||||
{ where: transformedWhere },
|
||||
{ wait: true, timeout: 10_000 },
|
||||
);
|
||||
await organization.betterAuthDeleteVerification({ where: transformedWhere });
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -439,46 +386,35 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
|
|||
const userActor = await getUser(userId);
|
||||
const organization = await appOrganization();
|
||||
const before = await userActor.betterAuthFindOneRecord({ model, where: transformedWhere });
|
||||
await userActor.send(userWorkflowQueueName("user.command.auth.delete"), { model, where: transformedWhere }, { wait: true, timeout: 10_000 });
|
||||
await userActor.betterAuthDeleteRecord({ model, where: transformedWhere });
|
||||
|
||||
if (model === "session" && before) {
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.better_auth.session_index.delete"),
|
||||
{
|
||||
sessionId: before.id,
|
||||
sessionToken: before.token,
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
);
|
||||
await organization.betterAuthDeleteSessionIndex({
|
||||
sessionId: before.id,
|
||||
sessionToken: before.token,
|
||||
});
|
||||
}
|
||||
|
||||
if (model === "account" && before) {
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.better_auth.account_index.delete"),
|
||||
{
|
||||
id: before.id,
|
||||
providerId: before.providerId,
|
||||
accountId: before.accountId,
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
);
|
||||
await organization.betterAuthDeleteAccountIndex({
|
||||
id: before.id,
|
||||
providerId: before.providerId,
|
||||
accountId: before.accountId,
|
||||
});
|
||||
}
|
||||
|
||||
if (model === "user" && before?.email) {
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.better_auth.email_index.delete"),
|
||||
{
|
||||
email: before.email.toLowerCase(),
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
);
|
||||
await organization.betterAuthDeleteEmailIndex({
|
||||
email: before.email.toLowerCase(),
|
||||
});
|
||||
}
|
||||
},
|
||||
|
||||
deleteMany: async ({ model, where }) => {
|
||||
const transformedWhere = transformWhereClause({ model, where, action: "deleteMany" });
|
||||
if (model === "verification") {
|
||||
return await ensureOrganizationVerification("organization.command.better_auth.verification.delete_many", { where: transformedWhere });
|
||||
const organization = await appOrganization();
|
||||
return await organization.betterAuthDeleteManyVerification({ where: transformedWhere });
|
||||
}
|
||||
|
||||
if (model === "session") {
|
||||
|
|
@ -489,18 +425,12 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
|
|||
const userActor = await getUser(userId);
|
||||
const organization = await appOrganization();
|
||||
const sessions = await userActor.betterAuthFindManyRecords({ model, where: transformedWhere, limit: 5000 });
|
||||
const deleted = expectQueueResponse(
|
||||
await userActor.send(userWorkflowQueueName("user.command.auth.delete_many"), { model, where: transformedWhere }, { wait: true, timeout: 10_000 }),
|
||||
);
|
||||
const deleted = await userActor.betterAuthDeleteManyRecords({ model, where: transformedWhere });
|
||||
for (const session of sessions) {
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.better_auth.session_index.delete"),
|
||||
{
|
||||
sessionId: session.id,
|
||||
sessionToken: session.token,
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
);
|
||||
await organization.betterAuthDeleteSessionIndex({
|
||||
sessionId: session.id,
|
||||
sessionToken: session.token,
|
||||
});
|
||||
}
|
||||
return deleted;
|
||||
}
|
||||
|
|
@ -511,10 +441,7 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
|
|||
}
|
||||
|
||||
const userActor = await getUser(userId);
|
||||
const deleted = expectQueueResponse(
|
||||
await userActor.send(userWorkflowQueueName("user.command.auth.delete_many"), { model, where: transformedWhere }, { wait: true, timeout: 10_000 }),
|
||||
);
|
||||
return deleted;
|
||||
return await userActor.betterAuthDeleteManyRecords({ model, where: transformedWhere });
|
||||
},
|
||||
|
||||
count: async ({ model, where }) => {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue