Source code for agentscope_runtime.engine.services.memory.mem0_memory_service
# -*- coding: utf-8 -*-
import os
from typing import Optional, Dict, Any, List
from mem0 import AsyncMemoryClient
from .memory_service import MemoryService
from ...schemas.agent_schemas import Message, MessageType, ContentType
[docs]
class Mem0MemoryService(MemoryService):
    """
    Memory service that uses mem0 to store and retrieve memories.

    The mem0 client is created in ``start()`` from the ``MEM0_API_KEY``
    environment variable and dropped again in ``stop()``. Every memory
    operation requires a started service and raises ``RuntimeError``
    otherwise.

    To get the api key, please refer to the following link:
    https://docs.mem0.ai/platform/quickstart
    """

    def __init__(self, **kwargs):
        """
        Create the service without building the mem0 client.

        Args:
            **kwargs: Forwarded unchanged to ``MemoryService.__init__``.
        """
        super().__init__(**kwargs)
        # Populated by start(); None means "not started".
        self.service = None
        self._health = False

    def _require_service(self):
        """Return the mem0 client, or raise if the service is not started."""
        if self.service is None:
            raise RuntimeError(
                "Mem0MemoryService is not started; call start() first",
            )
        return self.service

    @staticmethod
    async def get_query_text(message: Message) -> str:
        """
        Get the query text from the message.

        Args:
            message: Message to inspect; a falsy value yields ``""``.

        Returns:
            The ``text`` of the first content item whose type is
            ``ContentType.TEXT`` when the message type is
            ``MessageType.MESSAGE``; otherwise an empty string.
        """
        if not message or message.type != MessageType.MESSAGE:
            return ""
        # Tolerate a message whose content is None instead of failing
        # with a TypeError while iterating.
        for content in message.content or ():
            if content.type == ContentType.TEXT:
                return content.text
        return ""

    async def start(self):
        """
        Create the mem0 client from the ``MEM0_API_KEY`` environment variable.

        Raises:
            ValueError: If ``MEM0_API_KEY`` is not set.
        """
        mem0_api_key = os.getenv("MEM0_API_KEY")
        if mem0_api_key is None:
            raise ValueError("MEM0_API_KEY is not set")
        # get the mem0 client instance
        self.service = AsyncMemoryClient(api_key=mem0_api_key)
        self._health = True

    async def stop(self):
        """Drop the mem0 client reference and mark the service unhealthy."""
        # NOTE(review): the client is only dereferenced here, not closed;
        # confirm whether AsyncMemoryClient needs an explicit close.
        self.service = None
        self._health = False

    async def health(self) -> bool:
        """Return the health flag set by ``start()`` and cleared by ``stop()``."""
        return self._health

    async def add_memory(
        self,
        user_id: str,
        messages: list,
        session_id: Optional[str] = None,
    ):
        """
        Transform the messages and add them to mem0 for a user.

        Args:
            user_id: Passed to the mem0 client as ``user_id``.
            messages: Messages to store; converted with
                ``self.transform_messages`` before being sent.
            session_id: Optional session identifier, passed to the mem0
                client as ``run_id``.

        Returns:
            Whatever the mem0 client's ``add`` call returns.

        Raises:
            RuntimeError: If the service has not been started.
        """
        service = self._require_service()
        # NOTE(review): transform_messages is not defined in this class;
        # presumably it is inherited from MemoryService -- confirm.
        messages = await self.transform_messages(messages)
        return await service.add(
            messages=messages,
            user_id=user_id,
            run_id=session_id,
        )

    async def search_memory(
        self,
        user_id: str,
        messages: list,
        filters: Optional[Dict[str, Any]] = None,
    ) -> list:
        """
        Search mem0 using the text of the last message as the query.

        Args:
            user_id: Passed to the mem0 client as ``user_id``.
            messages: Messages whose last element supplies the query text
                (see ``get_query_text``).
            filters: Optional filters; forwarded as ``filters`` only when
                truthy.

        Returns:
            Whatever the mem0 client's ``search`` call returns, or an empty
            list when ``messages`` is empty.

        Raises:
            RuntimeError: If the service has not been started.
        """
        service = self._require_service()
        # Without any message there is no query to derive; indexing [-1]
        # would raise IndexError.
        if not messages:
            return []
        query = await self.get_query_text(messages[-1])
        kwargs = {
            "query": query,
            "user_id": user_id,
        }
        if filters:
            kwargs["filters"] = filters
        return await service.search(**kwargs)

    async def list_memory(
        self,
        user_id: str,
        filters: Optional[Dict[str, Any]] = None,
    ) -> list:
        """
        List the memories mem0 holds for a user.

        Args:
            user_id: Passed to the mem0 client as ``user_id``.
            filters: Optional filters; forwarded as ``filters`` only when
                truthy.

        Returns:
            Whatever the mem0 client's ``get_all`` call returns.

        Raises:
            RuntimeError: If the service has not been started.
        """
        service = self._require_service()
        kwargs = {"user_id": user_id}
        if filters:
            kwargs["filters"] = filters
        return await service.get_all(**kwargs)

    async def delete_memory(
        self,
        user_id: str,
        session_id: Optional[str] = None,
    ) -> None:
        """
        Delete a user's memories, optionally limited to one session.

        Args:
            user_id: Passed to the mem0 client as ``user_id``.
            session_id: When truthy, passed as ``run_id`` so the deletion
                request carries the session; otherwise only ``user_id`` is
                sent.

        Returns:
            The mem0 client's ``delete_all`` result is passed through
            unchanged; the ``None`` annotation is kept as in the original
            signature.

        Raises:
            RuntimeError: If the service has not been started.
        """
        service = self._require_service()
        kwargs = {"user_id": user_id}
        if session_id:
            kwargs["run_id"] = session_id
        return await service.delete_all(**kwargs)