import io
import logging
import uuid
from contextlib import redirect_stderr, redirect_stdout
from datetime import datetime
from typing import Any, Dict, List, Optional, TypedDict, cast
from autogen_core import CancellationToken, Component, ComponentBase
from autogen_core.memory import Memory, MemoryContent, MemoryQueryResult, UpdateContextResult
from autogen_core.model_context import ChatCompletionContext
from autogen_core.models import SystemMessage
from mem0 import Memory as Memory0
from mem0 import MemoryClient
from pydantic import BaseModel, Field
from typing_extensions import Self
logger = logging.getLogger(__name__)
logging.getLogger("chromadb").setLevel(logging.ERROR)
[docs]
class Mem0MemoryConfig(BaseModel):
    """Configuration for Mem0Memory component."""

    # Each field mirrors a constructor argument of the same name on Mem0Memory;
    # the description strings are surfaced through the pydantic field metadata.

    # Identifier passed to the Mem0 client on add/search/delete calls.
    user_id: Optional[str] = Field(
        default=None,
        description="User ID for memory operations. If not provided, a UUID will be generated.",
    )

    # Default cap on the number of hits requested from a search.
    limit: int = Field(
        default=10,
        description="Maximum number of results to return in memory queries.",
    )

    # Selects which Mem0 client is constructed: hosted (True) or local (False).
    is_cloud: bool = Field(
        default=True,
        description="Whether to use cloud Mem0 client (True) or local client (False).",
    )

    # Credential forwarded to the hosted client; unused by the local client.
    api_key: Optional[str] = Field(
        default=None,
        description="API key for cloud Mem0 client. Required if is_cloud=True.",
    )

    # Settings dictionary forwarded to the local client's ``from_config``.
    config: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Configuration dictionary for local Mem0 client. Required if is_cloud=False.",
    )
class MemoryResult(TypedDict, total=False):
    """Typed view of one search hit as consumed by ``Mem0Memory.query``.

    ``total=False`` makes every key optional, so consumers must check for a
    key (or use ``.get``) before reading it.
    """

    # Text of the stored memory; becomes the content of the returned item.
    memory: str
    # Score reported for the hit; copied into the item's metadata.
    score: float
    # Metadata stored alongside the memory.
    metadata: Dict[str, Any]
    # Timestamp strings; the query code parses them with datetime.fromisoformat.
    created_at: str
    updated_at: str
    # Category labels attached to the memory.
    categories: List[str]
# pyright: reportGeneralTypeIssues=false
[docs]
class Mem0Memory(Memory, Component[Mem0MemoryConfig], ComponentBase[Mem0MemoryConfig]):
    """Mem0 memory implementation for AutoGen.

    This component integrates with Mem0.ai's memory system, providing an implementation
    of AutoGen's Memory interface. It supports both cloud and local backends through the
    mem0ai Python package.

    To use this component, you need to have the `mem0` (for cloud-only) or `mem0-local` (for local)
    extra installed for the `autogen-ext` package:

    .. code-block:: bash

        pip install -U "autogen-ext[mem0]" # For cloud-based Mem0
        pip install -U "autogen-ext[mem0-local]" # For local Mem0

    The memory component can store and retrieve information that agents need to remember
    across conversations. It also provides context updating for language models with
    relevant memories.

    Examples:

        .. code-block:: python

            import asyncio
            from autogen_ext.memory.mem0 import Mem0Memory
            from autogen_core.memory import MemoryContent


            async def main() -> None:
                # Create a local Mem0Memory (no API key required)
                memory = Mem0Memory(
                    is_cloud=False,
                    config={"path": ":memory:"},  # Use in-memory storage for testing
                )
                print("Memory initialized successfully!")

                # Add something to memory
                test_content = "User likes the color blue."
                await memory.add(MemoryContent(content=test_content, mime_type="text/plain"))
                print(f"Added content: {test_content}")

                # Retrieve memories with a search query
                results = await memory.query("What color does the user like?")
                print(f"Query results: {len(results.results)} found")

                for i, result in enumerate(results.results):
                    print(f"Result {i+1}: {result}")


            asyncio.run(main())

        Output:

        .. code-block:: text

            Memory initialized successfully!
            Added content: User likes the color blue.
            Query results: 1 found
            Result 1: content='User likes the color blue' mime_type='text/plain' metadata={'score': 0.6977155806281953, 'created_at': datetime.datetime(2025, 7, 6, 17, 25, 18, 754725, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=61200)))}

        Using it with an :class:`~autogen_agentchat.agents.AssistantAgent`:

        .. code-block:: python

            import asyncio
            from autogen_agentchat.agents import AssistantAgent
            from autogen_core.memory import MemoryContent
            from autogen_ext.memory.mem0 import Mem0Memory
            from autogen_ext.models.openai import OpenAIChatCompletionClient


            async def main() -> None:
                # Create a model client
                model_client = OpenAIChatCompletionClient(model="gpt-4.1")

                # Create a Mem0 memory instance
                memory = Mem0Memory(
                    user_id="user123",
                    is_cloud=False,
                    config={"path": ":memory:"},  # Use in-memory storage for testing
                )

                # Add something to memory
                test_content = "User likes the color blue."
                await memory.add(MemoryContent(content=test_content, mime_type="text/plain"))

                # Create an assistant agent with Mem0 memory
                agent = AssistantAgent(
                    name="assistant",
                    model_client=model_client,
                    memory=[memory],
                    system_message="You are a helpful assistant that remembers user preferences.",
                )

                # Run a sample task
                result = await agent.run(task="What color does the user like?")
                print(result.messages[-1].content)  # type: ignore


            asyncio.run(main())

        Output:

        .. code-block:: text

            User likes the color blue.

    Args:
        user_id: Optional user ID for memory operations. If not provided, a UUID will be generated.
        limit: Maximum number of results to return in memory queries.
        is_cloud: Whether to use cloud Mem0 client (True) or local client (False).
        api_key: API key for cloud Mem0 client. It will read from the environment MEM0_API_KEY if not provided.
        config: Configuration dictionary for local Mem0 client. Required if is_cloud=False.
    """

    component_type = "memory"
    component_provider_override = "autogen_ext.memory.mem0.Mem0Memory"
    component_config_schema = Mem0MemoryConfig

    def __init__(
        self,
        user_id: Optional[str] = None,
        limit: int = 10,
        is_cloud: bool = True,
        api_key: Optional[str] = None,
        config: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Store the settings and build the cloud or local Mem0 client.

        Raises:
            ValueError: If ``is_cloud`` is False and no ``config`` is given.
        """
        # The local client is built from ``config``, so it cannot be omitted.
        if not is_cloud and config is None:
            raise ValueError("config is required when using local Mem0 client (is_cloud=False)")

        # An empty or missing user id is replaced by a freshly generated UUID.
        self._user_id = user_id or str(uuid.uuid4())
        self._limit = limit
        self._is_cloud = is_cloud
        self._api_key = api_key
        self._config = config

        if self._is_cloud:
            self._client = MemoryClient(api_key=self._api_key)
        else:
            # Already guaranteed by the validation above; narrows the type.
            assert self._config is not None
            self._client = Memory0.from_config(config_dict=self._config)  # type: ignore

    @property
    def user_id(self) -> str:
        """Get the user ID for memory operations."""
        return self._user_id

    @property
    def limit(self) -> int:
        """Get the maximum number of results to return in memory queries."""
        return self._limit

    @property
    def is_cloud(self) -> bool:
        """Check if the Mem0 client is cloud-based."""
        return self._is_cloud

    @property
    def config(self) -> Optional[Dict[str, Any]]:
        """Get the configuration for the Mem0 client."""
        return self._config

    @staticmethod
    def _token_is_cancelled(cancellation_token: Optional[CancellationToken]) -> bool:
        """Report whether ``cancellation_token`` has been cancelled.

        Prefers an ``is_cancelled()`` method when the token has one and falls
        back to a ``cancelled`` attribute for duck-typed tokens. ``None``
        means "not cancelled".
        """
        if cancellation_token is None:
            return False
        checker = getattr(cancellation_token, "is_cancelled", None)
        if callable(checker):
            return bool(checker())
        return bool(getattr(cancellation_token, "cancelled", False))

    @staticmethod
    def _content_to_message(content: Any) -> str:
        """Render the payload of a memory content object as the text sent to Mem0."""
        raw = content.content
        # ``mime_type`` may be a plain string or an enum member; compare by value
        # so that both spellings of "application/json" are recognised.
        mime_type = getattr(content.mime_type, "value", content.mime_type)
        if mime_type == "application/json" and not isinstance(raw, str):
            # Convert dict or other JSON serializable objects to a JSON string.
            import json

            return json.dumps(raw)
        # Text types, JSON already given as a string, and anything else.
        return str(raw)

    @staticmethod
    def _parse_timestamp(value: Any) -> Optional[datetime]:
        """Parse an ISO-format timestamp; return None when missing or unparseable."""
        if not value:
            return None
        try:
            return datetime.fromisoformat(value)
        except (ValueError, TypeError):
            # Best effort: an unparseable timestamp is simply left out.
            return None

    @staticmethod
    def _result_to_memory_content(result: MemoryResult) -> MemoryContent:
        """Convert one search hit into a ``MemoryContent`` item."""
        # Copy so the dictionary owned by the client response is not modified.
        metadata: Dict[str, Any] = dict(result.get("metadata") or {})

        if "score" in result:
            metadata["score"] = result["score"]

        created_at = Mem0Memory._parse_timestamp(result.get("created_at"))
        if created_at is not None:
            metadata["created_at"] = created_at

        updated_at = Mem0Memory._parse_timestamp(result.get("updated_at"))
        if updated_at is not None:
            metadata["updated_at"] = updated_at

        categories = result.get("categories")
        if categories:
            metadata["categories"] = categories

        return MemoryContent(
            content=result.get("memory", ""),
            mime_type="text/plain",  # Default to text/plain
            metadata=metadata,
        )

    async def add(
        self,
        content: MemoryContent,
        cancellation_token: Optional[CancellationToken] = None,
    ) -> None:
        """Add content to memory.

        A ``"user_id"`` key in the content's metadata overrides this instance's
        user ID for the call and is not forwarded as metadata. The caller's
        metadata dictionary is left unmodified.

        Args:
            content: The memory content to add.
            cancellation_token: Optional token to cancel operation. If it is
                already cancelled, nothing is added.

        Raises:
            Exception: If there's an error adding content to mem0 memory.
        """
        if hasattr(content, "content") and hasattr(content, "mime_type"):
            message = self._content_to_message(content)
            # Work on a copy: "user_id" is popped below and must not disappear
            # from the caller's object.
            metadata: Dict[str, Any] = dict(content.metadata) if content.metadata else {}
        else:
            # Handle case where content is directly provided as string
            message = str(content)
            metadata = {}

        if self._token_is_cancelled(cancellation_token):
            return

        try:
            user_id = metadata.pop("user_id", self._user_id)
            # A client whose class is named "Memory" gets no extra arguments; any
            # other client is asked for output format v1.1 (presumably to avoid a
            # warning from the cloud client -- confirm against the mem0 docs).
            kwargs = {} if self._client.__class__.__name__ == "Memory" else {"output_format": "v1.1"}
            # Suppress warning messages printed by the mem0 client.
            with redirect_stdout(io.StringIO()), redirect_stderr(io.StringIO()):
                self._client.add([{"role": "user", "content": message}], user_id=user_id, metadata=metadata, **kwargs)  # type: ignore
        except Exception as e:
            # Log the error, then let the caller see it.
            logger.error("Error adding to mem0 memory: %s", e)
            raise

    async def query(
        self,
        query: str | MemoryContent = "",
        cancellation_token: Optional[CancellationToken] = None,
        **kwargs: Any,
    ) -> MemoryQueryResult:
        """Query memory for relevant content.

        Args:
            query: The query to search for, either as string or MemoryContent.
            cancellation_token: Optional token to cancel operation. If it is
                already cancelled, an empty result is returned.
            **kwargs: Additional query parameters to pass to mem0. ``limit``
                and ``user_id`` override the instance defaults for this call.

        Returns:
            MemoryQueryResult containing search results. Any error raised while
            searching or converting results is logged and an empty result is
            returned instead.
        """
        if isinstance(query, str):
            query_text = query
        elif hasattr(query, "content"):
            query_text = str(query.content)
        else:
            query_text = str(query)

        if self._token_is_cancelled(cancellation_token):
            return MemoryQueryResult(results=[])

        try:
            limit = kwargs.pop("limit", self._limit)
            # Popped so it cannot collide with the explicit ``user_id=`` below.
            user_id = kwargs.pop("user_id", self._user_id)

            # Suppress anything the mem0 client prints while searching.
            with redirect_stdout(io.StringIO()), redirect_stderr(io.StringIO()):
                results = self._client.search(  # type: ignore
                    query_text,
                    user_id=user_id,
                    limit=limit,
                    **kwargs,
                )

            # The client may return either {"results": [...]} or a bare list.
            if isinstance(results, dict) and "results" in results:
                result_list = cast(List[MemoryResult], results["results"])
            else:
                result_list = cast(List[MemoryResult], results)

            memory_contents: List[MemoryContent] = [
                self._result_to_memory_content(result) for result in result_list
            ]
            return MemoryQueryResult(results=memory_contents)
        except Exception as e:
            # Log the error but return empty results
            logger.error("Error querying mem0 memory: %s", e)
            return MemoryQueryResult(results=[])

    async def update_context(
        self,
        model_context: ChatCompletionContext,
    ) -> UpdateContextResult:
        """Update the model context with relevant memories.

        This method retrieves the conversation history from the model context,
        uses the last message as a query to find relevant memories, and then
        adds those memories to the context as a system message.

        Args:
            model_context: The model context to update.

        Returns:
            UpdateContextResult containing memories added to the context.
        """
        messages = await model_context.get_messages()
        if not messages:
            return UpdateContextResult(memories=MemoryQueryResult(results=[]))

        # Use the last message as query; non-string content falls back to the
        # string form of the whole message.
        last_message = messages[-1]
        query_text = last_message.content if isinstance(last_message.content, str) else str(last_message)

        query_results = await self.query(query_text, limit=self._limit)

        # Only touch the context when something was found.
        if query_results.results:
            # Format memories as a 1-based numbered list
            memory_strings = [f"{i}. {str(memory.content)}" for i, memory in enumerate(query_results.results, 1)]
            memory_context = "\nRelevant memories:\n" + "\n".join(memory_strings)
            await model_context.add_message(SystemMessage(content=memory_context))

        return UpdateContextResult(memories=query_results)

    async def clear(self) -> None:
        """Clear all content from memory for the current user.

        Raises:
            Exception: If there's an error clearing mem0 memory.
        """
        try:
            self._client.delete_all(user_id=self._user_id)  # type: ignore
        except Exception as e:
            logger.error("Error clearing mem0 memory: %s", e)
            raise

    async def close(self) -> None:
        """Clean up resources if needed.

        This is a no-op for Mem0 clients as they don't require explicit cleanup.
        """
        pass

    @classmethod
    def _from_config(cls, config: Mem0MemoryConfig) -> Self:
        """Create instance from configuration.

        Args:
            config: Configuration for Mem0Memory component.

        Returns:
            A new Mem0Memory instance.
        """
        return cls(
            user_id=config.user_id,
            limit=config.limit,
            is_cloud=config.is_cloud,
            api_key=config.api_key,
            config=config.config,
        )

    def _to_config(self) -> Mem0MemoryConfig:
        """Convert instance to configuration.

        Returns:
            Configuration representing this Mem0Memory instance.
        """
        return Mem0MemoryConfig(
            user_id=self._user_id,
            limit=self._limit,
            is_cloud=self._is_cloud,
            api_key=self._api_key,
            config=self._config,
        )