Comments (3)
Oh nice! This is great - thanks for sharing.
from ai.
Yeah, what I ended up doing was waiting for the "textDone" event and manually collecting the citations to send back in a sendDataMessage. I then merge the role="assistant" messages with the role="data" messages that directly follow them, client side.
// api/assistant/route.ts
import { AssistantResponse } from "ai";
import OpenAI from "openai";

const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY || "",
});

export const runtime = "edge";
export const dynamic = "force-dynamic";

export async function POST(req: Request) {
  const input: {
    threadId: string | null;
    message: string;
  } = await req.json();

  const threadId = input.threadId ?? (await openai.beta.threads.create({})).id;

  const createdMessage = await openai.beta.threads.messages.create(threadId, {
    role: "user",
    content: input.message,
  });

  return AssistantResponse(
    { threadId, messageId: createdMessage.id },
    async ({ forwardStream, sendDataMessage }) => {
      let finalText = "";
      let citations: string[] = [];

      const runStream = openai.beta.threads.runs.stream(threadId, {
        assistant_id:
          process.env.ASSISTANT_ID ??
          (() => {
            throw new Error("ASSISTANT_ID is not set");
          })(),
      });

      const textDonePromise = new Promise<void>((resolve) => {
        runStream.on(
          "textDone",
          async (text: OpenAI.Beta.Threads.Messages.Text) => {
            const { annotations } = text;
            let updatedText = text.value;
            const updatedCitations: string[] = [];
            let index = 0;
            for (const annotation of annotations) {
              updatedText = updatedText.replace(
                annotation.text,
                "[" + index + "]",
              );
              // @ts-ignore -- annotation is a union type; file_citation only exists on one member
              const { file_citation } = annotation;
              if (file_citation) {
                const citedFile = await openai.files.retrieve(
                  file_citation.file_id,
                );
                updatedCitations.push("[" + index + "]" + citedFile.filename);
              }
              index++;
            }
            finalText = updatedText;
            citations = updatedCitations;
            resolve();
          },
        );
      });

      await forwardStream(runStream);
      await textDonePromise;

      // Send the modified text and citations back to the client
      if (citations.length > 0) {
        sendDataMessage({
          data: {
            finalText,
            citations,
          },
          role: "data",
        });
      }
    },
  );
}
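In isolation, the marker substitution in the textDone handler boils down to something like this (a simplified sketch; the marker string is just whatever `annotation.text` contains, e.g. OpenAI's `【…†source】` markers):

```typescript
// Simplified sketch: replace each inline citation marker with a numbered one.
// The marker strings come from annotation.text in the OpenAI SDK.
function numberCitations(text: string, markers: string[]): string {
  let result = text;
  markers.forEach((marker, index) => {
    // String.replace with a string pattern replaces the first occurrence.
    result = result.replace(marker, `[${index}]`);
  });
  return result;
}

// numberCitations("Cats purr【4:0†source】.", ["【4:0†source】"]) → "Cats purr[0]."
```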
This def isn't the best way to do this, but it was the quickest thing I could get working. I'd love to hear better solutions if anyone out there has them.
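The client-side merge described above can be sketched roughly like this (a minimal version with a simplified message shape; the real objects from the `useAssistant` hook are richer):

```typescript
// Minimal sketch of the client-side merge: fold each role "data" message
// into the assistant message that immediately precedes it.
interface SimpleMessage {
  role: "user" | "assistant" | "data";
  content: string;
  data?: { finalText: string; citations: string[] };
}

function mergeDataMessages(messages: SimpleMessage[]): SimpleMessage[] {
  const merged: SimpleMessage[] = [];
  for (const message of messages) {
    const previous = merged[merged.length - 1];
    if (message.role === "data" && previous?.role === "assistant" && message.data) {
      // Replace the streamed text with the citation-annotated version
      // and attach the citations for rendering.
      previous.content = message.data.finalText;
      previous.data = message.data;
    } else {
      merged.push({ ...message });
    }
  }
  return merged;
}
```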
Awesome, thanks! I'll show you what I did:
I created a new AssistantResponse function:
import { AssistantMessage, DataMessage, formatStreamPart } from "ai";
import { AssistantStream } from "openai/lib/AssistantStream";
import { Run } from "openai/resources/beta/threads/runs/runs";

/**
You can pass the thread and the latest message into the `AssistantResponse`. This establishes the context for the response.
*/
type AssistantResponseSettings = {
  /**
  The thread ID that the response is associated with.
  */
  threadId: string;

  /**
  The ID of the latest message that the response is associated with.
  */
  messageId: string;
};

/**
The process parameter is a callback in which you can run the assistant on threads, and send messages and data messages to the client.
*/
type AssistantResponseCallback = (options: {
  /**
  @deprecated use variable from outer scope instead.
  */
  threadId: string;

  /**
  @deprecated use variable from outer scope instead.
  */
  messageId: string;

  /**
  Forwards an assistant message (non-streaming) to the client.
  */
  sendMessage: (message: AssistantMessage) => void;

  /**
  Send a data message to the client. You can use this to provide information for rendering custom UIs while the assistant is processing the thread.
  */
  sendDataMessage: (message: DataMessage) => void;

  /**
  Forwards the assistant response stream to the client. Returns the `Run` object after it completes, or when it requires an action.
  */
  forwardStream: (stream: AssistantStream) => Promise<Run | undefined>;

  /**
  Registers a callback that is invoked for each file-citation annotation in the stream. The callback returns the replacement text for the annotated span.
  */
  onAnnotation: (
    fn: (
      index: number,
      annotation: MessageAnnotation,
    ) => Promise<{ newText: string }>,
  ) => void;
}) => Promise<void>;

export interface MessageAnnotation {
  text: string;
  file_id: string;
  start_index: number;
  end_index: number;
}

/**
The `AssistantResponse` allows you to send a stream of assistant updates to `useAssistant`.
It is designed to facilitate streaming assistant responses to the `useAssistant` hook.
It receives an assistant thread and a current message, and can send messages and data messages to the client.
*/
export function AssistantResponse2(
  { threadId, messageId }: AssistantResponseSettings,
  process: AssistantResponseCallback,
): Response {
  let annotationIndex = 0;

  const stream = new ReadableStream({
    async start(controller) {
      const textEncoder = new TextEncoder();

      const sendMessage = (message: AssistantMessage) => {
        controller.enqueue(
          textEncoder.encode(formatStreamPart("assistant_message", message)),
        );
      };

      const sendDataMessage = (message: DataMessage) => {
        controller.enqueue(
          textEncoder.encode(formatStreamPart("data_message", message)),
        );
      };

      const sendError = (errorMessage: string) => {
        controller.enqueue(
          textEncoder.encode(formatStreamPart("error", errorMessage)),
        );
      };

      let annotationCallbackFn:
        | undefined
        | ((
            index: number,
            annotation: MessageAnnotation,
          ) => Promise<{ newText: string }>);

      const forwardStream = async (stream: AssistantStream) => {
        let result: Run | undefined = undefined;

        for await (const value of stream) {
          switch (value.event) {
            case "thread.message.created": {
              controller.enqueue(
                textEncoder.encode(
                  formatStreamPart("assistant_message", {
                    id: value.data.id,
                    role: "assistant",
                    content: [{ type: "text", text: { value: "" } }],
                  }),
                ),
              );
              break;
            }

            case "thread.message.delta": {
              const content = value.data.delta.content?.[0];

              // also handle annotations
              if (
                content &&
                content.type == "text" &&
                content.text &&
                content.text.value
              ) {
                const annotations = content.text.annotations;
                const text = content.text.value;

                // queue up the message annotations
                if (annotations && Array.isArray(annotations)) {
                  for (const annotation of annotations) {
                    if (annotation.type === "file_citation") {
                      if (!annotationCallbackFn) {
                        console.log(
                          "annotations: no annotation callback function",
                        );
                        continue;
                      }

                      // get the replacement text for this annotation
                      const { newText } = await annotationCallbackFn(
                        annotationIndex,
                        {
                          text,
                          file_id: annotation.file_citation?.file_id as string,
                          start_index: annotation.start_index as number,
                          end_index: annotation.end_index as number,
                        },
                      );

                      // increment the annotation index
                      annotationIndex++;

                      controller.enqueue(
                        textEncoder.encode(
                          formatStreamPart(
                            "text",
                            content.text.value.replace(text, newText),
                          ),
                        ),
                      );
                    }
                  }
                } else {
                  controller.enqueue(
                    textEncoder.encode(
                      formatStreamPart("text", content.text.value),
                    ),
                  );
                }
              }

              break;
            }

            case "thread.run.completed":
            case "thread.run.requires_action": {
              result = value.data;
              break;
            }
          }
        }

        return result;
      };

      // send the threadId and messageId as the first message:
      controller.enqueue(
        textEncoder.encode(
          formatStreamPart("assistant_control_data", {
            threadId,
            messageId,
          }),
        ),
      );

      try {
        await process({
          threadId,
          messageId,
          sendMessage,
          sendDataMessage,
          forwardStream,
          onAnnotation: (fn) => {
            console.log("annotations: setting annotation callback function");
            annotationCallbackFn = fn;
          },
        });
      } catch (error) {
        sendError((error as any).message ?? `${error}`);
      } finally {
        controller.close();
      }
    },

    pull(controller) {},

    cancel() {},
  });

  return new Response(stream, {
    status: 200,
    headers: {
      "Content-Type": "text/plain; charset=utf-8",
    },
  });
}
Notice how I'm handling thread.message.delta.
Then it's being used like this:
return AssistantResponse2(
  { threadId, messageId: createdMessage.id },
  async ({ forwardStream, sendDataMessage, onAnnotation }) => {
    if (!threadId) {
      throw new Error("no threadId");
    }

    // Run the assistant on the thread
    const runStream = openai.beta.threads.runs.stream(threadId, {
      assistant_id: OPENAI_CHAT_ASSISTANT_ID,
    });

    console.log("annotations: before onAnnotations");

    onAnnotation(async (index, annotation: MessageAnnotation) => {
      // transform annotation here
      return {
        newText: `...`,
      };
    });

    // forwardStream streams the message deltas as the run progresses
    let runResult = await forwardStream(runStream);

    while (
      runResult?.status === "requires_action" &&
      runResult.required_action?.type === "submit_tool_outputs"
    ) {
      // we don't have any tools yet, so any tool call is an error
      const tool_outputs =
        runResult.required_action.submit_tool_outputs.tool_calls.map(
          (toolCall: any) => {
            const parameters = JSON.parse(toolCall.function.arguments);
            switch (toolCall.function.name) {
              default:
                throw new Error(
                  `Unknown tool call function: ${toolCall.function.name}`,
                );
            }
          },
        );

      runResult = await forwardStream(
        openai.beta.threads.runs.submitToolOutputsStream(
          threadId,
          runResult.id,
          { tool_outputs },
        ),
      );
    }
  },
);
Notice the new 'onAnnotation' helper.
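One way to fill in the transform is to look up the cited file and return a numbered marker (hypothetical sketch; `openai` is the client from earlier, `citations` is an array you'd declare in the enclosing scope, and the marker format is my choice, not part of the API):

```typescript
// Hypothetical onAnnotation transform: build a "[n]" marker for each
// file citation. The OpenAI lookup is shown commented out so the pure
// part stays self-contained.
const citationMarker = (index: number): string => `[${index}]`;

// Inside the AssistantResponse2 callback:
// onAnnotation(async (index, annotation) => {
//   // annotation.file_id comes from the file_citation in the delta.
//   const citedFile = await openai.files.retrieve(annotation.file_id);
//   citations.push(`${citationMarker(index)} ${citedFile.filename}`);
//   return { newText: citationMarker(index) };
// });
```

The collected `citations` array could then be sent back with `sendDataMessage` exactly as in the first comment's route handler.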