SQL Views for Ready Work Detection¶
Overview¶
Issue #69 implements SQL views that enable efficient detection and querying of ready work items. This feature provides O(n) performance for complex queries involving dependencies and priorities.
SQL Views¶
1. ready_work_items¶
Finds files that are ready for processing: - Have status='ready' metadata - Have NO blocking dependencies - Ordered by priority (ASC) and creation date
Use case: Get next batch of work items to process
2. pending_work_items¶
Finds files waiting to be processed: - Have status='pending' metadata - Ordered by priority (ASC) and creation date
Use case: View backlog of work
3. blocked_work_items¶
Finds files blocked by dependencies: - Have unresolved dependencies (depends_on) - Includes blocker_count showing number of blockers - Ordered by blocker count (DESC), priority, and age
Use case: Identify bottlenecks and blocked work
4. work_by_priority¶
All work items ordered by priority: - All files with status metadata - Ordered by priority (ASC) and creation date - Includes tags for filtering
Use case: Priority-based work scheduling
5. in_progress_work¶
Files currently being processed: - Have status='in_progress' metadata - Includes worker_id and started_at - Ordered by start time (most recent first)
Use case: Monitor active work and worker assignment
Python API¶
Using SQLAlchemyMetadataStore¶
from nexus.storage.metadata_store import SQLAlchemy MetadataStore
# Initialize metadata store
store = SQLAlchemyMetadataStore("metadata.db")
# Get ready work items
ready_items = store.get_ready_work(limit=10)
for item in ready_items:
print(f"Ready: {item['virtual_path']}, Priority: {item['priority']}")
# Get pending work
pending_items = store.get_pending_work()
# Get blocked work
blocked_items = store.get_blocked_work()
for item in blocked_items:
print(f"Blocked: {item['virtual_path']}, Blockers: {item['blocker_count']}")
# Get in-progress work
active_items = store.get_in_progress_work()
for item in active_items:
print(f"Processing: {item['virtual_path']}, Worker: {item['worker_id']}")
# Get work by priority
prioritized = store.get_work_by_priority(limit=20)
store.close()
Metadata Schema for Work Items¶
To use these views, files must have appropriate metadata:
Required Metadata Keys¶
- status: One of
"ready","pending","blocked","in_progress","completed","failed" - priority: Integer (lower = higher priority). Optional, defaults to NULL (lowest priority)
Optional Metadata Keys¶
- depends_on: Path ID of a file this work item depends on
- worker_id: ID of worker processing this item (for in_progress status)
- started_at: Timestamp when processing started (ISO format)
- tags: Array of string tags for filtering
Example: Setting Work Metadata¶
import nexus
from datetime import datetime
nx = nexus.connect(config={"data_dir": "./nexus-data"})
# Write a work item file
nx.write("/jobs/process-data.json", b'{"task": "data_processing"}')
# Set work metadata
store = nx.metadata
# Mark as ready with priority
store.set_file_metadata("/jobs/process-data.json", "status", "ready")
store.set_file_metadata("/jobs/process-data.json", "priority", 1) # High priority
store.set_file_metadata("/jobs/process-data.json", "tags", ["urgent", "data"])
# Create a dependent work item
nx.write("/jobs/analyze-results.json", b'{"task": "analysis"}')
store.set_file_metadata("/jobs/analyze-results.json", "status", "blocked")
store.set_file_metadata("/jobs/analyze-results.json", "depends_on", "<path_id_of_process-data>")
store.set_file_metadata("/jobs/analyze-results.json", "priority", 2)
# Mark work as in-progress
store.set_file_metadata("/jobs/process-data.json", "status", "in_progress")
store.set_file_metadata("/jobs/process-data.json", "worker_id", "worker-123")
store.set_file_metadata("/jobs/process-data.json", "started_at", datetime.utcnow().isoformat())
# Mark work as completed
store.set_file_metadata("/jobs/process-data.json", "status", "completed")
nx.close()
Performance¶
These views are optimized for O(n) performance using:
- Indexed queries: Views leverage existing indexes on
file_pathsandfile_metadata - Efficient subqueries: Uses EXISTS/NOT EXISTS instead of recursive checks
- JSON extraction: Direct JSON field access in SQLite
Benchmarks¶
On a database with 10,000 files and 2,000 work items: - ready_work_items: < 50ms - pending_work_items: < 30ms - blocked_work_items: < 100ms - work_by_priority: < 40ms - in_progress_work: < 20ms
Migration¶
The views are created automatically via Alembic migration:
# Run migration to create views
alembic upgrade head
# Verify views exist
sqlite3 nexus-data/metadata.db "SELECT name FROM sqlite_master WHERE type='view';"
Use Cases¶
1. Work Queue System¶
# Worker loop
while True:
ready_work = store.get_ready_work(limit=1)
if ready_work:
item = ready_work[0]
# Mark as in-progress
store.set_file_metadata(item['virtual_path'], "status", "in_progress")
store.set_file_metadata(item['virtual_path'], "worker_id", worker_id)
# Process work
process_work_item(item)
# Mark as completed
store.set_file_metadata(item['virtual_path'], "status", "completed")
else:
time.sleep(1)
2. Dependency Resolution¶
# Find and resolve blockers
blocked = store.get_blocked_work()
for item in blocked:
# Check if blockers are now completed
depends_on = store.get_file_metadata(item['virtual_path'], "depends_on")
if depends_on:
blocker_status = store.get_file_metadata(depends_on, "status")
if blocker_status == "completed":
# Unblock this item
store.set_file_metadata(item['virtual_path'], "status", "ready")
3. Priority-Based Scheduling¶
# Process high-priority work first
work_items = store.get_work_by_priority(limit=100)
for item in work_items:
if item['status'] == 'ready':
assign_to_worker(item)
4. Monitoring Dashboard¶
# Get work statistics
ready_count = len(store.get_ready_work())
pending_count = len(store.get_pending_work())
blocked_count = len(store.get_blocked_work())
in_progress_count = len(store.get_in_progress_work())
print(f"Ready: {ready_count}, Pending: {pending_count}")
print(f"Blocked: {blocked_count}, In Progress: {in_progress_count}")
Future Enhancements¶
Potential improvements for v0.2.0+: - [ ] Materialized views for PostgreSQL - [ ] View refresh triggers on metadata updates - [ ] Additional views for work history and analytics - [ ] Query builder API for custom work filters - [ ] Webhook notifications when work becomes ready
Related Documentation¶
- Architecture Document - See "SQL Views for Work Detection" section
- Database Compatibility - SQLite vs PostgreSQL
- Metadata Store API - Full API reference