
Comments (3)

luhart commented on May 26, 2024

Oh nice! This is great - thanks for sharing.

from ai.

luhart commented on May 26, 2024

Yeah, what I ended up doing was waiting for the "textDone" event and manually collecting the citations to send back in a sendDataMessage. I then merge the role="assistant" messages with the role="data" messages that directly follow them, client side.

// api/assistant/route.ts
import { AssistantResponse } from "ai";
import OpenAI from "openai";

const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY || "",
});

export const runtime = "edge";
export const dynamic = "force-dynamic";

export async function POST(req: Request) {
  const input: {
    threadId: string | null;
    message: string;
  } = await req.json();

  const threadId = input.threadId ?? (await openai.beta.threads.create({})).id;

  const createdMessage = await openai.beta.threads.messages.create(threadId, {
    role: "user",
    content: input.message,
  });

  return AssistantResponse(
    { threadId, messageId: createdMessage.id },
    async ({ forwardStream, sendDataMessage }) => {
      let finalText = "";
      let citations: string[] = [];

      const runStream = openai.beta.threads.runs.stream(threadId, {
        assistant_id:
          process.env.ASSISTANT_ID ??
          (() => {
            throw new Error("ASSISTANT_ID is not set");
          })(),
      });

      // Note: this assumes the run emits a "textDone" event; if it never does
      // (e.g. the run fails or only requires an action), this promise won't resolve.
      const textDonePromise = new Promise<void>((resolve) => {
        runStream.on(
          "textDone",
          async (text: OpenAI.Beta.Threads.Messages.Text) => {
            const { annotations } = text;
            let updatedText = text.value;
            const updatedCitations: string[] = [];

            let index = 0;
            for (const annotation of annotations) {
              updatedText = updatedText.replace(
                annotation.text,
                "[" + index + "]",
              );
              // narrow the annotation union instead of @ts-ignore
              if (annotation.type === "file_citation") {
                const citedFile = await openai.files.retrieve(
                  annotation.file_citation.file_id,
                );
                updatedCitations.push("[" + index + "]" + citedFile.filename);
              }
              index++;
            }

            finalText = updatedText;
            citations = updatedCitations;

            resolve();
          },
        );
      });

      await forwardStream(runStream);
      await textDonePromise;

      // Send the modified text and citations back to the client
      if (citations.length > 0) {
        sendDataMessage({
          data: {
            finalText,
            citations,
          },
          role: "data",
        });
      }
    },
  );
}

This definitely isn't the best way to do it, but it was the quickest thing I could get working. I'd love to hear better solutions if anyone out there has them.
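For reference, the client-side merge mentioned above could look something like this. A minimal sketch: the message shape and the mergeCitations helper are my own assumptions matching the sendDataMessage payload, not part of the useAssistant API.

```typescript
// Sketch of merging each role="assistant" message with the role="data"
// message that directly follows it. Shapes here are assumptions.
type ThreadMessage = {
  role: string;
  content?: string;
  citations?: string[];
  data?: { finalText: string; citations: string[] };
};

function mergeCitations(messages: ThreadMessage[]): ThreadMessage[] {
  const merged: ThreadMessage[] = [];
  for (const m of messages) {
    const prev = merged[merged.length - 1];
    if (m.role === "data" && prev?.role === "assistant" && m.data) {
      // Swap in the annotated text and attach the citation list.
      prev.content = m.data.finalText;
      prev.citations = m.data.citations;
    } else {
      merged.push({ ...m });
    }
  }
  return merged;
}
```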


camwest commented on May 26, 2024

Awesome thanks! I'll show you what I did:

I created a new AssistantResponse function (AssistantResponse2):

import { AssistantMessage, DataMessage, formatStreamPart } from "ai";
import { AssistantStream } from "openai/lib/AssistantStream";
import { Run } from "openai/resources/beta/threads/runs/runs";

/**
You can pass the thread and the latest message into the `AssistantResponse`. This establishes the context for the response.
 */
type AssistantResponseSettings = {
  /**
  The thread ID that the response is associated with.
     */
  threadId: string;

  /**
  The ID of the latest message that the response is associated with.
   */
  messageId: string;
};

/**
  The process parameter is a callback in which you can run the assistant on threads, and send messages and data messages to the client.
   */
type AssistantResponseCallback = (options: {
  /**
  @deprecated use variable from outer scope instead.
     */
  threadId: string;

  /**
  @deprecated use variable from outer scope instead.
     */
  messageId: string;

  /**
  Forwards an assistant message (non-streaming) to the client.
     */
  sendMessage: (message: AssistantMessage) => void;

  /**
  Send a data message to the client. You can use this to provide information for rendering custom UIs while the assistant is processing the thread.
   */
  sendDataMessage: (message: DataMessage) => void;

  /**
  Forwards the assistant response stream to the client. Returns the `Run` object after it completes, or when it requires an action.
     */
  forwardStream: (stream: AssistantStream) => Promise<Run | undefined>;

  onAnnotation: (
    fn: (
      index: number,
      annotations: MessageAnnotation,
    ) => Promise<{ newText: string }>,
  ) => void;
}) => Promise<void>;

export interface MessageAnnotation {
  text: string;
  file_id: string;
  start_index: number;
  end_index: number;
}

/**
  The `AssistantResponse` allows you to send a stream of assistant updates to `useAssistant`.
  It is designed to facilitate streaming assistant responses to the `useAssistant` hook.
  It receives an assistant thread and a current message, and can send messages and data messages to the client.
   */
export function AssistantResponse2(
  { threadId, messageId }: AssistantResponseSettings,
  process: AssistantResponseCallback,
): Response {
  let annotationIndex = 0;

  const stream = new ReadableStream({
    async start(controller) {
      const textEncoder = new TextEncoder();

      const sendMessage = (message: AssistantMessage) => {
        controller.enqueue(
          textEncoder.encode(formatStreamPart("assistant_message", message)),
        );
      };

      const sendDataMessage = (message: DataMessage) => {
        controller.enqueue(
          textEncoder.encode(formatStreamPart("data_message", message)),
        );
      };

      const sendError = (errorMessage: string) => {
        controller.enqueue(
          textEncoder.encode(formatStreamPart("error", errorMessage)),
        );
      };

      let annotationCallbackFn:
        | undefined
        | ((
            index: number,
            annotations: MessageAnnotation,
          ) => Promise<{
            newText: string;
          }>);

      const forwardStream = async (stream: AssistantStream) => {
        let result: Run | undefined = undefined;

        for await (const value of stream) {
          switch (value.event) {
            case "thread.message.created": {
              controller.enqueue(
                textEncoder.encode(
                  formatStreamPart("assistant_message", {
                    id: value.data.id,
                    role: "assistant",
                    content: [{ type: "text", text: { value: "" } }],
                  }),
                ),
              );
              break;
            }

            case "thread.message.delta": {
              const content = value.data.delta.content?.[0];

              // also handle annotations
              if (
                content &&
                content.type == "text" &&
                content.text &&
                content.text.value
              ) {
                const annotations = content.text.annotations;
                const text = content.text.value;

                // queue up the message annotations
                if (annotations && Array.isArray(annotations)) {
                  for (let annotation of annotations) {
                    if (annotation.type === "file_citation") {
                      if (!annotationCallbackFn) {
                        console.log(
                          "annotations: no annotation callback function",
                        );
                        continue;
                      }

                      // get the next text annotation
                      const { newText } = await annotationCallbackFn(
                        annotationIndex,
                        {
                          text,
                          file_id: annotation.file_citation?.file_id as string,
                          start_index: annotation.start_index as number,
                          end_index: annotation.end_index as number,
                        },
                      );

                      // increment the annotation index
                      annotationIndex++;

                      controller.enqueue(
                        textEncoder.encode(
                          formatStreamPart(
                            "text",
                            content.text.value.replace(text, newText),
                          ),
                        ),
                      );
                    }
                  }
                } else {
                  controller.enqueue(
                    textEncoder.encode(
                      formatStreamPart("text", content.text.value),
                    ),
                  );
                }
              }

              break;
            }

            case "thread.run.completed":
            case "thread.run.requires_action": {
              result = value.data;
              break;
            }
          }
        }

        return result;
      };

      // send the threadId and messageId as the first message:
      controller.enqueue(
        textEncoder.encode(
          formatStreamPart("assistant_control_data", {
            threadId,
            messageId,
          }),
        ),
      );

      try {
        await process({
          threadId,
          messageId,
          sendMessage,
          sendDataMessage,
          forwardStream,
          onAnnotation: (fn) => {
            console.log("annotations: setting annotation callback function");
            annotationCallbackFn = fn;
          },
        });
      } catch (error) {
        sendError((error as any).message ?? `${error}`);
      } finally {
        controller.close();
      }
    },
    pull(controller) {},
    cancel() {},
  });

  return new Response(stream, {
    status: 200,
    headers: {
      "Content-Type": "text/plain; charset=utf-8",
    },
  });
}

Notice how I'm handling thread.message.delta.

Then it's being used like this:

  return AssistantResponse2(
    { threadId, messageId: createdMessage.id },
    async ({ forwardStream, sendDataMessage, onAnnotation }) => {
      if (!threadId) {
        throw new Error("no threadId");
      }
      // Run the assistant on the thread
      const runStream = openai.beta.threads.runs.stream(threadId, {
        assistant_id: OPENAI_CHAT_ASSISTANT_ID,
      });

      console.log("annotations: before onAnnotations");
      onAnnotation(async (index, annotation: MessageAnnotation) => {
        // transform annotation here
        return {
          newText: `...`,
        };
      });

      // forward run status would stream message deltas
      let runResult = await forwardStream(runStream);

      while (
        runResult?.status === "requires_action" &&
        runResult.required_action?.type === "submit_tool_outputs"
      ) {
        // WE DON'T HAVE ANY TOOLS YET
        const tool_outputs =
          runResult.required_action.submit_tool_outputs.tool_calls.map(
            (toolCall: any) => {
              const parameters = JSON.parse(toolCall.function.arguments);

              switch (toolCall.function.name) {
                default:
                  throw new Error(
                    `Unknown tool call function: ${toolCall.function.name}`,
                  );
              }
            },
          );

        runResult = await forwardStream(
          openai.beta.threads.runs.submitToolOutputsStream(
            threadId,
            runResult.id,
            { tool_outputs },
          ),
        );
      }
    },
  );

Notice the new 'onAnnotation' helper.
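As one concrete (hypothetical) transform for that helper: the raw file-citation markers in Assistants API text typically look like 怐4:0†source怑, so the callback could swap each marker for a numbered footnote. The toFootnote function below is my own sketch, not part of either snippet above:

```typescript
// Hypothetical onAnnotation callback body: replace raw citation markers
// (e.g. "怐4:0†source怑") in the delta text with a numbered footnote "[n]".
interface MessageAnnotation {
  text: string;
  file_id: string;
  start_index: number;
  end_index: number;
}

async function toFootnote(
  index: number,
  annotation: MessageAnnotation,
): Promise<{ newText: string }> {
  return { newText: annotation.text.replace(/怐[^怑]*怑/g, `[${index}]`) };
}
```

Resolving annotation.file_id to a filename via openai.files.retrieve (as in the first snippet) could also happen here before building the footnote text.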

