Agent memory
By default, an
Agent is stateless: each run starts from a blank conversation. MemoryStore gives an agent continuity across runs by persisting both the conversation transcript and arbitrary path-addressed artifacts to a flyte.io.Dir. This is what lets a scheduled or webhook-driven agent remember what it did last time.
Use cases:
- An “inbox triage” agent that recalls which threads it has already responded to.
- A research agent that builds up a scratchpad over many days.
- Any sleep/wake pattern where the agent wakes on a schedule and resumes prior context.
What a MemoryStore holds
A MemoryStore combines two complementary stores backed by a working directory:
messages— the live LLM conversation transcript, managed by the agent. Mutate it only viaappend()/extend().- Path-addressed files — arbitrary named blobs under the working root. Read and write them with
read_text/write_text/read_json/write_json/list_paths.
The on-disk layout under the root looks like:
<root>/messages.json # transcript
<root>/<your/path>.{txt,json,…} # path-addressed entries
<root>/meta/<encoded_path>.json # per-entry metadata (sha, actor, ts)
<root>/audit/log.jsonl # opt-in audit trail
<root>/versions/<encoded_path>/<ts>_<sha>.txt # opt-in version historySync vs async
The path-addressed I/O methods (read_text, read_json, write_text, write_json, get_meta, current_sha) and the lifecycle methods (create, get_or_create, save) are sync-by-default with an .aio(...) companion. Inside async def tasks, use the .aio form.
Keyed stores: the easy path
For durable agent memory, use a keyed store. MemoryStore.get_or_create(key=...) loads an existing store if present, otherwise creates a new one, saving to a deterministic blob-store namespace under the active Flyte raw-data root:
{storage_root}/agents/memory-store/v0/{org}/{project}/{domain}/{key}First, define the agent. Here it’s a small research assistant with a single, stateless web_search tool — its continuity comes from memory, not from the tool:
@env.task
async def web_search(query: str, max_results: int = 3) -> list[dict[str, str]]:
"""Search the web for `query` and return the top matching results.
A stateless tool — it knows nothing about the agent's memory. But because the
results it returns are recorded in the conversation transcript, the agent can
recall or build on them in a later run without searching again.
This stub returns canned results so the example runs offline. In a real
agent, replace it with a call to a search API (Tavily, Brave, SerpAPI, …);
keeping it an `@env.task` makes each search durable, retryable, and
observable in the dashboard.
"""
return [
{
"title": f"{query.title()} — overview ({i + 1})",
"url": f"https://example.com/?q={query.replace(' ', '+')}&r={i + 1}",
"snippet": f"Key point #{i + 1} about {query}.",
}
for i in range(max_results)
]
agent = Agent(
name="memory-assistant",
instructions=(
"You are a personal research assistant with long-term memory. You "
"remember what the user is working on and the facts they share, because "
"your prior conversation transcript is always available. Use web_search "
"to look things up, and reuse earlier findings from the conversation "
"instead of searching again when you already have the answer."
),
model="claude-haiku-4-5",
tools=[web_search],
max_turns=12,
)
Reuse the same key across runs to keep continuity. The chat task below picks up where the previous run left off (see the
full example for the TaskEnvironment setup):
@env.task(report=True)
async def chat(message: str, memory_key: str = MEMORY_KEY) -> str:
"""One conversation turn that picks up where the last run left off."""
# Load (or create) the keyed store; restores the prior transcript.
memory = await MemoryStore.get_or_create.aio(key=memory_key)
flyte.logger.info("Restored %d prior messages from memory.", len(memory.messages))
# Memory is passed in per call (not attached to the agent). The prior
# transcript is prepended to the conversation and this turn is appended back
# onto the store, which is also returned on result.memory.
result = await agent.run.aio(message, memory=memory)
# Saving is explicit — run never persists on its own. Write the updated
# transcript back to the deterministic keyed remote path.
await memory.save.aio()
return result.summary or result.error
The agent has no note-taking tools. Continuity comes entirely from the persisted transcript, and it remembers two kinds of things for free: the facts the user shares and the results its tools return. The first run records both in messages.json; a later run with the same memory_key reloads and prepends them, so the agent recalls earlier context — and reuses prior web_search findings instead of searching again. That is the core value of MemoryStore — no extra plumbing required.
Working with a MemoryStore independently
Beyond the transcript, you can persist structured artifacts under arbitrary paths in the same store. This is optional — most agents get all the continuity they need from the transcript above — but it’s useful when you want durable, queryable state (a scratchpad, a dedupe ledger, intermediate results).
A flyte task can commit its own artifact by loading the keyed store, read-modify-writing a path-addressed file, and calling save(). Every write is recorded in a metadata sidecar (sha256, actor, timestamp) and, by default, appended to an audit log:
from flyte.ai.agents import MemoryStore
MEMORY_KEY = "my-assistant"
NOTES_PATH = "notes/notes.json"
@env.task
async def add_note(note: str) -> str:
"""A tool that commits its own artifact to the keyed store."""
memory = await MemoryStore.get_or_create.aio(key=MEMORY_KEY)
notes = await memory.read_json.aio(NOTES_PATH, default=[])
notes.append(note)
await memory.write_json.aio(NOTES_PATH, notes, reason="agent note")
await memory.save.aio() # commit the artifact to the keyed remote path
return f"Noted: {note}"Artifacts live on independent paths (e.g. notes/notes.json) from the transcript (messages.json), so they never collide. But when a tool writes to the same keyed store that the orchestrator also saves, the orchestrator’s working copy goes stale mid-run. Reload the store with get_or_create after agent.run, carry over the updated transcript (reloaded.messages = result.memory.messages), and save once — otherwise the orchestrator’s final save re-uploads a stale copy and clobbers the tool’s artifact.
Optimistic concurrency
When several tasks or agents share one keyed store (e.g. parallel tool calls, or a sleep/wake pattern), guard against lost updates by passing expected_sha=. The write succeeds only if the current content still matches; otherwise it raises ConcurrencyError:
from flyte.ai.agents import ConcurrencyError
notes = await memory.read_json.aio("notes/notes.json", default=[])
sha = await memory.current_sha.aio("notes/notes.json")
notes.append(note)
try:
await memory.write_json.aio("notes/notes.json", notes, expected_sha=sha, reason="agent note")
except ConcurrencyError:
# Another writer updated the file between our read and write.
return "Memory changed while saving the note; please retry."Optional capabilities
MemoryStore (and create / get_or_create) accept a few flags:
| Option | Default | What it does |
|---|---|---|
audit |
True |
Append every successful write to audit/log.jsonl. Inspect with audit_tail(n). |
keep_versions |
False |
Snapshot every write under versions/ for full history (≈ 2× storage per write). |
read_only_prefixes |
() |
Reject direct writes into the given prefixes (e.g. ("memory/",)), so the agent must stage proposals elsewhere and a trusted pipeline promotes them. |
The internal audit/, meta/, and versions/ prefixes and messages.json are reserved — writes to them are rejected, and they’re excluded from list_paths.
Passing memory to the agent
Memory is not attached to the agent — it is passed in per call and returned on the result. agent.run(message, memory=store) prepends the store’s prior transcript, runs the loop, and appends the new turn back onto the store. Persisting is explicit: run never writes on its own, so call memory.save() (or await memory.save.aio()) yourself afterward.
memory = await MemoryStore.get_or_create.aio(key="my-assistant")
result = await agent.run.aio(message, memory=memory)
await memory.save.aio() # save() always targets the deterministic keyed pathYou can also pass a plain list[dict] of prior messages as memory for a stateless, single-shot history (nothing is persisted in that case).
Lower-level usage
Every MemoryStore is keyed — there is no unkeyed/ephemeral store. You normally obtain one via MemoryStore.create(key=...) or MemoryStore.get_or_create(key=...), but direct construction is supported for advanced use and serialization (MemoryStore is a Flyte I/O type, so it can be passed as a task input/output). save() takes no arguments: it always uploads the working root to the deterministic keyed remote_path. When root is omitted, a temporary working directory is created and cleaned up automatically.
Next steps
-
The Flyte Agent harness: how the agent loop uses
memory. - Deploy an agent as a service: schedule a memory-backed agent so it resumes context on each wakeup.