Memory Consolidation API¶
Status: v0.5.0+
Module: `nexus.core.ace.consolidation`
Overview¶
The Memory Consolidation API prevents context collapse by intelligently merging similar low-importance memories while preserving critical high-importance knowledge. It follows the importance-based preservation approach from the ACE paper, ensuring agents retain essential details as their memory grows.
Key Features:
- 🧠 Importance-based preservation
- 🔄 Semantic similarity detection
- 📊 Configurable consolidation strategies
- 🔒 High-importance memory protection
- 📈 Batch processing for efficiency
- 🎯 LLM-powered semantic merging
- 🔍 Consolidation lineage tracking
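Importance-based preservation is the core guarantee here: memories above an importance threshold are never merged, and only the remainder become consolidation candidates. A minimal sketch of that split (illustrative only, not the engine's internals):

```python
# Illustrative only -- not the ConsolidationEngine's internals.
# Memories above the threshold are protected; the rest are merge candidates.
def split_for_consolidation(memories: list[dict], importance_max: float = 0.5):
    protected = [m for m in memories if m["importance"] > importance_max]
    candidates = [m for m in memories if m["importance"] <= importance_max]
    return protected, candidates
```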
The Context Collapse Problem¶
As agents accumulate memories, context windows become overloaded:
```text
Day 1:  [Memory 1] [Memory 2] [Memory 3]
Day 30: [Memory 1] [Memory 2] ... [Memory 500]  ← Context overflow!
Day 60: [Memory 1] [Memory 2] ... [Memory 1000] ← Critical details lost
```
Without consolidation:

- ❌ Context windows exceeded
- ❌ Critical details buried
- ❌ Query performance degraded
- ❌ Irrelevant old memories persist

With consolidation:

- ✅ Context stays manageable
- ✅ Important details preserved
- ✅ Related memories merged
- ✅ Query performance maintained
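A rough calculation shows why this matters (the numbers here are assumptions for illustration, not measurements):

```python
# Back-of-the-envelope illustration with assumed numbers.
avg_tokens_per_memory = 200    # assumed average memory size
context_window = 32_000        # assumed model context budget

for day, n_memories in [(1, 3), (30, 500), (60, 1000)]:
    used = n_memories * avg_tokens_per_memory
    status = "overflow!" if used > context_window else "ok"
    print(f"Day {day}: {n_memories} memories ≈ {used:,} tokens ({status})")
```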
API Reference¶
ConsolidationEngine¶
Main class for memory consolidation.
Initialization¶
```python
from nexus.core.ace.consolidation import ConsolidationEngine

consolidation_engine = ConsolidationEngine(
    session=db_session,
    backend=storage_backend,
    llm_provider=llm_provider,
    user_id="user_123",
    agent_id="agent_456",
    tenant_id="tenant_789",
)
```
Parameters:
- `session` (Session): Database session
- `backend` (Any): Storage backend for CAS content
- `llm_provider` (LLMProvider): LLM provider for semantic merging
- `user_id` (str): User ID for ownership
- `agent_id` (str, optional): Agent ID for scoping
- `tenant_id` (str, optional): Tenant ID for multi-tenancy
consolidate_by_criteria()¶
Consolidate memories matching specific criteria.
```python
results = consolidation_engine.consolidate_by_criteria(
    memory_type="experience",
    importance_max=0.5,
    batch_size=10,
    min_age_days=7,
    scope="agent",
)
```
Parameters:
- `memory_type` (str, optional): Filter by memory type: `"fact"`, `"preference"`, `"experience"`, `"observation"`, etc.
- `importance_max` (float): Maximum importance threshold (default: 0.5). Only memories with importance ≤ this value are consolidated.
- `batch_size` (int): Memories per consolidation batch (default: 10)
- `min_age_days` (int): Minimum age in days (default: 0). Only memories older than this are consolidated.
- `scope` (str, optional): Filter by scope: `"agent"`, `"user"`, `"tenant"`, `"global"`
Returns: List[dict] - Consolidation results
```python
[
    {
        "consolidated_memory_id": str,
        "source_memory_ids": List[str],
        "memories_merged": int,
        "content_preview": str,
        "importance": float,
        "consolidation_strategy": str
    },
    ...
]
```
Example:
```python
# Consolidate old low-importance experiences
results = consolidation_engine.consolidate_by_criteria(
    memory_type="experience",
    importance_max=0.5,  # Only importance ≤ 0.5
    batch_size=20,
    min_age_days=7,      # Only memories > 7 days old
    scope="agent",
)

for result in results:
    print(f"Merged {result['memories_merged']} memories")
    print(f"New memory: {result['consolidated_memory_id']}")
    print(f"Preview: {result['content_preview'][:100]}...")
```
consolidate_similar_memories()¶
Consolidate semantically similar memories.
```python
result = consolidation_engine.consolidate_similar_memories(
    memory_ids=["mem_1", "mem_2", "mem_3"],
    preserve_importance=True,
    strategy="semantic_merge",
)
```
Parameters:
- `memory_ids` (List[str]): Memory IDs to consolidate
- `preserve_importance` (bool): Keep the highest source importance (default: True)
- `strategy` (str): Consolidation strategy:
    - `"semantic_merge"`: LLM merges content semantically (default)
    - `"concatenate"`: Simple concatenation
    - `"summarize"`: LLM generates a summary
Returns: dict - Consolidation result
```python
{
    "consolidated_memory_id": str,
    "source_memory_ids": List[str],
    "content": str,
    "importance": float,
    "metadata": dict
}
```
Example:
```python
# Find similar memories
similar_groups = consolidation_engine.find_similar_memory_groups(
    scope="agent",
    similarity_threshold=0.85,
)

# Consolidate each group
for group in similar_groups:
    result = consolidation_engine.consolidate_similar_memories(
        memory_ids=group,
        preserve_importance=True,
        strategy="semantic_merge",
    )
    print(f"Consolidated {len(group)} → {result['consolidated_memory_id']}")
```
find_similar_memory_groups()¶
Identify groups of similar memories for consolidation.
```python
groups = consolidation_engine.find_similar_memory_groups(
    scope="agent",
    memory_type="experience",
    similarity_threshold=0.85,
    min_group_size=2,
    max_groups=10,
)
```
Parameters:
- `scope` (str, optional): Filter by scope
- `memory_type` (str, optional): Filter by type
- `similarity_threshold` (float): Semantic similarity threshold (default: 0.85)
- `min_group_size` (int): Minimum memories per group (default: 2)
- `max_groups` (int): Maximum groups to return (default: 10)
Returns: List[List[str]] - Groups of similar memory IDs
```python
[
    ["mem_1", "mem_2", "mem_3"],  # Group 1: similar memories
    ["mem_10", "mem_11"],         # Group 2: similar memories
    ...
]
```
Example:
```python
# Find similar memory groups
groups = consolidation_engine.find_similar_memory_groups(
    scope="agent",
    memory_type="experience",
    similarity_threshold=0.9,  # High similarity required
)

print(f"Found {len(groups)} groups of similar memories")
for i, group in enumerate(groups):
    print(f"Group {i+1}: {len(group)} memories")
```
get_consolidation_candidates()¶
Get memories eligible for consolidation.
```python
candidates = consolidation_engine.get_consolidation_candidates(
    importance_max=0.5,
    min_age_days=7,
    memory_type="experience",
    limit=100,
)
```
Parameters:
- `importance_max` (float): Maximum importance threshold
- `min_age_days` (int): Minimum age in days
- `memory_type` (str, optional): Filter by type
- `limit` (int): Maximum candidates (default: 100)
Returns: List[dict] - Candidate memories
Example:
```python
# Preview consolidation candidates
candidates = consolidation_engine.get_consolidation_candidates(
    importance_max=0.5,
    min_age_days=30,
    limit=50,
)

print(f"Found {len(candidates)} consolidation candidates")
for c in candidates[:5]:
    print(f"  {c['memory_id']}: importance={c['importance']}")
```
rollback_consolidation()¶
Rollback a consolidation by restoring source memories.
```python
success = consolidation_engine.rollback_consolidation(
    consolidated_memory_id="mem_consolidated_123"
)
```
Parameters:
- `consolidated_memory_id` (str): ID of the consolidated memory to roll back
Returns: bool - True if rollback successful
Example:
```python
# Rollback if consolidation was incorrect
if consolidation_engine.rollback_consolidation(consolidated_id):
    print("Consolidation rolled back, source memories restored")
else:
    print("Rollback failed or no lineage found")
```
Consolidation Strategies¶
1. Importance-Based Preservation¶
Never consolidate high-importance memories:
```python
# Safe consolidation
consolidation_engine.consolidate_by_criteria(
    importance_max=0.5,  # Only low-importance
    batch_size=10,
)
# High-importance memories (>0.5) are protected!
```
2. Semantic Merging¶
LLM intelligently merges related content:
```python
# Before consolidation:
#   mem_1: "User prefers dark mode in IDE"
#   mem_2: "User likes dark themes"
#   mem_3: "User uses dark color scheme"

result = consolidation_engine.consolidate_similar_memories(
    memory_ids=["mem_1", "mem_2", "mem_3"],
    strategy="semantic_merge",
)

# After consolidation:
#   "User consistently prefers dark themes across IDE and applications"
```
3. Time-Based Consolidation¶
Consolidate old memories, keep recent ones fresh:
```python
# Only consolidate memories >30 days old
results = consolidation_engine.consolidate_by_criteria(
    min_age_days=30,
    importance_max=0.6,
)
```
Usage Examples¶
Basic Consolidation¶
```python
import nexus

nx = nexus.connect()

# Run consolidation
results = nx.ace.consolidation_engine.consolidate_by_criteria(
    memory_type="experience",
    importance_max=0.5,
    batch_size=20,
    min_age_days=7,
)

print(f"Consolidated {len(results)} memory batches")
for result in results:
    print(f"  Merged {result['memories_merged']} memories")
    print(f"  New importance: {result['importance']}")
```
Scheduled Consolidation¶
```python
import schedule
import time

def consolidation_job():
    """Background job for memory consolidation."""
    results = nx.ace.consolidation_engine.consolidate_by_criteria(
        importance_max=0.5,
        batch_size=10,
        min_age_days=7,
    )
    if results:
        print(f"Consolidated {len(results)} batches")
    else:
        print("No consolidation needed")

# Run daily at 2 AM
schedule.every().day.at("02:00").do(consolidation_job)

while True:
    schedule.run_pending()
    time.sleep(3600)  # Check hourly
```
Dry Run Preview¶
```python
# Preview what would be consolidated
candidates = nx.ace.consolidation_engine.get_consolidation_candidates(
    importance_max=0.5,
    min_age_days=7,
    limit=100,
)

print(f"Would consolidate {len(candidates)} memories:")
for c in candidates[:10]:
    print(f"  {c['memory_id']}: {c['content'][:50]}...")
    print(f"    Importance: {c['importance']}, Age: {c['age_days']} days")

# Proceed if acceptable
if input("Consolidate? (y/n): ").lower() == "y":
    results = nx.ace.consolidation_engine.consolidate_by_criteria(
        importance_max=0.5,
        min_age_days=7,
    )
    print(f"Consolidated {len(results)} batches")
```
Smart Similarity-Based Consolidation¶
```python
# Find similar memory groups
groups = nx.ace.consolidation_engine.find_similar_memory_groups(
    scope="agent",
    similarity_threshold=0.85,
    min_group_size=3,  # At least 3 similar memories
)
print(f"Found {len(groups)} groups of similar memories")

# Consolidate each group
for i, group in enumerate(groups):
    print(f"Consolidating group {i+1} ({len(group)} memories)...")
    result = nx.ace.consolidation_engine.consolidate_similar_memories(
        memory_ids=group,
        preserve_importance=True,
        strategy="semantic_merge",
    )
    print(f"  ✓ Created: {result['consolidated_memory_id']}")
    print(f"  Preview: {result['content'][:100]}...")
```
Integration with Learning Loop¶
```python
from nexus.core.ace.learning_loop import LearningLoop

learning_loop = nx.ace.learning_loop

# After executing many tasks, consolidate old memories
def cleanup_old_memories():
    """Periodic memory cleanup."""
    # Consolidate low-importance memories
    results = learning_loop.consolidate_memories(
        memory_type="experience",
        importance_max=0.5,
        batch_size=10,
    )
    print(f"Consolidated {len(results)} memory batches")

    # Also consolidate old reflections
    results = learning_loop.consolidate_memories(
        memory_type="reflection",
        importance_max=0.6,
        batch_size=5,
    )
    print(f"Consolidated {len(results)} reflection batches")

# Run cleanup
cleanup_old_memories()
```
Rollback Incorrect Consolidation¶
```python
# Consolidate memories
result = nx.ace.consolidation_engine.consolidate_similar_memories(
    memory_ids=["mem_1", "mem_2", "mem_3"]
)
consolidated_id = result["consolidated_memory_id"]

# Later: user reports loss of an important detail.
# Rollback the consolidation:
if nx.ace.consolidation_engine.rollback_consolidation(consolidated_id):
    print("✓ Rollback successful, original memories restored")
else:
    print("✗ Rollback failed")
```
CLI Commands¶
Consolidate Memories¶
```bash
# Basic consolidation
nexus memory consolidate \
    --type experience \
    --threshold 0.5 \
    --json

# Dry run to preview
nexus memory consolidate \
    --type experience \
    --threshold 0.5 \
    --dry-run

# With filters
nexus memory consolidate \
    --type observation \
    --threshold 0.6 \
    --json
```
Best Practices¶
1. Conservative Importance Thresholds¶
Start with low thresholds and increase gradually:
```python
# ✓ Good: Conservative start
consolidate_by_criteria(importance_max=0.3)  # Very low importance only

# ⚠️ Risky: Too aggressive
consolidate_by_criteria(importance_max=0.8)  # May lose important details
```
2. Age-Based Protection¶
Protect recent memories:
```python
# Only consolidate memories >30 days old
consolidate_by_criteria(
    importance_max=0.5,
    min_age_days=30,  # Keep recent memories fresh
)
```
3. Batch Size Tuning¶
Balance between efficiency and quality:
```python
# Small batches: better semantic coherence
consolidate_by_criteria(batch_size=5)

# Large batches: more efficient but less coherent
consolidate_by_criteria(batch_size=50)

# Recommended: 10-20
consolidate_by_criteria(batch_size=15)
```
4. Regular Monitoring¶
Track consolidation metrics:
```python
def consolidate_with_metrics():
    """Consolidation with monitoring."""
    # count_memories() is a placeholder for however you count
    # this agent's stored memories in your deployment.
    before_count = count_memories(scope="agent")

    results = consolidation_engine.consolidate_by_criteria(
        importance_max=0.5,
        batch_size=10,
    )

    after_count = count_memories(scope="agent")
    reduction = before_count - after_count
    print(f"Memories: {before_count} → {after_count} (-{reduction})")
    print(f"Reduction: {reduction / before_count:.1%}")

consolidate_with_metrics()
```
5. Type-Specific Strategies¶
Different consolidation for different types:
```python
# Aggressive for observations (transient)
consolidate_by_criteria(
    memory_type="observation",
    importance_max=0.6,
    batch_size=20,
)

# Conservative for facts (persistent)
consolidate_by_criteria(
    memory_type="fact",
    importance_max=0.3,
    batch_size=5,
)

# Never consolidate preferences:
# simply don't run consolidation on the "preference" type.
```
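If you apply several per-type policies on a schedule, it can help to keep them in a single map and loop over it. A sketch under that assumption (the policy values are illustrative, not library defaults):

```python
# Illustrative per-type policies -- hypothetical values, not library defaults.
TYPE_POLICIES = {
    "observation": {"importance_max": 0.6, "batch_size": 20},  # aggressive: transient
    "experience":  {"importance_max": 0.5, "batch_size": 10},
    "fact":        {"importance_max": 0.3, "batch_size": 5},   # conservative: persistent
    # "preference" is deliberately absent: never consolidated.
}

def run_typed_consolidation(engine):
    """Apply each memory type's consolidation policy in turn."""
    for memory_type, policy in TYPE_POLICIES.items():
        engine.consolidate_by_criteria(memory_type=memory_type, **policy)
```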
Performance Considerations¶
Batch Processing¶
Process in batches for efficiency:
```python
import time

# Process in chunks
total_candidates = get_consolidation_candidates(
    importance_max=0.5,
    limit=1000,
)

# Note: these chunks are grouped by position, not similarity; pair this
# with find_similar_memory_groups() when semantic grouping matters.
for i in range(0, len(total_candidates), 10):
    batch = total_candidates[i:i + 10]
    consolidate_similar_memories(
        memory_ids=[m["memory_id"] for m in batch]
    )
    time.sleep(0.1)  # Rate limiting between LLM-backed merges
```
Async Processing¶
Use async for background consolidation:
```python
import asyncio

async def async_consolidate():
    """Async consolidation job."""
    results = await consolidation_engine.consolidate_async(
        importance_max=0.5,
        batch_size=10,
    )
    return results

# From synchronous code, start the job with asyncio.run().
# (asyncio.create_task() only works inside an already-running event loop.)
results = asyncio.run(async_consolidate())
```
Caching¶
Cache similarity calculations: embedding and pairwise-similarity scoring are typically the expensive steps when grouping memories, and their results can be reused across consolidation runs. A minimal memoization sketch, assuming an `embed()` function supplied by your stack (not part of the ConsolidationEngine API):
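```python
# Hypothetical helper -- not part of the ConsolidationEngine API.
# embed() stands in for whatever embedding call your stack provides.
from functools import lru_cache

@lru_cache(maxsize=4096)
def cached_embedding(text: str) -> tuple[float, ...]:
    return tuple(embed(text))  # embed() is an assumed external function

def cached_similarity(text_a: str, text_b: str) -> float:
    """Cosine similarity over memoized embeddings."""
    a, b = cached_embedding(text_a), cached_embedding(text_b)
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = sum(x * x for x in a) ** 0.5
    norm_b = sum(y * y for y in b) ** 0.5
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0
```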
Consolidation Lineage¶
Track consolidation history:
```python
# After consolidation
result = consolidate_similar_memories(
    memory_ids=["mem_1", "mem_2", "mem_3"]
)

# Lineage is tracked automatically
consolidated_memory = nx.memory.get(result["consolidated_memory_id"])
print(f"Consolidated from: {consolidated_memory['consolidated_from']}")
# Output: ["mem_1", "mem_2", "mem_3"]

# Can roll back anytime
rollback_consolidation(result["consolidated_memory_id"])
```
See Also¶
- ACE Paper - Importance-based preservation
- Memory Management API - Core memory operations
- ACE Examples - Working consolidation examples