From 71bce3fc6ddab1ea88a38e60dc6e309c126c0d9b Mon Sep 17 00:00:00 2001 From: Claudia Date: Mon, 2 Feb 2026 10:45:50 +0100 Subject: [PATCH] fix(context): Use direct message fetch instead of consumer API Simpler approach that avoids TypeScript issues with consumer options --- src/infra/event-context.ts | 57 +++++++++++++++++--------------------- 1 file changed, 26 insertions(+), 31 deletions(-) diff --git a/src/infra/event-context.ts b/src/infra/event-context.ts index 4c0457caf..69dc81ff7 100644 --- a/src/infra/event-context.ts +++ b/src/infra/event-context.ts @@ -93,12 +93,12 @@ async function queryEvents( const nc = await connect({ servers: config.natsUrl }); try { - const js = nc.jetstream(); const jsm = await nc.jetstreamManager(); - // Check if stream exists + // Check if stream exists and get info + let streamInfo; try { - await jsm.streams.info(config.streamName); + streamInfo = await jsm.streams.info(config.streamName); } catch { console.log("[event-context] Stream not found, returning empty context"); return []; @@ -109,30 +109,31 @@ async function queryEvents( const maxEvents = options.maxEvents ?? 1000; const startTime = Date.now() - hoursBack * 60 * 60 * 1000; - // Create ephemeral consumer for querying - const consumerName = `context-query-${Date.now()}`; + // Get message count + const lastSeq = streamInfo.state.last_seq; + if (lastSeq === 0) { + return []; + } - // Build subject filter - const subject = options.agent - ? `${config.subjectPrefix}.${options.agent}.>` - : `${config.subjectPrefix}.>`; + // Start from end and go back + const startSeq = Math.max(1, lastSeq - maxEvents); - const consumer = await js.consumers.get(config.streamName, { - name: consumerName, - ack_policy: AckPolicy.None, - deliver_policy: DeliverPolicy.StartTime, - opt_start_time: new Date(startTime).toISOString(), - filter_subject: subject, - inactive_threshold: 30_000_000_000, // 30 seconds in nanos - }); - - // Fetch messages - const messages = await consumer.fetch({ max_messages: maxEvents, expires: 5000 }); - - for await (const msg of messages) { + // Fetch messages by sequence + for (let seq = startSeq; seq <= lastSeq && events.length < maxEvents; seq++) { try { + const msg = await jsm.streams.getMessage(config.streamName, { seq }); const data = JSON.parse(sc.decode(msg.data)) as StoredEvent; + // Filter by timestamp + if (data.timestamp < startTime) { + continue; + } + + // Filter by agent if specified + if (options.agent && data.agent !== options.agent && data.agent !== "agent") { + continue; + } + // Filter by session if specified if (options.sessionKey && data.session !== options.sessionKey) { continue; @@ -144,18 +145,12 @@ async function queryEvents( } events.push(data); - } catch (err) { - console.error("[event-context] Failed to parse event:", err); + } catch { + // Message may have been deleted + continue; } } - // Delete ephemeral consumer - try { - await jsm.consumers.delete(config.streamName, consumerName); - } catch { - // Ignore - may have auto-deleted - } - return events; } finally { await nc.drain();