feat(agent): Add history compression to increase longevity and efficiency

* Compress steps in the prompt to reduce token usage, and to increase longevity when using models with limited context windows
* Move multiple copies of step formatting code to `Episode.format` method
* Add `EpisodicActionHistory.handle_compression` method to handle compression of new steps
This commit is contained in:
Reinier van der Leer
2024-01-31 17:51:45 +01:00
parent 55433f468a
commit 66e0c87894
3 changed files with 77 additions and 40 deletions

View File

@@ -274,6 +274,9 @@ class Agent(
# Update action history
self.event_history.register_result(result)
await self.event_history.handle_compression(
self.llm_provider, self.legacy_config
)
return result

View File

@@ -264,25 +264,16 @@ class OneShotAgentPromptStrategy(PromptStrategy):
steps: list[str] = []
tokens: int = 0
# start: int = len(episode_history)
n_episodes = len(episode_history)
for i, c in reversed(list(enumerate(episode_history))):
step = f"### Step {i+1}: Executed `{c.action.format_call()}`\n"
step += f'- **Reasoning:** "{c.action.reasoning}"\n'
step += (
f"- **Status:** `{c.result.status if c.result else 'did_not_finish'}`\n"
)
if c.result:
if c.result.status == "success":
result = str(c.result)
result = "\n" + indent(result) if "\n" in result else result
step += f"- **Output:** {result}"
elif c.result.status == "error":
step += f"- **Reason:** {c.result.reason}\n"
if c.result.error:
step += f"- **Error:** {c.result.error}\n"
elif c.result.status == "interrupted_by_human":
step += f"- **Feedback:** {c.result.feedback}\n"
for i, episode in enumerate(reversed(episode_history)):
# Use full format for the latest 4 steps, summary or format for older steps
if i < 4 or episode.summary is None:
step_content = indent(episode.format(), 2).strip()
else:
step_content = episode.summary
step = f"* Step {n_episodes - i}: {step_content}"
if max_tokens and count_tokens:
step_tokens = count_tokens(step)
@@ -291,10 +282,6 @@ class OneShotAgentPromptStrategy(PromptStrategy):
tokens += step_tokens
steps.insert(0, step)
# start = i
# # TODO: summarize remaining
# part = slice(0, start)
return "\n\n".join(steps)

View File

@@ -1,11 +1,17 @@
from __future__ import annotations
from typing import Any, Iterator, Literal, Optional
import asyncio
from typing import TYPE_CHECKING, Any, Iterator, Literal, Optional
from pydantic import BaseModel, Field
from autogpt.processing.text import summarize_text
from autogpt.prompts.utils import format_numbered_list, indent
if TYPE_CHECKING:
from autogpt.config.config import Config
from autogpt.core.resource.model_providers import ChatModelProvider
class Action(BaseModel):
name: str
@@ -84,6 +90,27 @@ ActionResult = ActionSuccessResult | ActionErrorResult | ActionInterruptedByHuma
class Episode(BaseModel):
action: Action
result: ActionResult | None
summary: str | None = None
def format(self):
    """Render this episode as a multi-line markdown fragment.

    Includes the executed action call, its reasoning, the result status,
    and — depending on the status — the output, error reason, or human
    feedback.
    """
    status = self.result.status if self.result else "did_not_finish"
    parts = [
        f"Executed `{self.action.format_call()}`\n",
        f'- **Reasoning:** "{self.action.reasoning}"\n',
        f"- **Status:** `{status}`\n",
    ]
    if self.result:
        if self.result.status == "success":
            output = str(self.result)
            # Multi-line outputs are indented under the bullet for readability.
            if "\n" in output:
                output = "\n" + indent(output)
            parts.append(f"- **Output:** {output}")
        elif self.result.status == "error":
            parts.append(f"- **Reason:** {self.result.reason}\n")
            if self.result.error:
                parts.append(f"- **Error:** {self.result.error}\n")
        elif self.result.status == "interrupted_by_human":
            parts.append(f"- **Feedback:** {self.result.feedback}\n")
    return "".join(parts)
def __str__(self) -> str:
executed_action = f"Executed `{self.action.format_call()}`"
@@ -96,6 +123,7 @@ class EpisodicActionHistory(BaseModel):
episodes: list[Episode] = Field(default_factory=list)
cursor: int = 0
_lock = asyncio.Lock()
@property
def current_episode(self) -> Episode | None:
@@ -148,29 +176,48 @@ class EpisodicActionHistory(BaseModel):
self.episodes = self.episodes[:-number_of_episodes]
self.cursor = len(self.episodes)
async def handle_compression(
    self, llm_provider: ChatModelProvider, app_config: Config
) -> None:
    """Generate and store a one-line summary for every un-summarized episode.

    Episodes whose `summary` is already set are skipped. Summarization calls
    are issued concurrently via `asyncio.gather`, and the whole pass runs
    under `self._lock` so concurrent callers don't double-summarize.

    Args:
        llm_provider: The ChatModelProvider used to generate the summaries.
        app_config: Application config forwarded to `summarize_text`.
    """
    instruction = (
        "The text represents an action, the reason for its execution, "
        "and its result. "
        "Condense the action taken and its result into one line. "
        "Preserve any specific factual information gathered by the action."
    )
    async with self._lock:
        # Only episodes that haven't been summarized yet need work.
        pending = [ep for ep in self.episodes if ep.summary is None]

        # Fire all summarization requests concurrently.
        outcomes = await asyncio.gather(
            *(
                summarize_text(
                    ep.format(),
                    instruction=instruction,
                    llm_provider=llm_provider,
                    config=app_config,
                )
                for ep in pending
            )
        )

        # summarize_text returns (summary, extra); keep only the summary.
        for ep, (summary, _) in zip(pending, outcomes):
            ep.summary = summary
def fmt_list(self) -> str:
    # Render the full episode history as a single numbered-list string,
    # one entry per episode (delegates formatting to each Episode's __str__).
    return format_numbered_list(self.episodes)
def fmt_paragraph(self) -> str:
    """Render the episode history as markdown "### Step N" sections.

    Each episode is delegated to `Episode.format()` and numbered in
    chronological order; sections are separated by blank lines.

    NOTE(review): the original text contained two interleaved loop bodies
    (stripped-diff residue). The first loop computed `step` but never
    appended it (dead code), and the method fell off the end returning
    None despite its `-> str` annotation. This version keeps only the
    surviving loop and returns the joined steps, matching the sibling
    history-compilation code's `"\n\n".join(steps)` convention.
    """
    steps: list[str] = []
    for i, episode in enumerate(self.episodes, 1):
        steps.append(f"### Step {i}: {episode.format()}\n")
    return "\n\n".join(steps)