Custom Adapters
When to write a custom adapter
Section titled “When to write a custom adapter”Write a custom adapter when:
- Your storage backend isn’t covered by the built-in adapters
- You need specialized behavior (encryption, multi-region, custom caching)
- You’re integrating with an existing system that has its own storage layer
The adapter contract
Section titled “The adapter contract”Both Python and TypeScript SDKs define an abstract MemoryAdapter class. Your custom adapter must implement every abstract method.
from cognitive_memory.adapters.base import MemoryAdapterfrom cognitive_memory.types import Memoryfrom datetime import datetimefrom typing import Optional
class MyAdapter(MemoryAdapter): # CRUD async def create(self, memory: Memory) -> None: ... async def get(self, memory_id: str) -> Optional[Memory]: ... async def get_batch(self, memory_ids: list[str]) -> list[Memory]: ... async def update(self, memory: Memory) -> None: ... async def delete(self, memory_id: str) -> None: ... async def delete_batch(self, memory_ids: list[str]) -> None: ...
# Vector search async def search_similar( self, query_embedding: list[float], top_k: int = 10, include_superseded: bool = False, ) -> list[tuple[Memory, float]]: ...
# Tiered storage async def migrate_to_cold(self, memory_id: str, cold_since: datetime) -> None: ... async def migrate_to_hot(self, memory_id: str) -> None: ... async def convert_to_stub(self, memory_id: str, stub_content: str) -> None: ...
# Association links async def create_or_strengthen_link( self, source_id: str, target_id: str, weight: float, ) -> None: ... async def get_linked_memories( self, memory_id: str, min_weight: float = 0.3, ) -> list[tuple[Memory, float]]: ... async def delete_link(self, source_id: str, target_id: str) -> None: ...
# Consolidation helpers async def find_fading(self, threshold: float, exclude_core: bool = True) -> list[Memory]: ... async def find_stable(self, min_stability: float, min_access_count: int) -> list[Memory]: ... async def mark_superseded(self, memory_ids: list[str], summary_id: str) -> None: ...
# Traversal async def all_active(self) -> list[Memory]: ... async def all_hot(self) -> list[Memory]: ... async def all_cold(self) -> list[Memory]: ...
# Counts async def hot_count(self) -> int: ... async def cold_count(self) -> int: ... async def stub_count(self) -> int: ... async def total_count(self) -> int: ...
# Batch operations async def batch_update(self, memories: list[Memory]) -> None: ... async def update_retention_scores(self, updates: dict[str, float]) -> None: ...
# Transactions async def transaction(self, callback) -> any: ...
# Reset async def clear(self) -> None: ...import { MemoryAdapter, type MemoryFilters } from "cognitive-memory";import type { Memory, ScoredMemory } from "cognitive-memory";
export class MyAdapter extends MemoryAdapter { async createMemory(memory: Omit<Memory, "id" | "createdAt" | "updatedAt">): Promise<string> { ... } async getMemory(id: string): Promise<Memory | null> { ... } async getMemories(ids: string[]): Promise<Memory[]> { ... } async queryMemories(filters: MemoryFilters): Promise<Memory[]> { ... } async updateMemory(id: string, updates: Partial<Memory>): Promise<void> { ... } async deleteMemory(id: string): Promise<void> { ... } async deleteMemories(ids: string[]): Promise<void> { ... } async vectorSearch(embedding: number[], filters?: MemoryFilters): Promise<ScoredMemory[]> { ... } async updateRetentionScores(updates: Map<string, number>): Promise<void> { ... } async createOrStrengthenLink(sourceId: string, targetId: string, strength: number): Promise<void> { ... } async getLinkedMemories(memoryId: string, minStrength?: number): Promise<Array<Memory & { linkStrength: number }>> { ... } async getLinkedMemoriesMultiple(memoryIds: string[], minStrength?: number): Promise<Array<Memory & { linkStrength: number }>> { ... } async deleteLink(sourceId: string, targetId: string): Promise<void> { ... } async findFadingMemories(userId: string, maxRetention: number): Promise<Memory[]> { ... } async findStableMemories(userId: string, minStability: number, minAccessCount: number): Promise<Memory[]> { ... } async markSuperseded(memoryIds: string[], summaryId: string): Promise<void> { ... } async transaction<T>(callback: (adapter: MemoryAdapter) => Promise<T>): Promise<T> { ... }}Optional: Lexical search
Section titled “Optional: Lexical search”Custom adapters may optionally implement search_lexical (Python) / searchLexical (TypeScript) to support hybrid retrieval. The default implementation returns an empty list, so hybrid search gracefully falls back to vector-only if not implemented.
# Pythonasync def search_lexical(self, query_text: str, top_k: int = 10) -> list[tuple[Memory, float]]: # Implement BM25, full-text search, or any keyword-based retrieval ...// TypeScriptasync searchLexical(queryText: string, topK?: number): Promise<ScoredMemory[]> { // Implement keyword-based retrieval ...}Key implementation details
Section titled “Key implementation details”search_similar / vectorSearch
Section titled “search_similar / vectorSearch”This is the most performance-critical method. It must:
- Compute cosine similarity between the query embedding and all hot memories
- Optionally include superseded memories (when
include_superseded=True) - Exclude stubs
- Return results sorted by similarity score, descending
- Truncate to
top_k
If your backend supports native vector search (pgvector, Pinecone, etc.), use it. Otherwise, fall back to brute-force:
async def search_similar(self, query_embedding, top_k=10, include_superseded=False): results = [] for mem in await self.all_hot(): if mem.embedding is None or mem.is_stub: continue if mem.is_superseded and not include_superseded: continue sim = cosine_similarity(query_embedding, mem.embedding) results.append((mem, sim)) results.sort(key=lambda x: x[1], reverse=True) return results[:top_k]Tiered storage
Section titled “Tiered storage”Your adapter must support three tiers. The engine calls migrate_to_cold(), migrate_to_hot(), and convert_to_stub() to move memories between tiers. How you implement tiers is up to you:
- Separate tables/collections (recommended for databases)
- A
tiercolumn with an enum value - Separate key prefixes (for Redis)
Transactions
Section titled “Transactions”If your backend supports transactions, implement them. If not, provide a passthrough:
async def transaction(self, callback): return await callback(self) # no-op for non-transactional backendsTesting your adapter
Section titled “Testing your adapter”Use the built-in test suite to verify your adapter implements the contract correctly:
from cognitive_memory.tests.test_adapters import AdapterTestSuite
class TestMyAdapter(AdapterTestSuite): def create_adapter(self): return MyAdapter(...)This runs a comprehensive set of tests covering CRUD, vector search, tiered storage, associations, and consolidation.
See also
Section titled “See also”- Adapter specification — canonical contract documentation
- InMemory adapter source — reference implementation