事前質問例1
Streaming
For LLMs, streaming has become an increasingly popular feature. The idea is to rapidly return tokens as an LLM is generating them, rather than waiting for a full response to be created before returning anything.
Streaming is actually very easy to implement for simple use-cases, but it can get complicated when we start including things like Agents which have their own logic running which can block our attempts at streaming. Fortunately, we can make it work — it just requires a little extra effort.
We'll start easy by implementing streaming to the terminal for LLMs, but by the end of the notebook we'll be handling the more complex task of streaming via FastAPI for Agents.
First, let's install all of the libraries we'll be using.
!pip install -qU \
openai==0.28.0 \
langchain==0.0.301 \
fastapi==0.103.1 \
"uvicorn[standard]"==0.23.2
LLM Streaming to Stdout
The simplest form of streaming is to simply "print" the tokens as they're generated. To set this up we need to initialize an LLM (one that supports streaming, not all do) with two specific parameters:
streaming=True, to enable streaming
callbacks=[SomeCallBackHere()], where we pass a LangChain callback class (or list containing multiple).
The streaming parameter is self-explanatory. The callbacks parameter and callback classes less so — essentially they act as little bits of code that do something as each token from our LLM is generated. As mentioned, the simplest form of streaming is to print the tokens as they're being generated, like with the StreamingStdOutCallbackHandler.
import os
from langchain.chat_models import ChatOpenAI
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY") or "YOUR_API_KEY"
llm = ChatOpenAI(
openai_api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.0,
model_name="gpt-3.5-turbo",
streaming=True, # ! important
callbacks=[StreamingStdOutCallbackHandler()] # ! important
)
Now if we run the LLM we'll see the response being streamed.
from langchain.schema import HumanMessage
# create messages to be passed to chat LLM
messages = [HumanMessage(content="tell me a long story")]
llm(messages)
That was surprisingly easy, but things begin to get much more complicated as soon as we begin using agents. Let's first initialize an agent.
from langchain.memory import ConversationBufferWindowMemory
from langchain.agents import load_tools, AgentType, initialize_agent
# initialize conversational memory
memory = ConversationBufferWindowMemory(
memory_key="chat_history",
k=5,
return_messages=True,
output_key="output"
)
# create a single tool to see how it impacts streaming
tools = load_tools(["llm-math"], llm=llm)
# initialize the agent
agent = initialize_agent(
agent=AgentType.CHAT_CONVERSATIONAL_REACT_DESCRIPTION,
tools=tools,
llm=llm,
memory=memory,
verbose=True,
max_iterations=3,
early_stopping_method="generate",
return_intermediate_steps=False
)
We already added our StreamingStdOutCallbackHandler to the agent as we initialized the agent with the same llm as we created with that callback. So let's see what we get when running the agent.
prompt = "Hello, how are you?"
agent(prompt)
Not bad, but we do now have the issue of streaming the entire output from the LLM. Because we're using an agent, the LLM is instructed to output the JSON format we can see here so that the agent logic can handle tool usage, multiple "thinking" steps, and so on. For example, if we ask a math question we'll see this:
agent("what is the square root of 71?")
It's interesting to see during development but we'll want to clean this streaming up a little in any actual use-case. For that we can go with two approaches — either we build a custom callback handler, or use a purpose built callback handler from LangChain (as usual, LangChain has something for everything). Let's first try LangChain's purpose-built FinalStreamingStdOutCallbackHandler.
We will overwrite the existing callbacks attribute found here:
agent.agent.llm_chain.llm
With the new callback handler:
from langchain.callbacks.streaming_stdout_final_only import (
FinalStreamingStdOutCallbackHandler,
)
agent.agent.llm_chain.llm.callbacks = [
FinalStreamingStdOutCallbackHandler(
answer_prefix_tokens=["Final", "Answer"]
)
]
Let's try it:
agent("what is the square root of 71?")
Not quite there, we should really clean up the answer_prefix_tokens argument but it is hard to get right. It's generally easier to use a custom callback handler like so:
import sys
class CallbackHandler(StreamingStdOutCallbackHandler):
def __init__(self):
self.content: str = ""
self.final_answer: bool = False
def on_llm_new_token(self, token: str, **kwargs: any) -> None:
self.content += token
if "Final Answer" in self.content:
# now we're in the final answer section, but don't print yet
self.final_answer = True
self.content = ""
if self.final_answer:
if '"action_input": "' in self.content:
if token not in ["}"]:
sys.stdout.write(token) # equal to `print(token, end="")`
sys.stdout.flush()
agent.agent.llm_chain.llm.callbacks = [CallbackHandler()]
Let's try again:
agent("what is the square root of 71?")
agent.agent.llm_chain.llm
It isn't perfect, but this is getting better. Now, in most scenarios we're unlikely to simply be printing output to a terminal or notebook. When we want to do something more complex like stream this data through another API, we need to do things differently.
Using FastAPI with Agents
In most cases we'll be placing our LLMs, Agents, etc behind something like an API. Let's add that into the mix and see how we can implement streaming for agents with FastAPI.
First, we'll create a simple main.py script to contain our FastAPI logic. You can find it in the same GitHub repo location as this notebook (here's a link).
To run the API, navigate to the directory and run uvicorn main:app --reload. Once complete, you can confirm it is running by looking for the 🤙 status in the next cell output:
import requests
res = requests.get("http://localhost:8000/health")
res.json()
{'status': '🤙'}
res = requests.get("http://localhost:8000/chat",
json={"text": "hello there!"}
)
res
<Response [200]>
res.json()
{'input': 'hello there!',
'chat_history': [],
'output': 'Hello! How can I assist you today?'}
Unlike with our StdOut streaming, we now need to send our tokens to a generator function that feeds those tokens to FastAPI via a StreamingResponse object. To handle this we need to use async code, otherwise our generator will not begin emitting anything until after generation is already complete.
The Queue is accessed by our callback handler, as as each token is generated, it puts the token into the queue. Our generator function asyncronously checks for new tokens being added to the queue. As soon as the generator sees a token has been added, it gets the token and yields it to our StreamingResponse.
To see it in action, we'll define a stream requests function called get_stream:
def get_stream(query: str):
s = requests.Session()
with s.get(
"http://localhost:8000/chat",
stream=True,
json={"text": query}
) as r:
for line in r.iter_content():
print(line.decode("utf-8"), end="")
get_stream("hi there!")
"Hello! How can I assist you today?"
アウトプット例
このプロジェクトの目的は、LLM(大規模言語モデル)におけるストリーミング機能の実装です。最初に、標準出力(Stdout)へのストリーミングを実装し、その後、FastAPIを通じてエージェントによるストリーミングを扱います。1. 必要なライブラリのインストール
以下のライブラリが必要です:
openai
langchain
fastapi
uvicorn
2. LLMストリーミングへの標準出力
最も単純な形式のストリーミングは、生成されたトークンを単に「印刷」することです。これを設定するために、特定のパラメーターを持つLLM(すべてのLLMがストリーミングをサポートするわけではありません)を初期化する必要があります:
streaming=True
:ストリーミングを有効にするcallbacks=[SomeCallBackHere()]
:LangChainコールバッククラス(または複数を含むリスト)を渡します。3. エージェントの初期化
次に、エージェントを初期化します。エージェントは、ツールの使用、複数の「思考」ステップなど、より複雑なロジックを処理するためにLLMとは異なるアプローチを必要とします。
4. ストリーミング出力のクリーンアップ
エージェントを使用すると、LLMからの出力全体がストリーミングされる問題が発生します。実際のユースケースでこのストリーミングを少しクリーンアップするために、カスタムコールバックハンドラを構築するか、LangChainが提供する目的に合ったコールバックハンドラを使用することができます。
5. FastAPIを使ったエージェントのストリーミング
LLMやエージェントをAPIの後ろに配置する場合がほとんどです。FastAPIを使用して、エージェントによるストリーミングを実装する方法を見てみましょう。
FastAPIロジックを含む単純な
main.py
スクリプトを作成します。APIを実行するには、ディレクトリに移動してuvicorn main:app --reload
を実行します。完了したら、次のセルの出力で🤙ステータスを確認して、実行中であることを確認できます。FastAPIの
StreamingResponse
オブジェクトを介してFastAPIにトークンをフィードするジェネレータ関数に、トークンを送信する必要があります。これを処理するためには、非同期コードを使用する必要があります。そうでなければ、ジェネレータは生成が完了するまで何も送信しません。キューはコールバックハンドラーによってアクセスされ、生成される各トークンがキューに入れられます。ジェネレータ関数は非同期的に新しいトークンがキューに追加されたことを確認し、トークンが追加されたと見るとすぐに、ジェネレータはトークンを取得して、
StreamingResponse
にそれを提供します。実際に動作を見るために、クエリに応じてストリームリクエストを返す
get_stream
関数を定義します。これにより、リアルタイムでデータをストリーミングし、FastAPIを通じてLLMまたはエージェントの応答をリアルタイムで取得できます。匿名
事前質問例2
このチュートリアルのシステム構成は?
アウトプット例
このチュートリアルでは、LLM(大規模言語モデル)のストリーミング応答を扱うシステムを構築しています。システムは以下のコンポーネントで構成されています:
- 大規模言語モデル(LLM):
- LLMは、ストリーミングをサポートするように設定されます。
- 生成される各トークンは、コールバックを通じてリアルタイムで処理されます。
- エージェント:
- エージェントは、ツールの使用、複数の思考ステップ、およびその他の複雑なタスクを管理します。
- エージェントは、LLMの出力をさらに処理し、より高度な対話やタスク実行を可能にします。
- コールバックハンドラー:
- LLMからの各トークンに対して何かしらのアクションを実行するために使用されます。
- 最も単純な形式は、生成される各トークンを印刷することですが、より複雑な処理が可能です。
- FastAPI:
- FastAPIは、LLMやエージェントを外部から利用可能にするためのAPIエンドポイントを提供します。
- FastAPIを使用することで、HTTPリクエストを介してLLMやエージェントの機能にアクセスできます。
- ストリーミングロジック:
- FastAPIを介してLLMやエージェントからストリーミング応答を送信するための特別なロジックが必要です。
- これには、非同期コードと特別なストリーミングレスポンスの取り扱いが含まれます。
- 開発環境:
- Pythonライブラリ(
openai
,langchain
,fastapi
,uvicorn
など)が必要です。- これらのライブラリは、LLMの操作、ストリーミングの実装、およびFastAPIを介したエンドポイントの提供をサポートします。
この構成は、ユーザーがリアルタイムでLLMやエージェントと対話できるシステムを提供します。最終的には、エージェントが生成する応答をストリーミングし、ユーザーがこれをリアルタイムで見ることができるようになります。
事前質問例3
処理フローをmermaidで
アウトプット例
以下は、このチュートリアルの処理フローを示すMermaidフローチャートです。このフローチャートは、大規模言語モデル(LLM)からのストリーミング応答を扱うシステムの主要なステップを表しています。flowchart TD A[ユーザーからのリクエスト] -->|リクエスト送信| B[FastAPI] B -->|リクエスト受信| C[エージェント初期化] C -->|エージェント設定| D[LLMとの通信] D --> E[ストリーミングパラメータ設定] E -->|streaming=True| F[トークン生成のリアルタイム監視] E -->|callbacks=[Callback]]| G[コールバックハンドラの設定] F --> H[トークンのリアルタイム処理] G --> I[特定のアクション実行] H --> J[ストリーミング応答の構築] I --> J J -->|ストリーミング応答生成| K[FastAPIを通じてユーザーに送信] K --> L[ユーザーの端末に表示]
このフローチャートは次のステップを示しています:
- ユーザーからのリクエスト: ユーザーがFastAPIを通じてリクエストを送信します。
- FastAPI: FastAPIがリクエストを受け取り、エージェントに渡します。
- エージェント初期化: エージェントが初期化され、LLMとの通信準備が行われます。
- LLMとの通信: エージェントがLLMと通信を始めます。
- ストリーミングパラメータ設定: LLMに対してストリーミングパラメータ(
streaming=True
およびcallbacks=[...]
)が設定されます。- トークン生成のリアルタイム監視: LLMがトークンをリアルタイムで生成し、監視します。
- コールバックハンドラの設定: 特定のアクションや処理を行うためのコールバックハンドラが設定されます。
- トークンのリアルタイム処理: 生成された各トークンがリアルタイムで処理されます。
- 特定のアクション実行: コールバックハンドラが特定のアクションを実行します。
- ストリーミング応答の構築: ストリーミング応答が構築されます。
- FastAPIを通じてユーザーに送信: 構築されたストリーミング応答がFastAPIを通じてユーザーに送信されます。
- ユーザーの端末に表示: ユーザーの端末にストリーミング応答が表示されます。
質問例
処理フローをmermaidでシーケンス図にしてください
アウトプット例
以下は、このチュートリアルの処理フローを表すMermaidシーケンス図です。このシーケンス図は、LLMのストリーミング応答を扱うシステムの各コンポーネント間の相互作用を時系列に沿って示しています。
sequenceDiagram participant ユーザー participant FastAPI participant エージェント participant LLM participant コールバックハンドラ ユーザー->>FastAPI: リクエスト送信 FastAPI->>エージェント: リクエスト受信 activate エージェント エージェント->>LLM: LLMの設定(streaming=True, callbacks=[...]) activate LLM LLM->>コールバックハンドラ: トークン生成のリアルタイム監視 activate コールバックハンドラ loop トークン生成中 コールバックハンドラ->>LLM: トークン処理 end コールバックハンドラ->>FastAPI: ストリーミング応答の構築 FastAPI->>ユーザー: ストリーミング応答送信 deactivate コールバックハンドラ deactivate LLM deactivate エージェント
このシーケンス図は、以下の相互作用を示しています:
- ユーザーからFastAPIへのリクエスト送信: ユーザーがFastAPIにリクエストを送信します。
- FastAPIからエージェントへのリクエスト受信: FastAPIがリクエストを受け取り、エージェントに渡します。
- エージェントからLLMへの設定: エージェントがLLMにストリーミングパラメータ(
streaming=True
およびcallbacks=[...]
)を設定します。- LLMからコールバックハンドラへのトークン生成のリアルタイム監視: LLMがトークンをリアルタイムで生成し、コールバックハンドラに通知します。
- コールバックハンドラによるトークンのリアルタイム処理: コールバックハンドラが生成された各トークンをリアルタイムで処理します。
- コールバックハンドラからFastAPIへのストリーミング応答の構築: コールバックハンドラがストリーミング応答を構築し、FastAPIに送信します。
- FastAPIからユーザーへのストリーミング応答送信: FastAPIが構築されたストリーミング応答をユーザーに送信します。
追加質問例1
participantは、ユーザー、フロントエンド、バックエンド、LLMとしてください。
アウトプット例
以下は、指定された参加者(ユーザー、フロントエンド、バックエンド、LLM)を含む、システムの処理フローを示すMermaidシーケンス図です。
sequenceDiagram participant ユーザー participant フロントエンド participant バックエンド participant LLM ユーザー->>フロントエンド: リクエスト送信 フロントエンド->>バックエンド: リクエスト転送 activate バックエンド バックエンド->>LLM: ストリーミングリクエスト activate LLM loop トークン生成中 LLM-->>バックエンド: ストリーミングトークン バックエンド-->>フロントエンド: ストリーミングデータ end フロントエンド->>ユーザー: ストリーミング応答送信 deactivate LLM deactivate バックエンド
このシーケンス図は以下のステップを示しています:
- ユーザーからフロントエンドへのリクエスト送信: ユーザーがフロントエンドにリクエストを送信します。
- フロントエンドからバックエンドへのリクエスト転送: フロントエンドがリクエストを受け取り、バックエンドに転送します。
- バックエンドからLLMへのストリーミングリクエスト: バックエンドがLLMに対してストリーミングリクエストを送信します。
- LLMからバックエンドへのストリーミングトークン送信: LLMが生成する各トークンをリアルタイムでバックエンドに送信します。
- バックエンドからフロントエンドへのストリーミングデータ転送: バックエンドが受け取ったストリーミングデータをリアルタイムでフロントエンドに転送します。
- フロントエンドからユーザーへのストリーミング応答送信: フロントエンドが構築されたストリーミング応答をリアルタイムでユーザーに送信します。
追加質問例2
participantは、ユーザー、フロントエンド、バックエンド、LLMとしたまま
処理内容として使用しているAPIがわかるように記載してください、
ユーザーからFastAPIへのリクエスト送信: ユーザーがFastAPIにリクエストを送信します。
FastAPIからエージェントへのリクエスト受信: FastAPIがリクエストを受け取り、エージェントに渡します。
エージェントからLLMへの設定: エージェントがLLMにストリーミングパラメータ(streaming=Trueおよびcallbacks=[...])を設定します。
LLMからコールバックハンドラへのトークン生成のリアルタイム監視: LLMがトークンをリアルタイムで生成し、コールバックハンドラに通知します。
コールバックハンドラによるトークンのリアルタイム処理: コールバックハンドラが生成された各トークンをリアルタイムで処理します。
コールバックハンドラからFastAPIへのストリーミング応答の構築: コールバックハンドラがストリーミング応答を構築し、FastAPIに送信します。
FastAPIからユーザーへのストリーミング応答送信: FastAPIが構築されたストリーミング応答をユーザーに送信します。
アウトプット例
以下は、指定された参加者(ユーザー、フロントエンド、バックエンド、LLM)を使用し、使用しているAPIがわかるように処理内容を記載したMermaidシーケンス図です。
sequenceDiagram participant ユーザー participant フロントエンド as FastAPI participant バックエンド as エージェント participant LLM participant コールバックハンドラ ユーザー->>+フロントエンド: リクエスト送信 (/chat) フロントエンド->>+バックエンド: リクエスト転送 (/initialize-agent) バックエンド->>+LLM: ストリーミングパラメータ設定 (streaming=True, callbacks=[...]) LLM->>+コールバックハンドラ: トークン生成のリアルタイム監視 loop トークン生成中 コールバックハンドラ->>LLM: トークン処理 コールバックハンドラ->>フロントエンド: ストリーミング応答の構築 (/streaming-response) end フロントエンド->>ユーザー: ストリーミング応答送信 deactivate コールバックハンドラ deactivate LLM deactivate バックエンド deactivate フロントエンド
このシーケンス図は以下のステップを示しています:
- ユーザーからFastAPIへのリクエスト送信 (
/chat
): ユーザーがFastAPIにリクエストを送信します。- FastAPIからエージェントへのリクエスト転送 (
/initialize-agent
): FastAPIがリクエストを受け取り、エージェントに転送します。- エージェントからLLMへのストリーミングパラメータ設定 (
streaming=True
,callbacks=[...]
): エージェントがLLMにストリーミングパラメータを設定します。- LLMからコールバックハンドラへのトークン生成のリアルタイム監視: LLMがトークンをリアルタイムで生成し、コールバックハンドラに通知します。
- コールバックハンドラによるトークンのリアルタイム処理: コールバックハンドラが生成された各トークンをリアルタイムで処理し、その結果をFastAPIに送信します (
/streaming-response
)。- FastAPIからユーザーへのストリーミング応答送信: FastAPIが構築されたストリーミング応答をリアルタイムでユーザーに送信します。