import { retryAsync } from "../infra/retry.js"; import type { OpenAiEmbeddingClient } from "./embeddings.js"; import { hashText } from "./internal.js"; export type OpenAiBatchRequest = { custom_id: string; method: "POST"; url: "/v1/embeddings"; body: { model: string; input: string; }; }; export type OpenAiBatchStatus = { id?: string; status?: string; output_file_id?: string | null; error_file_id?: string | null; }; export type OpenAiBatchOutputLine = { custom_id?: string; response?: { status_code?: number; body?: { data?: Array<{ embedding?: number[]; index?: number }>; error?: { message?: string }; }; }; error?: { message?: string }; }; export const OPENAI_BATCH_ENDPOINT = "/v1/embeddings"; const OPENAI_BATCH_COMPLETION_WINDOW = "24h"; const OPENAI_BATCH_MAX_REQUESTS = 50000; const OPENAI_BATCH_RETRY = { attempts: 3, minDelayMs: 500, maxDelayMs: 5000, jitter: 0.1, }; type RetryableError = Error & { status?: number }; function isRetryableBatchError(err: unknown): boolean { const status = typeof (err as RetryableError)?.status === "number" ? (err as RetryableError).status : undefined; if (typeof status === "number") { return status === 429 || status >= 500; } const message = err instanceof Error ? err.message : String(err); return /timeout|timed out|ECONNRESET|ECONNREFUSED|EHOSTUNREACH|ENOTFOUND|EAI_AGAIN|network|fetch failed|upstream connect/i.test( message, ); } function formatRetryError(err: unknown): string { return err instanceof Error ? err.message : String(err); } function getOpenAiBaseUrl(openAi: OpenAiEmbeddingClient): string { return openAi.baseUrl?.replace(/\/$/, "") ?? ""; } function getOpenAiHeaders( openAi: OpenAiEmbeddingClient, params: { json: boolean }, ): Record { const headers = openAi.headers ? { ...openAi.headers } : {}; if (params.json) { if (!headers["Content-Type"] && !headers["content-type"]) { headers["Content-Type"] = "application/json"; } } else { delete headers["Content-Type"]; delete headers["content-type"]; } return headers; } function splitOpenAiBatchRequests(requests: OpenAiBatchRequest[]): OpenAiBatchRequest[][] { if (requests.length <= OPENAI_BATCH_MAX_REQUESTS) return [requests]; const groups: OpenAiBatchRequest[][] = []; for (let i = 0; i < requests.length; i += OPENAI_BATCH_MAX_REQUESTS) { groups.push(requests.slice(i, i + OPENAI_BATCH_MAX_REQUESTS)); } return groups; } async function fetchOpenAiWithRetry(params: { openAi: OpenAiEmbeddingClient; url: string; init?: RequestInit; label: string; debug?: (message: string, data?: Record) => void; }): Promise { return await retryAsync( async () => { const res = await fetch(params.url, params.init); if (!res.ok) { const text = await res.text(); const err = new Error(`openai batch ${params.label} failed: ${res.status} ${text}`); (err as RetryableError).status = res.status; throw err; } return res; }, { ...OPENAI_BATCH_RETRY, label: params.label, shouldRetry: isRetryableBatchError, onRetry: (info) => { params.debug?.( `openai batch ${params.label} retry ${info.attempt}/${info.maxAttempts} in ${info.delayMs}ms`, { error: formatRetryError(info.err) }, ); }, }, ); } async function submitOpenAiBatch(params: { openAi: OpenAiEmbeddingClient; requests: OpenAiBatchRequest[]; agentId: string; debug?: (message: string, data?: Record) => void; }): Promise { const baseUrl = getOpenAiBaseUrl(params.openAi); const jsonl = params.requests.map((request) => JSON.stringify(request)).join("\n"); const form = new FormData(); form.append("purpose", "batch"); form.append( "file", new Blob([jsonl], { type: "application/jsonl" }), `memory-embeddings.${hashText(String(Date.now()))}.jsonl`, ); const fileRes = await fetchOpenAiWithRetry({ openAi: params.openAi, url: `${baseUrl}/files`, init: { method: "POST", headers: getOpenAiHeaders(params.openAi, { json: false }), body: form, }, label: "file upload", debug: params.debug, }); const filePayload = (await fileRes.json()) as { id?: string }; if (!filePayload.id) { throw new Error("openai batch file upload failed: missing file id"); } const batchRes = await fetchOpenAiWithRetry({ openAi: params.openAi, url: `${baseUrl}/batches`, init: { method: "POST", headers: getOpenAiHeaders(params.openAi, { json: true }), body: JSON.stringify({ input_file_id: filePayload.id, endpoint: OPENAI_BATCH_ENDPOINT, completion_window: OPENAI_BATCH_COMPLETION_WINDOW, metadata: { source: "clawdbot-memory", agent: params.agentId, }, }), }, label: "create", debug: params.debug, }); return (await batchRes.json()) as OpenAiBatchStatus; } async function fetchOpenAiBatchStatus(params: { openAi: OpenAiEmbeddingClient; batchId: string; debug?: (message: string, data?: Record) => void; }): Promise { const baseUrl = getOpenAiBaseUrl(params.openAi); const res = await fetchOpenAiWithRetry({ openAi: params.openAi, url: `${baseUrl}/batches/${params.batchId}`, init: { headers: getOpenAiHeaders(params.openAi, { json: true }) }, label: "status", debug: params.debug, }); return (await res.json()) as OpenAiBatchStatus; } async function fetchOpenAiFileContent(params: { openAi: OpenAiEmbeddingClient; fileId: string; debug?: (message: string, data?: Record) => void; }): Promise { const baseUrl = getOpenAiBaseUrl(params.openAi); const res = await fetchOpenAiWithRetry({ openAi: params.openAi, url: `${baseUrl}/files/${params.fileId}/content`, init: { headers: getOpenAiHeaders(params.openAi, { json: true }) }, label: "file content", debug: params.debug, }); return await res.text(); } function parseOpenAiBatchOutput(text: string): OpenAiBatchOutputLine[] { if (!text.trim()) return []; return text .split("\n") .map((line) => line.trim()) .filter(Boolean) .map((line) => JSON.parse(line) as OpenAiBatchOutputLine); } async function readOpenAiBatchError(params: { openAi: OpenAiEmbeddingClient; errorFileId: string; debug?: (message: string, data?: Record) => void; }): Promise { try { const content = await fetchOpenAiFileContent({ openAi: params.openAi, fileId: params.errorFileId, debug: params.debug, }); const lines = parseOpenAiBatchOutput(content); const first = lines.find((line) => line.error?.message || line.response?.body?.error); const message = first?.error?.message ?? (typeof first?.response?.body?.error?.message === "string" ? first?.response?.body?.error?.message : undefined); return message; } catch (err) { const message = err instanceof Error ? err.message : String(err); return message ? `error file unavailable: ${message}` : undefined; } } async function waitForOpenAiBatch(params: { openAi: OpenAiEmbeddingClient; batchId: string; wait: boolean; pollIntervalMs: number; timeoutMs: number; debug?: (message: string, data?: Record) => void; initial?: OpenAiBatchStatus; }): Promise<{ outputFileId: string; errorFileId?: string }> { const start = Date.now(); let current: OpenAiBatchStatus | undefined = params.initial; while (true) { const status = current ?? (await fetchOpenAiBatchStatus({ openAi: params.openAi, batchId: params.batchId, debug: params.debug, })); const state = status.status ?? "unknown"; if (state === "completed") { if (!status.output_file_id) { throw new Error(`openai batch ${params.batchId} completed without output file`); } return { outputFileId: status.output_file_id, errorFileId: status.error_file_id ?? undefined, }; } if (["failed", "expired", "cancelled", "canceled"].includes(state)) { const detail = status.error_file_id ? await readOpenAiBatchError({ openAi: params.openAi, errorFileId: status.error_file_id, debug: params.debug, }) : undefined; const suffix = detail ? `: ${detail}` : ""; throw new Error(`openai batch ${params.batchId} ${state}${suffix}`); } if (!params.wait) { throw new Error(`openai batch ${params.batchId} still ${state}; wait disabled`); } if (Date.now() - start > params.timeoutMs) { throw new Error(`openai batch ${params.batchId} timed out after ${params.timeoutMs}ms`); } params.debug?.(`openai batch ${params.batchId} ${state}; waiting ${params.pollIntervalMs}ms`); await new Promise((resolve) => setTimeout(resolve, params.pollIntervalMs)); current = undefined; } } async function runWithConcurrency(tasks: Array<() => Promise>, limit: number): Promise { if (tasks.length === 0) return []; const resolvedLimit = Math.max(1, Math.min(limit, tasks.length)); const results: T[] = Array.from({ length: tasks.length }); let next = 0; let firstError: unknown = null; const workers = Array.from({ length: resolvedLimit }, async () => { while (true) { if (firstError) return; const index = next; next += 1; if (index >= tasks.length) return; try { results[index] = await tasks[index](); } catch (err) { firstError = err; return; } } }); await Promise.allSettled(workers); if (firstError) throw firstError; return results; } export async function runOpenAiEmbeddingBatches(params: { openAi: OpenAiEmbeddingClient; agentId: string; requests: OpenAiBatchRequest[]; wait: boolean; pollIntervalMs: number; timeoutMs: number; concurrency: number; debug?: (message: string, data?: Record) => void; }): Promise> { if (params.requests.length === 0) return new Map(); const groups = splitOpenAiBatchRequests(params.requests); const byCustomId = new Map(); const tasks = groups.map((group, groupIndex) => async () => { const batchInfo = await submitOpenAiBatch({ openAi: params.openAi, requests: group, agentId: params.agentId, debug: params.debug, }); if (!batchInfo.id) { throw new Error("openai batch create failed: missing batch id"); } params.debug?.("memory embeddings: openai batch created", { batchId: batchInfo.id, status: batchInfo.status, group: groupIndex + 1, groups: groups.length, requests: group.length, }); if (!params.wait && batchInfo.status !== "completed") { throw new Error( `openai batch ${batchInfo.id} submitted; enable remote.batch.wait to await completion`, ); } const completed = batchInfo.status === "completed" ? { outputFileId: batchInfo.output_file_id ?? "", errorFileId: batchInfo.error_file_id ?? undefined, } : await waitForOpenAiBatch({ openAi: params.openAi, batchId: batchInfo.id, wait: params.wait, pollIntervalMs: params.pollIntervalMs, timeoutMs: params.timeoutMs, debug: params.debug, initial: batchInfo, }); if (!completed.outputFileId) { throw new Error(`openai batch ${batchInfo.id} completed without output file`); } const content = await fetchOpenAiFileContent({ openAi: params.openAi, fileId: completed.outputFileId, debug: params.debug, }); const outputLines = parseOpenAiBatchOutput(content); const errors: string[] = []; const remaining = new Set(group.map((request) => request.custom_id)); for (const line of outputLines) { const customId = line.custom_id; if (!customId) continue; remaining.delete(customId); if (line.error?.message) { errors.push(`${customId}: ${line.error.message}`); continue; } const response = line.response; const statusCode = response?.status_code ?? 0; if (statusCode >= 400) { const message = response?.body?.error?.message ?? (typeof response?.body === "string" ? response.body : undefined) ?? "unknown error"; errors.push(`${customId}: ${message}`); continue; } const data = response?.body?.data ?? []; const embedding = data[0]?.embedding ?? []; if (embedding.length === 0) { errors.push(`${customId}: empty embedding`); continue; } byCustomId.set(customId, embedding); } if (errors.length > 0) { throw new Error(`openai batch ${batchInfo.id} failed: ${errors.join("; ")}`); } if (remaining.size > 0) { throw new Error(`openai batch ${batchInfo.id} missing ${remaining.size} embedding responses`); } }); params.debug?.("memory embeddings: openai batch submit", { requests: params.requests.length, groups: groups.length, wait: params.wait, concurrency: params.concurrency, pollIntervalMs: params.pollIntervalMs, timeoutMs: params.timeoutMs, }); await runWithConcurrency(tasks, params.concurrency); return byCustomId; }