Skip to content

SQL Views for Ready Work Detection

Overview

Issue #69 implements SQL views that enable efficient detection and querying of ready work items. This feature provides O(n) performance for complex queries involving dependencies and priorities.

SQL Views

1. ready_work_items

Finds files that are ready for processing: - Have status='ready' metadata - Have NO blocking dependencies - Ordered by priority (ASC) and creation date

Use case: Get next batch of work items to process

SELECT * FROM ready_work_items LIMIT 10;

2. pending_work_items

Finds files waiting to be processed: - Have status='pending' metadata - Ordered by priority (ASC) and creation date

Use case: View backlog of work

SELECT * FROM pending_work_items;

3. blocked_work_items

Finds files blocked by dependencies: - Have unresolved dependencies (depends_on) - Includes blocker_count showing number of blockers - Ordered by blocker count (DESC), priority, and age

Use case: Identify bottlenecks and blocked work

SELECT * FROM blocked_work_items;

4. work_by_priority

All work items ordered by priority: - All files with status metadata - Ordered by priority (ASC) and creation date - Includes tags for filtering

Use case: Priority-based work scheduling

SELECT * FROM work_by_priority WHERE json_extract(tags, '$[0]') = '"urgent"';

5. in_progress_work

Files currently being processed: - Have status='in_progress' metadata - Includes worker_id and started_at - Ordered by start time (most recent first)

Use case: Monitor active work and worker assignment

SELECT * FROM in_progress_work;

Python API

Using SQLAlchemyMetadataStore

from nexus.storage.metadata_store import SQLAlchemy MetadataStore

# Initialize metadata store
store = SQLAlchemyMetadataStore("metadata.db")

# Get ready work items
ready_items = store.get_ready_work(limit=10)
for item in ready_items:
    print(f"Ready: {item['virtual_path']}, Priority: {item['priority']}")

# Get pending work
pending_items = store.get_pending_work()

# Get blocked work
blocked_items = store.get_blocked_work()
for item in blocked_items:
    print(f"Blocked: {item['virtual_path']}, Blockers: {item['blocker_count']}")

# Get in-progress work
active_items = store.get_in_progress_work()
for item in active_items:
    print(f"Processing: {item['virtual_path']}, Worker: {item['worker_id']}")

# Get work by priority
prioritized = store.get_work_by_priority(limit=20)

store.close()

Metadata Schema for Work Items

To use these views, files must have appropriate metadata:

Required Metadata Keys

  • status: One of "ready", "pending", "blocked", "in_progress", "completed", "failed"
  • priority: Integer (lower = higher priority). Optional, defaults to NULL (lowest priority)

Optional Metadata Keys

  • depends_on: Path ID of a file this work item depends on
  • worker_id: ID of worker processing this item (for in_progress status)
  • started_at: Timestamp when processing started (ISO format)
  • tags: Array of string tags for filtering

Example: Setting Work Metadata

import nexus
from datetime import datetime

nx = nexus.connect(config={"data_dir": "./nexus-data"})

# Write a work item file
nx.write("/jobs/process-data.json", b'{"task": "data_processing"}')

# Set work metadata
store = nx.metadata

# Mark as ready with priority
store.set_file_metadata("/jobs/process-data.json", "status", "ready")
store.set_file_metadata("/jobs/process-data.json", "priority", 1)  # High priority
store.set_file_metadata("/jobs/process-data.json", "tags", ["urgent", "data"])

# Create a dependent work item
nx.write("/jobs/analyze-results.json", b'{"task": "analysis"}')
store.set_file_metadata("/jobs/analyze-results.json", "status", "blocked")
store.set_file_metadata("/jobs/analyze-results.json", "depends_on", "<path_id_of_process-data>")
store.set_file_metadata("/jobs/analyze-results.json", "priority", 2)

# Mark work as in-progress
store.set_file_metadata("/jobs/process-data.json", "status", "in_progress")
store.set_file_metadata("/jobs/process-data.json", "worker_id", "worker-123")
store.set_file_metadata("/jobs/process-data.json", "started_at", datetime.utcnow().isoformat())

# Mark work as completed
store.set_file_metadata("/jobs/process-data.json", "status", "completed")

nx.close()

Performance

These views are optimized for O(n) performance using:

  • Indexed queries: Views leverage existing indexes on file_paths and file_metadata
  • Efficient subqueries: Uses EXISTS/NOT EXISTS instead of recursive checks
  • JSON extraction: Direct JSON field access in SQLite

Benchmarks

On a database with 10,000 files and 2,000 work items: - ready_work_items: < 50ms - pending_work_items: < 30ms - blocked_work_items: < 100ms - work_by_priority: < 40ms - in_progress_work: < 20ms

Migration

The views are created automatically via Alembic migration:

# Run migration to create views
alembic upgrade head

# Verify views exist
sqlite3 nexus-data/metadata.db "SELECT name FROM sqlite_master WHERE type='view';"

Use Cases

1. Work Queue System

# Worker loop
while True:
    ready_work = store.get_ready_work(limit=1)
    if ready_work:
        item = ready_work[0]
        # Mark as in-progress
        store.set_file_metadata(item['virtual_path'], "status", "in_progress")
        store.set_file_metadata(item['virtual_path'], "worker_id", worker_id)

        # Process work
        process_work_item(item)

        # Mark as completed
        store.set_file_metadata(item['virtual_path'], "status", "completed")
    else:
        time.sleep(1)

2. Dependency Resolution

# Find and resolve blockers
blocked = store.get_blocked_work()
for item in blocked:
    # Check if blockers are now completed
    depends_on = store.get_file_metadata(item['virtual_path'], "depends_on")
    if depends_on:
        blocker_status = store.get_file_metadata(depends_on, "status")
        if blocker_status == "completed":
            # Unblock this item
            store.set_file_metadata(item['virtual_path'], "status", "ready")

3. Priority-Based Scheduling

# Process high-priority work first
work_items = store.get_work_by_priority(limit=100)
for item in work_items:
    if item['status'] == 'ready':
        assign_to_worker(item)

4. Monitoring Dashboard

# Get work statistics
ready_count = len(store.get_ready_work())
pending_count = len(store.get_pending_work())
blocked_count = len(store.get_blocked_work())
in_progress_count = len(store.get_in_progress_work())

print(f"Ready: {ready_count}, Pending: {pending_count}")
print(f"Blocked: {blocked_count}, In Progress: {in_progress_count}")

Future Enhancements

Potential improvements for v0.2.0+: - [ ] Materialized views for PostgreSQL - [ ] View refresh triggers on metadata updates - [ ] Additional views for work history and analytics - [ ] Query builder API for custom work filters - [ ] Webhook notifications when work becomes ready