autogen_ext.models.cache#

class ChatCompletionCache(client: ChatCompletionClient, store: CacheStore[CreateResult | List[str | CreateResult]] | None = None)[source]#

Bases: ChatCompletionClient, Component[ChatCompletionCacheConfig]

A wrapper around a ChatCompletionClient that caches creation results from an underlying client. Cache hits do not contribute to token usage of the original client.

Typical Usage:

Lets use caching on disk with openai client as an example. First install autogen-ext with the required packages:

pip install -U "autogen-ext[openai, diskcache]"

And use it as:

import asyncio
import tempfile

from autogen_core.models import UserMessage
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_ext.models.cache import ChatCompletionCache, CHAT_CACHE_VALUE_TYPE
from autogen_ext.cache_store.diskcache import DiskCacheStore
from diskcache import Cache


async def main():
    with tempfile.TemporaryDirectory() as tmpdirname:
        # Initialize the original client
        openai_model_client = OpenAIChatCompletionClient(model="gpt-4o")

        # Then initialize the CacheStore, in this case with diskcache.Cache.
        # You can also use redis like:
        # from autogen_ext.cache_store.redis import RedisStore
        # import redis
        # redis_instance = redis.Redis()
        # cache_store = RedisCacheStore[CHAT_CACHE_VALUE_TYPE](redis_instance)
        cache_store = DiskCacheStore[CHAT_CACHE_VALUE_TYPE](Cache(tmpdirname))
        cache_client = ChatCompletionCache(openai_model_client, cache_store)

        response = await cache_client.create([UserMessage(content="Hello, how are you?", source="user")])
        print(response)  # Should print response from OpenAI
        response = await cache_client.create([UserMessage(content="Hello, how are you?", source="user")])
        print(response)  # Should print cached response


asyncio.run(main())

For Redis caching:

import asyncio

from autogen_core.models import UserMessage
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_ext.models.cache import ChatCompletionCache, CHAT_CACHE_VALUE_TYPE
from autogen_ext.cache_store.redis import RedisStore
import redis


async def main():
    # Initialize the original client
    openai_model_client = OpenAIChatCompletionClient(model="gpt-4o")

    # Initialize Redis cache store
    redis_instance = redis.Redis()
    cache_store = RedisStore[CHAT_CACHE_VALUE_TYPE](redis_instance)
    cache_client = ChatCompletionCache(openai_model_client, cache_store)

    response = await cache_client.create([UserMessage(content="Hello, how are you?", source="user")])
    print(response)  # Should print response from OpenAI
    response = await cache_client.create([UserMessage(content="Hello, how are you?", source="user")])
    print(response)  # Should print cached response


asyncio.run(main())

For streaming with Redis caching:

import asyncio

from autogen_core.models import UserMessage, CreateResult
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_ext.models.cache import ChatCompletionCache, CHAT_CACHE_VALUE_TYPE
from autogen_ext.cache_store.redis import RedisStore
import redis


async def main():
    # Initialize the original client
    openai_model_client = OpenAIChatCompletionClient(model="gpt-4o")

    # Initialize Redis cache store
    redis_instance = redis.Redis()
    cache_store = RedisStore[CHAT_CACHE_VALUE_TYPE](redis_instance)
    cache_client = ChatCompletionCache(openai_model_client, cache_store)

    # First streaming call
    async for chunk in cache_client.create_stream(
        [UserMessage(content="List all countries in Africa", source="user")]
    ):
        if isinstance(chunk, CreateResult):
            print("\n")
            print("Cached: ", chunk.cached)  # Should print False
        else:
            print(chunk, end="")

    # Second streaming call (cached)
    async for chunk in cache_client.create_stream(
        [UserMessage(content="List all countries in Africa", source="user")]
    ):
        if isinstance(chunk, CreateResult):
            print("\n")
            print("Cached: ", chunk.cached)  # Should print True
        else:
            print(chunk, end="")


asyncio.run(main())

You can now use the cached_client as you would the original client, but with caching enabled.

Parameters:
  • client (ChatCompletionClient) – The original ChatCompletionClient to wrap.

  • store (CacheStore) – A store object that implements get and set methods. The user is responsible for managing the store’s lifecycle & clearing it (if needed). Defaults to using in-memory cache.

component_type: ClassVar[ComponentType] = 'chat_completion_cache'#

The logical type of the component.

component_provider_override: ClassVar[str | None] = 'autogen_ext.models.cache.ChatCompletionCache'#

Override the provider string for the component. This should be used to prevent internal module names being a part of the module name.

component_config_schema#

alias of ChatCompletionCacheConfig

async create(messages: Sequence[Annotated[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage, FieldInfo(annotation=NoneType, required=True, discriminator='type')]], *, tools: Sequence[Tool | ToolSchema] = [], tool_choice: Tool | Literal['auto', 'required', 'none'] = 'auto', json_output: bool | type[BaseModel] | None = None, extra_create_args: Mapping[str, Any] = {}, cancellation_token: CancellationToken | None = None) CreateResult[source]#

Cached version of ChatCompletionClient.create. If the result of a call to create has been cached, it will be returned immediately without invoking the underlying client.

NOTE: cancellation_token is ignored for cached results.

create_stream(messages: Sequence[Annotated[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage, FieldInfo(annotation=NoneType, required=True, discriminator='type')]], *, tools: Sequence[Tool | ToolSchema] = [], tool_choice: Tool | Literal['auto', 'required', 'none'] = 'auto', json_output: bool | type[BaseModel] | None = None, extra_create_args: Mapping[str, Any] = {}, cancellation_token: CancellationToken | None = None) AsyncGenerator[str | CreateResult, None][source]#

Cached version of ChatCompletionClient.create_stream. If the result of a call to create_stream has been cached, it will be returned without streaming from the underlying client.

NOTE: cancellation_token is ignored for cached results.

async close() None[source]#
actual_usage() RequestUsage[source]#
count_tokens(messages: Sequence[Annotated[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage, FieldInfo(annotation=NoneType, required=True, discriminator='type')]], *, tools: Sequence[Tool | ToolSchema] = []) int[source]#
property capabilities: ModelCapabilities#
property model_info: ModelInfo#
remaining_tokens(messages: Sequence[Annotated[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage, FieldInfo(annotation=NoneType, required=True, discriminator='type')]], *, tools: Sequence[Tool | ToolSchema] = []) int[source]#
total_usage() RequestUsage[source]#
_to_config() ChatCompletionCacheConfig[source]#

Dump the configuration that would be requite to create a new instance of a component matching the configuration of this instance.

Returns:

T – The configuration of the component.

classmethod _from_config(config: ChatCompletionCacheConfig) Self[source]#

Create a new instance of the component from a configuration object.

Parameters:

config (T) – The configuration object.

Returns:

Self – The new instance of the component.