fix(context): Use direct message fetch instead of consumer API
Some checks are pending
CI / install-check (push) Waiting to run
CI / checks (bunx tsc -p tsconfig.json --noEmit false, bun, build) (push) Waiting to run
CI / checks (pnpm build && pnpm lint, node, lint) (push) Waiting to run
CI / checks (pnpm canvas:a2ui:bundle && bunx vitest run, bun, test) (push) Waiting to run
CI / checks (pnpm canvas:a2ui:bundle && pnpm test, node, test) (push) Waiting to run
CI / checks (pnpm format, node, format) (push) Waiting to run
CI / checks (pnpm protocol:check, node, protocol) (push) Waiting to run
CI / checks (pnpm tsgo, node, tsgo) (push) Waiting to run
CI / secrets (push) Waiting to run
CI / checks-windows (pnpm build && pnpm lint, node, build & lint) (push) Waiting to run
CI / checks-windows (pnpm canvas:a2ui:bundle && pnpm test, node, test) (push) Waiting to run
CI / checks-windows (pnpm protocol:check, node, protocol) (push) Waiting to run
CI / checks-macos (pnpm test, test) (push) Waiting to run
CI / macos-app (set -euo pipefail
for attempt in 1 2 3; do
if swift build --package-path apps/macos --configuration release; then
exit 0
fi
echo "swift build failed (attempt $attempt/3). Retrying…"
sleep $((attempt * 20))
done
exit 1
, build) (push) Waiting to run
CI / macos-app (set -euo pipefail
for attempt in 1 2 3; do
if swift test --package-path apps/macos --parallel --enable-code-coverage --show-codecov-path; then
exit 0
fi
echo "swift test failed (attempt $attempt/3). Retrying…"
sleep $((attempt … (push) Waiting to run
CI / macos-app (swiftlint --config .swiftlint.yml
swiftformat --lint apps/macos/Sources --config .swiftformat
, lint) (push) Waiting to run
CI / ios (push) Waiting to run
CI / android (./gradlew --no-daemon :app:assembleDebug, build) (push) Waiting to run
CI / android (./gradlew --no-daemon :app:testDebugUnitTest, test) (push) Waiting to run
Workflow Sanity / no-tabs (push) Waiting to run
Some checks are pending
CI / install-check (push) Waiting to run
CI / checks (bunx tsc -p tsconfig.json --noEmit false, bun, build) (push) Waiting to run
CI / checks (pnpm build && pnpm lint, node, lint) (push) Waiting to run
CI / checks (pnpm canvas:a2ui:bundle && bunx vitest run, bun, test) (push) Waiting to run
CI / checks (pnpm canvas:a2ui:bundle && pnpm test, node, test) (push) Waiting to run
CI / checks (pnpm format, node, format) (push) Waiting to run
CI / checks (pnpm protocol:check, node, protocol) (push) Waiting to run
CI / checks (pnpm tsgo, node, tsgo) (push) Waiting to run
CI / secrets (push) Waiting to run
CI / checks-windows (pnpm build && pnpm lint, node, build & lint) (push) Waiting to run
CI / checks-windows (pnpm canvas:a2ui:bundle && pnpm test, node, test) (push) Waiting to run
CI / checks-windows (pnpm protocol:check, node, protocol) (push) Waiting to run
CI / checks-macos (pnpm test, test) (push) Waiting to run
CI / macos-app (set -euo pipefail
for attempt in 1 2 3; do
if swift build --package-path apps/macos --configuration release; then
exit 0
fi
echo "swift build failed (attempt $attempt/3). Retrying…"
sleep $((attempt * 20))
done
exit 1
, build) (push) Waiting to run
CI / macos-app (set -euo pipefail
for attempt in 1 2 3; do
if swift test --package-path apps/macos --parallel --enable-code-coverage --show-codecov-path; then
exit 0
fi
echo "swift test failed (attempt $attempt/3). Retrying…"
sleep $((attempt … (push) Waiting to run
CI / macos-app (swiftlint --config .swiftlint.yml
swiftformat --lint apps/macos/Sources --config .swiftformat
, lint) (push) Waiting to run
CI / ios (push) Waiting to run
CI / android (./gradlew --no-daemon :app:assembleDebug, build) (push) Waiting to run
CI / android (./gradlew --no-daemon :app:testDebugUnitTest, test) (push) Waiting to run
Workflow Sanity / no-tabs (push) Waiting to run
Simpler approach that avoids TypeScript issues with consumer options
This commit is contained in:
parent
f162fa1401
commit
71bce3fc6d
1 changed files with 26 additions and 31 deletions
|
|
@ -93,12 +93,12 @@ async function queryEvents(
|
|||
const nc = await connect({ servers: config.natsUrl });
|
||||
|
||||
try {
|
||||
const js = nc.jetstream();
|
||||
const jsm = await nc.jetstreamManager();
|
||||
|
||||
// Check if stream exists
|
||||
// Check if stream exists and get info
|
||||
let streamInfo;
|
||||
try {
|
||||
await jsm.streams.info(config.streamName);
|
||||
streamInfo = await jsm.streams.info(config.streamName);
|
||||
} catch {
|
||||
console.log("[event-context] Stream not found, returning empty context");
|
||||
return [];
|
||||
|
|
@ -109,30 +109,31 @@ async function queryEvents(
|
|||
const maxEvents = options.maxEvents ?? 1000;
|
||||
const startTime = Date.now() - hoursBack * 60 * 60 * 1000;
|
||||
|
||||
// Create ephemeral consumer for querying
|
||||
const consumerName = `context-query-${Date.now()}`;
|
||||
// Get message count
|
||||
const lastSeq = streamInfo.state.last_seq;
|
||||
if (lastSeq === 0) {
|
||||
return [];
|
||||
}
|
||||
|
||||
// Build subject filter
|
||||
const subject = options.agent
|
||||
? `${config.subjectPrefix}.${options.agent}.>`
|
||||
: `${config.subjectPrefix}.>`;
|
||||
// Start from end and go back
|
||||
const startSeq = Math.max(1, lastSeq - maxEvents);
|
||||
|
||||
const consumer = await js.consumers.get(config.streamName, {
|
||||
name: consumerName,
|
||||
ack_policy: AckPolicy.None,
|
||||
deliver_policy: DeliverPolicy.StartTime,
|
||||
opt_start_time: new Date(startTime).toISOString(),
|
||||
filter_subject: subject,
|
||||
inactive_threshold: 30_000_000_000, // 30 seconds in nanos
|
||||
});
|
||||
|
||||
// Fetch messages
|
||||
const messages = await consumer.fetch({ max_messages: maxEvents, expires: 5000 });
|
||||
|
||||
for await (const msg of messages) {
|
||||
// Fetch messages by sequence
|
||||
for (let seq = startSeq; seq <= lastSeq && events.length < maxEvents; seq++) {
|
||||
try {
|
||||
const msg = await jsm.streams.getMessage(config.streamName, { seq });
|
||||
const data = JSON.parse(sc.decode(msg.data)) as StoredEvent;
|
||||
|
||||
// Filter by timestamp
|
||||
if (data.timestamp < startTime) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Filter by agent if specified
|
||||
if (options.agent && data.agent !== options.agent && data.agent !== "agent") {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Filter by session if specified
|
||||
if (options.sessionKey && data.session !== options.sessionKey) {
|
||||
continue;
|
||||
|
|
@ -144,18 +145,12 @@ async function queryEvents(
|
|||
}
|
||||
|
||||
events.push(data);
|
||||
} catch (err) {
|
||||
console.error("[event-context] Failed to parse event:", err);
|
||||
} catch {
|
||||
// Message may have been deleted
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// Delete ephemeral consumer
|
||||
try {
|
||||
await jsm.consumers.delete(config.streamName, consumerName);
|
||||
} catch {
|
||||
// Ignore - may have auto-deleted
|
||||
}
|
||||
|
||||
return events;
|
||||
} finally {
|
||||
await nc.drain();
|
||||
|
|
|
|||
Loading…
Reference in a new issue