Queue Messages While Streaming
Friday streams responses. Claude generates text token by token, Friday relays it to Telegram in real time, and the message builds up on screen as the model thinks. This works well – until you want to say something while the model is still talking.
The Session class tracks whether a stream is active with a #streaming flag. When prompt() was called during a stream, it refused:
prompt(message: string): { error?: string; status: number } {
if (this.#streaming) {
return { error: 'Already streaming', status: 409 };
}
// ...
}
This is from
src/acp/session.ts(before)
A 409. The bridge catches that error and replies with the text in Telegram. Then the message is gone – no retry, no buffer, no record that it ever happened.
In practice this means watching Friday stream a response, typing a follow-up thought, and getting slapped with "Already streaming." Then waiting for the stream to finish. Then retyping. Sometimes forgetting what you were going to say.
The problem isn't that streaming blocks concurrent prompts – that's a reasonable constraint. The problem is that a blocked prompt destroys the message. User input should never be silently discarded. If the system can't handle it right now, it should hold onto it until it can.
A Queue in Session
The fix is a queue. Messages that arrive during a stream get pushed onto an array instead of rejected.
The queue lives in Session, not in the bridge. The bridge shouldn't need to know about ordering or retry logic – it calls prompt() and reacts to what comes back. Streaming state is a session concern, so queuing is too.
export class Session {
#queue: string[] = [];
#streaming = false;
// ...
}
The updated prompt() method checks #streaming and branches:
prompt(message: string): {
error?: string;
queued?: boolean;
status: number;
} {
if (this.#streaming) {
this.#queue.push(message);
return { status: 202, queued: true };
}
this.#streaming = true;
this.#done = false;
this.#buffer.clear();
this.#currentPrompt = message;
// ... build options, start stream
this.#query = sdkQuery({ prompt: message, options });
this.#streamTurn();
return { status: 202 };
}
This is from
src/acp/session.ts
Two changes. First, the #streaming branch pushes instead of rejecting – status: 202 with queued: true instead of status: 409 with an error string. Second, the return type now includes a queued flag so callers can distinguish between "started a new stream" and "held for later."
The queue is unbounded. There's no cap. In theory someone could queue hundreds of messages. In practice, a human typing in Telegram can't outpace Claude finishing a stream. Adding a limit would mean deciding what to do when the limit is hit – reject? drop the oldest? – and every answer to that question is worse than just holding the messages. Unbounded is the simplest thing that works and the hardest to get wrong.
Draining the Queue
Queuing messages is half the problem. The other half is submitting them when the stream finishes.
The drain logic lives in the finally block of #streamTurn() – the method that processes the SDK's event stream:
async #streamTurn(): Promise<void> {
try {
// ... process stream events
} catch (err) {
this.#buffer.push({
type: 'error',
message: err instanceof Error ? err.message : String(err),
});
} finally {
this.#streaming = false;
this.#done = true;
this.#query = null;
const next = this.#queue.shift();
if (next) {
this.prompt(next);
}
}
}
This is from
src/acp/session.ts
Three things matter about placement.
It's in finally, not after the try. If the stream errors – network failure, SDK crash, malformed response – the queued messages still get processed. A catch block handles the error; finally handles the cleanup. The queue is cleanup.
#streaming = false comes before the drain. This is the critical ordering. When this.prompt(next) runs, it checks #streaming. If it's still true, the message would be re-queued instead of starting a new stream. Setting #streaming = false first means the shifted message enters prompt() through the normal path – it starts a fresh stream, sets #streaming = true again, and the cycle continues.
It shifts one message, not all of them. Each prompt() call starts a stream. When that stream finishes, its finally block shifts the next message. If three messages are queued, they'll run sequentially – stream, drain, stream, drain, stream, drain – not all at once. The recursion is indirect: finally calls prompt(), which calls #streamTurn(), whose finally will drain the next one. Each message gets its own full stream cycle.
The Bridge Reacts
The session handles queuing. The bridge handles acknowledgement.
Before the queue, the bridge's message handler replied with the error text when prompt() returned a 409. Now it needs to handle a third case: the message was accepted but deferred.
this.#bot.on('message:text', async (ctx) => {
const text = ctx.message.text;
if (text.startsWith('/')) return;
this.#resetMessage();
const result = this.#session.prompt(text);
if (result.queued) {
await this.#bot.api.setMessageReaction(
ctx.chat.id,
ctx.message.message_id,
[{ type: 'emoji', emoji: '👍' }]
);
return;
}
if (result.error) {
await ctx.reply(result.error);
return;
}
this.#startPolling(ctx.chat.id);
});
This is from
src/telegram/bridge.ts
A 👍 reaction on the original message. Not a text reply – reactions are lightweight, don't clutter the chat, and communicate exactly what happened: "got it, will get to it." The user sees their message get a thumbs-up and knows it's queued without any noise in the conversation.
The other change is in the polling loop. When the bridge polls for messages and sees done: true, it used to stop polling. But now "done" might mean "the current stream finished and a queued message just started a new one." The bridge needs to check:
if (done) {
if (this.#session.streaming) {
// A queued message started a new stream – reset for the new turn
this.#resetMessage();
this.#lastCursor = 0;
} else {
this.#stopPolling();
}
}
If session.streaming is still true after done, a queued message kicked off a new stream. The bridge resets its display state – clears the response message ID, segments, and cursor – so the new stream's output renders as a fresh message. If streaming is false, the queue is empty and we stop polling as before.
Polish
Working in the bridge to add queue handling surfaced a few adjacent problems. None were in the plan. All were hard to ignore once they were visible.
Rate-limited message edits. The bridge updates the Telegram message on every poll – once per second. Each update is an API call. Telegram rate-limits edit operations, and hitting the limit causes edits to fail silently. The fix is a timestamp check:
const now = Date.now();
const shouldUpdate = done || now - this.#lastUpsertAt >= 5000;
if (shouldUpdate) {
// ... upsert message
this.#lastUpsertAt = now;
}
This is from
src/telegram/bridge.ts
Edits now happen at most every five seconds, unless the stream is done – in which case the final state always renders immediately. The stream still polls every second for responsiveness, but message edits are throttled. Less API pressure, same user experience.
The #send helper. Telegram API calls were scattered through the bridge, each with their own error handling (or none). A failed sendMessage would throw and break the polling loop. The fix was a wrapper:
async #send(
chatId: number,
text: string,
options?: Record<string, unknown>
) {
try {
return await this.#bot.api.sendMessage(chatId, text, options);
} catch {
return null;
}
}
Swallows errors, returns null on failure. Every call site that used this.#bot.api.sendMessage now uses this.#send. A failed Telegram call shouldn't crash the assistant – it should degrade gracefully. The message didn't send; move on.
Model switching. Two new commands – /cheap and /expensive – to switch between Sonnet and Opus mid-conversation. Both call resetSession() because changing the model means starting a fresh ACP session:
this.#bot.command('cheap', async (ctx) => {
this.#session.resetSession();
// ... switch to sonnet
await ctx.reply('Switched to Sonnet. Session reset.');
});
These aren't directly related to the queue feature. They shipped in the same batch because the bridge was already open for surgery. That's the pragmatic choice – when you're already changing a file, small improvements that don't complicate the diff are worth taking.
Conclusion
Before: sending a message during a stream returned a 409 error and the message was gone. The user had to wait, remember, and re-send. The bridge updated Telegram on every poll with no rate limiting. API errors in send calls could break the polling loop. Model switching required restarting the process.
After: messages sent during a stream are queued and drained automatically in order. The user sees a 👍 and moves on. Message edits are throttled to every five seconds. API errors are swallowed gracefully. /cheap and /expensive switch models with a session reset.
The queue implementation is twelve lines of meaningful code. Three lines in prompt() to push and return. Six lines in finally to drain. One property declaration. The bridge changes are similarly small – a reaction call, a streaming check in the done handler.
The design principle is simple: user input is sacred. If the system can't process it right now, it holds it. It doesn't reject it, doesn't drop it, doesn't ask the user to try again. The assistant works around its own constraints so the user doesn't have to.
That's what separates a tool from an assistant. A tool tells you it's busy. An assistant says "got it" and gets to it when it can.