# Source code for autogen_core._cache_store
from abc import ABC, abstractmethod
from typing import Dict, Generic, Optional, TypeVar
from pydantic import BaseModel
from typing_extensions import Self
from ._component_config import Component, ComponentBase
# Type variable for the values held in a store; CacheStore is declared
# Generic[T] and InMemoryStore subclasses CacheStore[T].
T = TypeVar("T")
# [docs] (Sphinx "view source" link marker left over from the HTML page; not code)
class CacheStore(ABC, Generic[T], ComponentBase[BaseModel]):
    """
    Abstract base class defining the basic interface for store/cache operations.

    Keys are strings and values are of the generic type ``T``.
    Sub-classes should handle the lifecycle of the underlying storage.
    """

    component_type = "cache_store"

    @abstractmethod
    def get(self, key: str, default: Optional[T] = None) -> Optional[T]:
        """
        Fetch the item stored under ``key``.

        Args:
            key: The key identifying the item in the store.
            default (optional): The value to hand back when ``key`` is not
                present in the store. Defaults to None.

        Returns:
            The value associated with ``key`` if found, otherwise ``default``.
        """

    @abstractmethod
    def set(self, key: str, value: T) -> None:
        """
        Store ``value`` under ``key``.

        Args:
            key: The key under which the item is to be stored.
            value: The value to be stored in the store.
        """
class InMemoryStoreConfig(BaseModel):
    # Configuration model for InMemoryStore. It declares no fields:
    # InMemoryStore._to_config() builds it with no arguments and
    # InMemoryStore._from_config() does not read anything from it.
    ...
# [docs] (Sphinx "view source" link marker left over from the HTML page; not code)
class InMemoryStore(CacheStore[T], Component[InMemoryStoreConfig]):
    # CacheStore implementation that keeps every entry in a plain dict held
    # on the instance (``self.store``). The config carries no state, so
    # entries are not part of what _to_config() / _from_config() round-trip.

    component_provider_override = "autogen_core.InMemoryStore"
    component_config_schema = InMemoryStoreConfig

    def __init__(self) -> None:
        """Create a store with no entries."""
        self.store: Dict[str, T] = {}

    def get(self, key: str, default: Optional[T] = None) -> Optional[T]:
        """
        Fetch the item stored under ``key``.

        Args:
            key: The key identifying the item in the store.
            default (optional): The value to hand back when ``key`` is not
                present. Defaults to None.

        Returns:
            The value associated with ``key`` if found, otherwise ``default``.
        """
        found = self.store.get(key, default)
        return found

    def set(self, key: str, value: T) -> None:
        """
        Store ``value`` under ``key``, replacing any existing entry.

        Args:
            key: The key under which the item is to be stored.
            value: The value to be stored.
        """
        self.store[key] = value

    def _to_config(self) -> InMemoryStoreConfig:
        """Return a new, field-less ``InMemoryStoreConfig``."""
        config = InMemoryStoreConfig()
        return config

    @classmethod
    def _from_config(cls, config: InMemoryStoreConfig) -> Self:
        """Build a new, empty store; nothing is read from ``config``."""
        instance = cls()
        return instance