
Connector Control & Execution Architecture

TL;DR

This stream defines the per-tenant control plane for SecurityV0: how a tenant configures one or more connector instances, how each instance produces N independently scoped, independently observable scans, and how every scan run is recorded as a first-class MongoDB document. It introduces three new collections (connector_instances, scan_scopes, scan_runs), formalises the ScanScope concept that already lives in the ingestion pipeline as the identity of a unit of scan work, and pins down the contract Streams 2/3/4 will consume. The work is a prerequisite for the MediaPro pilot (multiple AWS accounts, multiple service categories, no engineer-on-laptop dependency), for cross-connector stitching (which needs a stable instance/scope identity to anchor entity provenance), and for the Lab 2 IaC demo (which needs an up → scan → teardown API).

Problem

The platform's current connector invocation model breaks down on three independent axes:

  1. Manual laptop runs. Every scan today is an engineer running sv0-aws scan --all --submit, sv0-entra-servicenow --all --submit, or sv0-foundry --all from a laptop with credentials in .env (see sv0-connectors#78). The operational consequences enumerated in #78 (data staleness, engineer bottleneck, no SLA, credential sprawl, no multi-tenancy path) have been tolerated so far but will not survive an enterprise pilot.
  2. Monolithic scans. A single sv0-aws scan --all walks IAM, every Lambda function in every region, every Bedrock agent, every ECS service, every S3 bucket, every Secret, then tries to extract CloudTrail evidence per Lambda — sequentially, in one process, against one set of API quotas (see integrations/aws/src/sv0_aws/cli/main.py:115-331). One throttle on cloudtrail:LookupEvents (2 TPS per region; see sv0-platform#309) bricks the whole run; one IAM permission gap on bedrock:list_agents produces a single extracted_data blob with no clean way to say "IAM succeeded, Bedrock failed". Wiz, Snyk, and Datadog all decomposed this years ago. We need scoped scans whose failure modes are independent.
  3. No DB-level scan history. The 2026-04-20 Foundry demo session (see docs/session-notes/2026-04-20-foundry-demo-resolution-session-handoff.md) reconstructed "when did the Foundry connector last run, and what failed" by scraping Sergey's terminal and Cloudflare access logs. The platform has connector_syncs records (good) but they're keyed on connector_type, not on (instance × scope). An operator cannot answer "when did we last successfully scan tenant X's staging AWS account's IAM service category" because the data model doesn't represent it.

For MediaPro (target early May 2026, see sv0-documentation#195), this combination is the gating risk. MediaPro will provide multiple AWS accounts and we cannot ask Ezequiel's IAM team for a 4-hour read window on each. We need many small scans, scheduled, observable, with per-scope isolation.

Current state

What exists today.

  • A connector outputs a NormalizedGraph and POSTs it to /api/v1/ingest/normalized-graph (see src/api/routes/ingest.ts). The platform validates, dedupes by syncId, and enqueues sync_ingestion (see src/workers/runtime.ts).
  • ScanScope already exists in the ingestion-side type system (src/ingestion/types.ts:92) as { mode, sourceSystems[], scannedEntityTypes[], errors }. It controls deletion detection inside the diff engine and circuit breaker (02-processing-pipeline.md §3.2-3.5). It is declared by the connector at submission time, but is not persisted, scheduled, or addressable.
  • connector_syncs collection records per-submission run metadata (src/domain/syncs/types.ts): _id, tenant_id, connector_type, started_at, completed_at, status, sync_mode, metrics, error, cursor_state_after. There is no link to instance, no link to scope, no schedule reference, no per-category breakdown.
  • tenant_configs carries an opaque connector_credential_refs: Record<string, string> (see src/domain/tenants/types.ts:24). This is a single string per connector kind per tenant — assumes one instance per kind.
  • The worker runtime is a single-process in-memory FIFO (src/workers/runtime.ts:25). It runs sync_ingestion, evaluate_findings, build_evidence_pack, generate_report. It does not run scheduled triggers, scan execution, or anything outbound.

What is missing.

  • A persistent representation of "connector instance" — i.e., a tenant's specific AWS organisation, ServiceNow instance, Foundry project — with credentials, source-system targets, and lifecycle.
  • A persistent representation of "scoped scan" — a unit of work smaller than "the whole AWS account" — with a stable identity that scan history hangs off.
  • A scheduler that decides "it's time to run scope S of instance I now."
  • Connector CLIs that accept --instance-id and --scope-id and emit a NormalizedGraph whose scanScope matches.
  • An API call Stream 4 (Lab 2 IaC) can hit to say "this scope corresponds to ephemeral terraform state ID X; trigger a scan now and tell me when done."

Constraints.

  • MongoDB-only storage (per 03-database.md).
  • Deterministic logic only — no probabilistic scheduling decisions, no ML.
  • Read-only connectors — scheduler and execution layer never write back to source systems.
  • TypeScript ESM Node 24 in sv0-platform; Python 3.11+ uv-managed in sv0-connectors.
  • The in-process worker queue in src/workers/runtime.ts is what we have. Don't introduce Redis, RabbitMQ, or Temporal in this stream — extend the existing runtime.

Comparison: how peers do this

Wiz. Wiz models "Cloud Account" as a first-class object inside a tenant. Each cloud account has its own connector status, its own last-scan timestamp, and its own scoped scans (Wiz scans IAM, compute, storage, container images, and secrets as independent scan jobs per account; the UI shows a per-resource-type "last scanned" timestamp). Their multi-account onboarding flow is essentially "one cross-account IAM role per AWS account, one API call per service category." This is the model we are echoing.

Snyk. Snyk's "Integration" object is the closest analogue to our ConnectorInstance: per-org, per-tenant, with credentials stored as opaque references and a status surface visible in the org settings UI. Snyk separates "Integration" (the connection) from "Project" (the scoped unit of scan); a single GitHub integration produces N projects (one per repo), each scanned on its own cadence. The split is exactly the (ConnectorInstance, ScanScope) split we propose, with their "Project" being our "ScanScope".

Datadog. Datadog's CSPM and CIEM products separate "Cloud Account" (instance) from "Resource Type" (service category) and surface scan health per pair: e.g., "AWS account 123456: IAM scan healthy, Lambda scan healthy, ECS scan failing — last run 14m ago." Their cadences are configurable per account and per resource type. Their UI is the long-term target for ours; for now we mirror only the data shape, not the UX.

Common pattern. All three split (a) the connection identity and credentials from (b) the unit-of-scan-work, and persist scan run history at (b)'s grain. None of them treat a tenant's whole cloud as one scan target. None of them schedule globally — scheduling is per (instance, scope) pair. This validates our design direction.

Design proposal

Data model — new collections

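Three new MongoDB collections live alongside the existing 14 (per 03-database.md). All carry tenant_id, and every tenant-facing compound index leads with it; the few cluster-wide indexes (scheduler hot path, stale-run reaper, join keys) are called out where they are defined. None duplicate state already in entities, findings, or connector_syncs; they are control-plane metadata.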

connector_instances

Per-tenant connector connection. One tenant may have N instances of the same connector kind (two AWS organisations, three ServiceNow instances, etc.).

{
_id: "ci-uuid", // Stable, deterministic from tenant_id + kind + display key
tenant_id: "uuid-...",
connector_kind: "aws", // aws | entra_servicenow | azure_foundry | servicenow | …
display_name: "MediaPro production AWS org", // Human label, shown in scan history
status: "active", // active | paused | error | onboarding

// Credentials are stored OUTSIDE this collection.
// credentials_ref is an opaque pointer to whatever vault we end up using (1Password
// for Phase 1, AWS Secrets Manager / Azure Key Vault later).
credentials_ref: {
provider: "op", // op | aws_secretsmanager | azure_keyvault | env (dev only)
ref: "op://sv0-connectors/mediapro-aws/credentials"
},

// Source-system targets this instance can scan. Stream 2 (multi-account AWS) defines
// the AWS-specific shape. The control plane treats this as opaque structured data
// and passes it through to the connector CLI via the scope.
source_system_targets: [
{ kind: "aws_account", account_id: "111111111111", role_arn: "arn:aws:iam::111111111111:role/sv0-reader", external_id: "..." },
{ kind: "aws_account", account_id: "222222222222", role_arn: "arn:aws:iam::222222222222:role/sv0-reader", external_id: "..." }
],

// Discovery hints — connector reports back here after first run. Lets the scheduler
// know "you have N targets and M categories; you might want N×M scopes."
discovered_capabilities: {
accounts: 2,
regions: ["us-east-1", "eu-west-1"],
service_categories_available: ["iam", "lambda", "bedrock", "s3", "ecs", "secrets", "cloudtrail"],
last_discovered_at: ISODate("2026-04-22T15:00:00Z")
},

// Lifecycle
created_at: ISODate(...),
created_by_user_id: ObjectId(...), // The user (or "system") who provisioned this
updated_at: ISODate(...),
archived_at: null
}

Rationale. This separates "what tenant X uses to talk to source system Y instance Z" from credentials (which rotate independently) and from scope (which is finer-grained). discovered_capabilities is the bridge that lets the scheduler suggest reasonable defaults without hard-coding "AWS has 11 service categories" into the platform.

Indexes.

db.connector_instances.createIndex({ tenant_id: 1, connector_kind: 1, status: 1 });
db.connector_instances.createIndex({ tenant_id: 1, _id: 1 }, { unique: true });

Tenant scoping. Every query carries tenant_id. Cross-tenant access is impossible at the storage-adapter layer (per the existing platform contract).

Integration with existing collections. connector_instances._id becomes the new connector_id value emitted in NormalizedGraph.connectorId and is recorded on connector_syncs in a new connector_instance_id field that sits alongside the existing connector_type (a sibling field, not a rename; see Phase 4).

scan_scopes

A scan scope is the unit of work the scheduler executes. One ConnectorInstance has 1..N scan scopes. Two scopes never scan overlapping data; categories within a scope are atomic-on-failure together.

{
_id: "ss-uuid", // Deterministic from instance_id + scope_keys hash
tenant_id: "uuid-...",
instance_id: "ci-uuid",
connector_kind: "aws",
display_name: "AWS prod / IAM", // Human label

// Connector-specific scope keys. Opaque to the platform; meaningful to the connector.
// Stream 2 (multi-account AWS) defines the AWS shape; an Entra example would be
// { scope_keys: { entra_tenant_id: "..." } }; ServiceNow would be
// { scope_keys: { instance_url: "https://corp.service-now.com" } }.
scope_keys: {
// AWS-specific keys (defined by Stream 2). Always plural arrays even when length 1.
// A single AWS scope addresses 1..N accounts × 1..N regions; the array form is the
// canonical shape so Stream 1's diff engine and Stream 2's per-(account×category)
// executor agree on cardinality.
account_ids: ["111111111111"],
regions: ["us-east-1"]
},

// Service categories included in this scope. These map 1:1 to independently-failing units
// of work inside the connector. Each category emits its own per-category status in the ScanRun.
// For AWS, these match Stream 2's category list (iam, lambda, bedrock, s3, ecs, ecr,
// dynamodb, sns, secrets, stepfunctions, eventbridge, cloudtrail).
service_categories: ["iam", "lambda", "cloudtrail"],

// Scheduling — see Scheduling section. nullable means "manual trigger only."
schedule: {
cadence: "interval", // interval | cron | manual
interval_seconds: 3600, // Required if cadence=interval
cron_expression: null, // Required if cadence=cron, e.g. "0 */6 * * *"
timezone: "UTC",
next_run_at: ISODate("2026-04-22T16:00:00Z"),
last_run_at: ISODate("2026-04-22T15:00:00Z")
},

// Budget / safety knobs the scheduler enforces.
budget: {
max_runtime_seconds: 1800, // Hard cap; scan_runs for this scope go to status=timeout if exceeded
max_concurrent_runs: 1, // Per-scope concurrency cap (default 1)
cooldown_after_failure_seconds: 600 // After a failed run, don't reschedule sooner than this
},

// Lifecycle
status: "active", // active | paused | archived
created_at: ISODate(...),
updated_at: ISODate(...)
}

Rationale. This is the single most important new object. It encodes the user's "Wiz scans IAM separately from Bedrock separately from ECS" insight as schema. Putting service_categories on the scope (not on the instance) lets a tenant decide "scope A is IAM only, hourly; scope B is Bedrock + Lambda, every 6 hours; scope C is CloudTrail evidence, nightly."

Indexes.

db.scan_scopes.createIndex({ tenant_id: 1, instance_id: 1, status: 1 });
db.scan_scopes.createIndex({ status: 1, "schedule.next_run_at": 1 }); // Scheduler hot path — cluster-wide scan
db.scan_scopes.createIndex({ tenant_id: 1, connector_kind: 1 });

The second index is intentionally NOT prefixed by tenant_id because the scheduler runs cluster-wide, not per-tenant. The query is { status: "active", "schedule.next_run_at": { $lte: now } }.

scan_runs

One document per execution attempt. This is the operator-facing scan history.

{
_id: "sr-uuid",
tenant_id: "uuid-...",
instance_id: "ci-uuid",
scope_id: "ss-uuid",
sync_id: "uuid-...", // Links to connector_syncs once the connector POSTs
// a NormalizedGraph back. Null until then.

// Captured at run-creation time. Lets us reconstruct what scope was used even if the
// ScanScope doc was edited mid-flight.
scope_snapshot: {
connector_kind: "aws",
scope_keys: { account_ids: ["111111111111"], regions: ["us-east-1"] },
service_categories: ["iam", "lambda", "cloudtrail"]
},

// Trigger provenance.
trigger: {
type: "scheduled", // scheduled | manual_api | iac_lifecycle | retry
triggered_by_user_id: null, // Set if type=manual_api
triggered_by_run_id: null, // Set if type=retry (parent run)
iac_correlation_id: null // Set if type=iac_lifecycle (Stream 4)
},

started_at: ISODate("2026-04-22T15:00:00Z"),
ended_at: ISODate("2026-04-22T15:04:12Z"),
status: "partial", // running | succeeded | partial | failed | timeout | cancelled

// Per-category outcomes. This is the field that lets an analyst answer "did Lambda
// scan succeed for this scope?" without scraping logs.
category_results: {
iam: {
status: "succeeded",
items_scanned: 187,
started_at: ISODate("2026-04-22T15:00:01Z"),
ended_at: ISODate("2026-04-22T15:00:48Z"),
errors: []
},
lambda: {
status: "succeeded",
items_scanned: 42,
started_at: ISODate("2026-04-22T15:00:48Z"),
ended_at: ISODate("2026-04-22T15:02:11Z"),
errors: []
},
cloudtrail: {
status: "failed",
items_scanned: 0,
started_at: ISODate("2026-04-22T15:02:11Z"),
ended_at: ISODate("2026-04-22T15:04:12Z"),
errors: [
{
category: "rate_limit", // auth | rate_limit | api_error | data_error | timeout
code: "ThrottlingException",
message: "cloudtrail:LookupEvents throttled at 2 TPS, retries exhausted",
retryable: true,
occurred_at: ISODate("2026-04-22T15:04:11Z")
}
]
}
},

// Aggregated counts for quick rendering.
totals: {
items_scanned: 229,
errors: 1,
categories_succeeded: 2,
categories_failed: 1
},

// Reference back into the existing pipeline. The join key to connector_syncs is the
// sync_id field declared near the top of this document; it is not repeated here.
emitted_report_id: null // Reserved for future ConnectorReport linkage
}

Rationale. category_results is a map keyed by service category name. It is intentionally NOT a separate collection: a scan run is a unit of operator interest, and one Mongo round-trip should give an analyst the full picture. At MVP scale (≤20 categories per scope) this is well under document size limits.

Status semantics:

status    | meaning
running   | Connector started, has not reported back
succeeded | All categories succeeded
partial   | At least one category succeeded, at least one failed (per-category isolation worked)
failed    | All categories failed, OR the connector failed before any category started
timeout   | Exceeded budget.max_runtime_seconds
cancelled | Operator-initiated cancellation
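
The category-derived rows of the table can be written as a small pure function. The sketch below is illustrative only: deriveRunStatus and summarise are hypothetical helper names, not existing platform code, and timeout and cancelled are left out because they are set by the scheduler or an operator rather than computed from category outcomes.

```typescript
type CategoryStatus = "succeeded" | "failed";
type DerivedRunStatus = "succeeded" | "partial" | "failed";

interface CategoryResult {
  status: CategoryStatus;
  items_scanned: number;
  errors: unknown[];
}

// Hypothetical helper: collapse per-category outcomes into a run-level status.
function deriveRunStatus(results: Record<string, CategoryResult>): DerivedRunStatus {
  const statuses = Object.values(results).map((r) => r.status);
  const succeeded = statuses.filter((s) => s === "succeeded").length;
  // Zero successes covers both "all categories failed" and
  // "the connector failed before any category started" (empty map).
  if (succeeded === 0) return "failed";
  return succeeded === statuses.length ? "succeeded" : "partial";
}

// Hypothetical helper: compute the `totals` block from category_results.
function summarise(results: Record<string, CategoryResult>) {
  const values = Object.values(results);
  return {
    items_scanned: values.reduce((sum, r) => sum + r.items_scanned, 0),
    errors: values.reduce((sum, r) => sum + r.errors.length, 0),
    categories_succeeded: values.filter((r) => r.status === "succeeded").length,
    categories_failed: values.filter((r) => r.status === "failed").length,
  };
}

// Same numbers as the scan_runs example document.
const exampleResults: Record<string, CategoryResult> = {
  iam: { status: "succeeded", items_scanned: 187, errors: [] },
  lambda: { status: "succeeded", items_scanned: 42, errors: [] },
  cloudtrail: { status: "failed", items_scanned: 0, errors: [{ code: "ThrottlingException" }] },
};
const exampleStatus = deriveRunStatus(exampleResults);
const exampleTotals = summarise(exampleResults);
```

Keeping this logic pure makes it easy to unit-test separately from the storage adapter.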

Indexes.

db.scan_runs.createIndex({ tenant_id: 1, scope_id: 1, started_at: -1 });
db.scan_runs.createIndex({ tenant_id: 1, instance_id: 1, started_at: -1 });
db.scan_runs.createIndex({ tenant_id: 1, status: 1, started_at: -1 });
db.scan_runs.createIndex({ status: 1, started_at: 1 }); // For stale-run reaper
db.scan_runs.createIndex({ sync_id: 1 }, { sparse: true }); // Join to connector_syncs
db.scan_runs.createIndex({ "trigger.iac_correlation_id": 1 }, { sparse: true }); // Stream 4

Integration with existing 14 collections.

  • connector_syncs (existing) keeps its current shape but its connector_type field gains a sibling connector_instance_id. Existing code that reads connector_type continues to work; new code reads connector_instance_id. scan_run.sync_id is the join key.
  • entities and all downstream collections gain no new fields; they continue to be tagged with source_connector_id per the cross-connector merge logic in src/workers/handlers/sync-ingestion.ts:26-37. source_connector_id will become the connector_instance_id, giving Stream 3 (graph stitching) a stable per-instance identity to anchor merges on.
  • tenant_configs.connector_credential_refs is deprecated for new tenants in favour of connector_instances.credentials_ref. Existing entries stay readable until migration.

Scoped scan execution

How a single ConnectorInstance produces N independent ScanRuns

The control plane is the source of truth for "what scans should happen." When the scheduler decides scope ss-A is due, it:

  1. Atomically claims the scope: findOneAndUpdate({ _id: "ss-A", "schedule.next_run_at": { $lte: now } }, { $set: { "schedule.next_run_at": now + interval } }). If the find returns null, another scheduler (or test) already claimed it.
  2. Inserts a scan_runs document with status: running, capturing scope_snapshot and trigger.
  3. Enqueues a new worker job type execute_scan with { run_id, instance_id, scope_id }.
  4. The worker handler (new file src/workers/handlers/execute-scan.ts) reads the instance's credentials_ref, resolves it via the credential broker, and invokes the connector via a Connector Driver (see below).
  5. The connector executes one category at a time, reporting per-category outcome back via the Driver.
  6. On completion, the handler updates scan_runs.category_results, totals, status, ended_at. If at least one category produced data, the connector POSTs a NormalizedGraph with the existing pipeline, and the resulting sync_id is written back to scan_runs.sync_id.
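
The atomic claim in step 1 can be illustrated without a database. The sketch below uses an in-memory Map as a stand-in for the scan_scopes collection (an assumption for illustration; claimScope and ScopeSchedule are hypothetical names). In MongoDB, a single-document findOneAndUpdate applies the filter and the update as one atomic operation, which is the property the stand-in mimics.

```typescript
interface ScopeSchedule {
  id: string;
  nextRunAt: Date;
  intervalSeconds: number;
}

// In-memory stand-in for the claim; returns the claimed scope or null if it was not due.
function claimScope(
  scopes: Map<string, ScopeSchedule>,
  scopeId: string,
  now: Date,
): ScopeSchedule | null {
  const scope = scopes.get(scopeId);
  // Mirrors the filter { _id, "schedule.next_run_at": { $lte: now } }.
  if (scope === undefined || scope.nextRunAt.getTime() > now.getTime()) return null;
  // Mirrors { $set: { "schedule.next_run_at": now + interval } }.
  const claimed: ScopeSchedule = {
    ...scope,
    nextRunAt: new Date(now.getTime() + scope.intervalSeconds * 1000),
  };
  scopes.set(scopeId, claimed);
  return claimed;
}

const demoScopes = new Map<string, ScopeSchedule>([
  ["ss-A", { id: "ss-A", nextRunAt: new Date("2026-04-22T15:00:00Z"), intervalSeconds: 3600 }],
]);
const tickTime = new Date("2026-04-22T15:00:05Z");
const firstClaim = claimScope(demoScopes, "ss-A", tickTime); // wins the claim
const secondClaim = claimScope(demoScopes, "ss-A", tickTime); // null: already claimed
```

Because the claim pushes next_run_at into the future, a second scheduler evaluating the same filter no longer matches the document.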

Per-category failure isolation

Each category is its own try/except boundary inside the connector. The contract:

  • A category that fails records category_results[X].status = "failed" and a structured errors[] entry, but does not raise out of the connector main loop.
  • Categories that succeeded still produce nodes/edges; the NormalizedGraph includes whatever was extracted.
  • ScanScope.mode is set to targeted and scannedEntityTypes reflects the categories that actually succeeded (so the existing diff engine and circuit breaker — 02-processing-pipeline.md §3.2-3.5 — only consider deletion within the successful scope, never inferring "Lambda is gone" from "Lambda scan failed").

This is a design constraint on connectors, not a platform-side enforcement. We add a fixture-based test in sv0-connectors that asserts: "if extractor X raises, the resulting payload still contains entities from extractor Y."
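
The isolation contract can be sketched as a loop with one error boundary per category. The real connectors are Python; the TypeScript below only illustrates the pattern. runCategories and Extractor are hypothetical names, entities are simplified to string IDs, and classifying every thrown error as api_error is a placeholder assumption.

```typescript
interface CategoryError {
  category: "auth" | "rate_limit" | "api_error" | "data_error" | "timeout";
  message: string;
  retryable: boolean;
}

interface CategoryOutcome {
  status: "succeeded" | "failed";
  items_scanned: number;
  errors: CategoryError[];
}

// Hypothetical simplification: an extractor returns the entity IDs it found, or throws.
type Extractor = () => string[];

function runCategories(extractors: Record<string, Extractor>) {
  const categoryResults: Record<string, CategoryOutcome> = {};
  const entities: string[] = [];
  for (const [category, extract] of Object.entries(extractors)) {
    try {
      const found = extract();
      entities.push(...found);
      categoryResults[category] = { status: "succeeded", items_scanned: found.length, errors: [] };
    } catch (err) {
      // Record the failure and keep going; nothing escapes the main loop.
      const message = err instanceof Error ? err.message : String(err);
      categoryResults[category] = {
        status: "failed",
        items_scanned: 0,
        errors: [{ category: "api_error", message, retryable: false }],
      };
    }
  }
  // Only successful categories may contribute to scanScope.scannedEntityTypes.
  const scannedCategories = Object.entries(categoryResults)
    .filter(([, outcome]) => outcome.status === "succeeded")
    .map(([category]) => category);
  return { categoryResults, entities, scannedCategories };
}

const isolationDemo = runCategories({
  iam: () => ["role/admin", "role/reader"],
  lambda: () => {
    throw new Error("AccessDenied");
  },
});
```

In the demo the lambda extractor raises, yet the IAM entities are still returned, which is the behaviour the fixture-based test is meant to assert.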

Concurrency / parallelism rules

  • ScanScope.budget.max_concurrent_runs defaults to 1. The scheduler's atomic claim (step 1 above) plus a check count(scan_runs where scope_id=X and status="running") < max_concurrent_runs enforces this.
  • Two scopes for the same instance MAY run concurrently. Two scopes for different instances MAY run concurrently. The platform places no global cap; per-tenant caps live in tenant_configs.feature_flags (e.g., "max_concurrent_scans_per_tenant": 4) for noisy-neighbour control.
  • The existing WorkerRuntime is single-threaded (src/workers/runtime.ts:30). For Phase 1, that means execute_scan jobs run serially within one platform process — fine for MVP because the actual work is happening in subprocesses (or out-of-band — see Connector Driver). For multi-tenant prod we revisit when the queue depth alert (P2 in 02-processing-pipeline.md §10) starts firing.

How a scan posts results back to the platform

Two options for the Driver; the final choice is tracked in Open Question #1:

Option A — In-process Driver. Worker handler shells out to the Python connector CLI as a subprocess, captures the NormalizedGraph JSON from stdout, and POSTs it to /api/v1/ingest/normalized-graph over loopback. Connector code remains unchanged. Pros: zero migration. Cons: same-host coupling, hard to scale connectors independently.

Option B — Side-car Driver. Connector runs in its own container (per sv0-platform#247), takes --instance-id --scope-id --run-id arguments and calls back to a new platform endpoint POST /api/v1/scan-runs/:id/category per category and POST /api/v1/scan-runs/:id/complete at end. Pros: clean separation, can run on a different host. Cons: needs more new endpoints.

Recommendation: Option A for Phase 1, Option B as Phase 5 (out of this stream's scope). Option A unblocks MediaPro without rewriting the connector framework. The Driver interface is the same in both, so the migration is mechanical when we're ready.

The ingest path is unchanged under either option: connectors POST to /api/v1/ingest/normalized-graph with NormalizedGraph.scanScope populated from the scope. The diff engine, circuit breaker, materialisers, and finding evaluators all continue to work without modification.

Scheduling

Mechanism: in-process scheduler tick inside the existing platform. A new src/workers/scheduler.ts module runs setInterval(tick, 30_000) inside the platform process and:

  1. Queries db.scan_scopes.find({ status: "active", "schedule.next_run_at": { $lte: new Date() } }).limit(50).
  2. For each scope, tries the atomic claim from §"How a single ConnectorInstance produces N independent ScanRuns".
  3. Enqueues execute_scan for each successfully claimed scope.

Why in-process and not GHA / external cron.

  • All other production state lives in MongoDB; scheduler state should too (matches sv0-connectors#78 decision 6.A — "Use platform's events/jobs collection").
  • 30-second tick granularity is sufficient: the shortest supported scan interval (5+ minutes) is much longer than the tick.
  • Adding GitHub Actions or Kubernetes CronJobs introduces ops overhead and a second source of truth.
  • When we move to a multi-process platform deploy, the scheduler's atomic findOneAndUpdate claim takes the place of leader election: any process may tick, and only one wins each scope. No separate coordination layer is needed.

Where scheduler state lives. scan_scopes.schedule.next_run_at is the canonical "when next." scan_scopes.schedule.last_run_at is updated by the worker on completion. No separate scheduler-state collection.

Catch-up behaviour. If the scheduler was down and a scope's next_run_at is hours in the past:

  • We do not fire N runs to catch up. We fire one run, set next_run_at = now + interval. Connectors are full-state extractors, not delta-event consumers — replaying missed runs is meaningless (and burns API quota).
  • We do log a scheduler.catch_up_skipped warning with delay_seconds so operators see the gap in metrics.
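
A minimal sketch of the catch-up rule, assuming an interval-cadence scope. planOverdueScope is a hypothetical helper, and the threshold for emitting scheduler.catch_up_skipped (a gap of at least one full interval) is an assumption, since the text does not define it.

```typescript
interface CatchUpPlan {
  runsToFire: 1; // never more than one, regardless of how many ticks were missed
  nextRunAt: Date;
  delaySeconds: number;
  catchUpSkipped: boolean;
}

function planOverdueScope(scheduledAt: Date, now: Date, intervalSeconds: number): CatchUpPlan {
  const delaySeconds = Math.max(0, Math.floor((now.getTime() - scheduledAt.getTime()) / 1000));
  return {
    runsToFire: 1,
    // Reschedule relative to now, not relative to the missed slot.
    nextRunAt: new Date(now.getTime() + intervalSeconds * 1000),
    delaySeconds,
    // Assumption: warn only when at least one whole interval was missed.
    catchUpSkipped: delaySeconds >= intervalSeconds,
  };
}

// Scope was due at 09:00, scheduler came back at 15:00, cadence is hourly.
const overduePlan = planOverdueScope(
  new Date("2026-04-22T09:00:00Z"),
  new Date("2026-04-22T15:00:00Z"),
  3600,
);
```

Six hourly runs were missed in this example, but the plan still fires one run and reports a delay of 21600 seconds for the warning log.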

Manual trigger API. POST /api/v1/scan-runs with body { scope_id, trigger: { type: "manual_api" } }:

  • Validates the user has write access to the tenant.
  • Bypasses next_run_at (manual triggers ignore schedule).
  • Still respects max_concurrent_runs — returns 409 Conflict if a run for that scope is already running.
  • Returns 202 Accepted with the new run_id.
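
The decision logic behind the manual trigger endpoint might look like the sketch below. decideManualTrigger and its context shape are hypothetical, and the 403 response for a missing write permission is an assumption; the text above specifies only the 409 and 202 cases.

```typescript
interface ManualTriggerContext {
  hasTenantWriteAccess: boolean;
  runningRunsForScope: number;
  maxConcurrentRuns: number;
}

type ManualTriggerResponse =
  | { httpStatus: 202; run_id: string }
  | { httpStatus: 403 | 409; error: string };

function decideManualTrigger(ctx: ManualTriggerContext, newRunId: string): ManualTriggerResponse {
  // Assumption: a caller without write access is rejected with 403.
  if (!ctx.hasTenantWriteAccess) {
    return { httpStatus: 403, error: "write access to tenant required" };
  }
  // next_run_at is deliberately not consulted: manual triggers ignore the schedule.
  if (ctx.runningRunsForScope >= ctx.maxConcurrentRuns) {
    return { httpStatus: 409, error: "a run for this scope is already running" };
  }
  return { httpStatus: 202, run_id: newRunId };
}

const acceptedTrigger = decideManualTrigger(
  { hasTenantWriteAccess: true, runningRunsForScope: 0, maxConcurrentRuns: 1 },
  "sr-example-1",
);
const conflictingTrigger = decideManualTrigger(
  { hasTenantWriteAccess: true, runningRunsForScope: 1, maxConcurrentRuns: 1 },
  "sr-example-2",
);
```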

Configuration (admin/CLI today, tenant self-service later)

A new CLI in sv0-platform/scripts/connector-instance-cli.ts provides:

# Create instance
npx tsx scripts/connector-instance-cli.ts instance create \
--tenant mediapro \
--kind aws \
--display-name "MediaPro production AWS org" \
--credentials-ref "op://sv0-connectors/mediapro-aws/credentials" \
--target '{"kind":"aws_account","account_id":"111111111111","role_arn":"arn:aws:iam::111111111111:role/sv0-reader","external_id":"abc"}'

# Create scope
npx tsx scripts/connector-instance-cli.ts scope create \
--tenant mediapro \
--instance ci-uuid-prod \
--display-name "AWS prod / IAM hourly" \
--scope-keys '{"account_ids":["111111111111"],"regions":["us-east-1"]}' \
--categories iam \
--cadence interval --interval-seconds 3600

# Trigger a manual run
npx tsx scripts/connector-instance-cli.ts run trigger --scope ss-uuid

# Show scan history for a scope
npx tsx scripts/connector-instance-cli.ts run list --scope ss-uuid --limit 10

The CLI writes directly via the StorageAdapter (no API round-trip needed for ops). The future self-service UI is a thin layer over the same StorageAdapter methods (Open Question #5).

Operational behaviour

Scan history view (per scope). An analyst sees these five fields per row:

  1. started_at (relative: "12m ago")
  2. status (running / succeeded / partial / failed / timeout / cancelled)
  3. Per-category sparkline: iam ✓ lambda ✓ cloudtrail ✗
  4. totals.items_scanned
  5. Top error message (from the first failed category) — truncated to 80 chars, full message on hover

Error categorisation. Every error in category_results[X].errors[] carries a category field with a closed enum: auth | rate_limit | api_error | data_error | timeout. This drives:

  • Auth errors → page the tenant's connector owner (rotate creds), do not retry.
  • Rate-limit errors → backoff via cooldown_after_failure_seconds, then retry.
  • API errors (5xx) → retry once on next scheduled tick.
  • Data errors (parse / schema mismatch) → do not retry, file a P1 alert (it's a connector bug).
  • Timeout → reduce scope (operator action: split categories into separate scopes).
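
Because the enum is closed, the routing above can live in a lookup table that the type checker verifies for completeness. The sketch below is one possible encoding; ERROR_POLICY and the retry labels are hypothetical names, and the "never" retry value for timeout is an assumption because the text names only an operator action for that case.

```typescript
const ERROR_CATEGORIES = ["auth", "rate_limit", "api_error", "data_error", "timeout"] as const;
type ErrorCategory = (typeof ERROR_CATEGORIES)[number];

interface ErrorPolicy {
  retry: "never" | "after_cooldown" | "next_tick_once";
  operatorAction: string | null;
}

// Record<ErrorCategory, ...> makes a missing category a compile-time error.
const ERROR_POLICY: Record<ErrorCategory, ErrorPolicy> = {
  auth: { retry: "never", operatorAction: "page the connector owner to rotate credentials" },
  rate_limit: { retry: "after_cooldown", operatorAction: null },
  api_error: { retry: "next_tick_once", operatorAction: null },
  data_error: { retry: "never", operatorAction: "file a P1 alert (connector bug)" },
  // Assumption: no automatic retry; the operator is expected to split the scope first.
  timeout: { retry: "never", operatorAction: "split categories into separate scopes" },
};

function policyFor(category: ErrorCategory): ErrorPolicy {
  return ERROR_POLICY[category];
}
```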

Partial-success semantics. A partial run is treated as a successful sync for the categories that succeeded. The NormalizedGraph is submitted with scanScope.scannedEntityTypes reflecting only successful categories, so the existing circuit breaker (02-processing-pipeline.md §3.3) does not flag missing data from the failed categories as deletion. The scan_runs document is the operator's truth source for what failed; the pipeline's connector_syncs.metrics reflects what was actually ingested.

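Tenant isolation. Every tenant-facing index leads with tenant_id. The credential broker resolves credentials_ref only after validating the requesting worker's tenant context. Two scopes from different tenants cannot collide on _id (each ID is derived from tenant-specific inputs), and the storage adapter never accepts a tenant-facing query without tenant scoping; the cluster-wide system queries (the scheduler's due-scope lookup and the stale-run reaper) are the deliberate exceptions.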

Migration / backward compat

  • Existing manual scans keep working. A direct POST to /api/v1/ingest/normalized-graph with scanScope populated still ingests fine; it simply does not produce a scan_runs document. Engineers running sv0-aws scan --all --submit from a laptop see the same behaviour they do today.
  • connector_syncs is unchanged. New code reads connector_instance_id when present, falls back to connector_type. We add connector_instance_id as a sibling field, not a replacement.
  • tenant_configs.connector_credential_refs is read-only legacy. Migration script (Phase 4) walks each tenant, creates one ConnectorInstance per legacy entry, and copies the cred ref. Old entries stay until the next major version.
  • ConnectorReport (the alternate ingest path used by entra-servicenow and azure-foundry) is unchanged in this stream. It continues to work; future work can wrap it in the same scope/run lifecycle.

Interface to other streams

  • Stream 2 (multi-account AWS): Defines the AWS-specific shape of ConnectorInstance.source_system_targets[].kind = "aws_account" and ScanScope.scope_keys = { account_ids: string[], regions: string[] } (always plural arrays). Defines the canonical list of AWS service categories for ScanScope.service_categories (which lives outside scope_keys, at the top of the ScanScope document, validated against ConnectorInstance.discovered_capabilities.service_categories_available). Implements per-category extractors that respect the failure-isolation contract above.
  • Stream 3 (cross-connector graph stitching): Consumes connector_instance_id (= _id of connector_instances) as the stable provenance tag on every entity and relationship. The cross-connector merge logic in src/workers/handlers/sync-ingestion.ts:26-37 already keys on source_connector_id — that field becomes the instance ID, giving Stream 3 a stable per-(tenant, source-system, instance) identity to merge against.
  • Stream 4 (MediaPro Lab 2 IaC demo): Consumes the up→scan→teardown contract: POST /api/v1/connector-instances (up); POST /api/v1/scan-runs with body { scope_id: X, trigger: { type: "iac_lifecycle", iac_correlation_id: Y } }, then poll GET /api/v1/scan-runs/:id until status is terminal (scan); DELETE /api/v1/connector-instances/:id (teardown). The iac_correlation_id field on the run is how Stream 4's terraform state ID flows through scan history for post-demo audit.
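
The "poll until status is terminal" step for Stream 4 can be sketched as follows. The status fetcher is injected so the example stays independent of any HTTP client; waitForRun, isTerminal, and the option names are hypothetical, and treating every status other than running as terminal is an assumption drawn from the status table.

```typescript
type ScanRunStatus = "running" | "succeeded" | "partial" | "failed" | "timeout" | "cancelled";

// Assumption: everything except "running" is terminal.
const TERMINAL_STATUSES = new Set<ScanRunStatus>([
  "succeeded",
  "partial",
  "failed",
  "timeout",
  "cancelled",
]);

function isTerminal(runStatus: ScanRunStatus): boolean {
  return TERMINAL_STATUSES.has(runStatus);
}

// fetchStatus would wrap GET /api/v1/scan-runs/:id in a real caller.
async function waitForRun(
  fetchStatus: () => Promise<ScanRunStatus>,
  options: { intervalMs: number; maxAttempts: number },
): Promise<ScanRunStatus> {
  for (let attempt = 0; attempt < options.maxAttempts; attempt++) {
    const current = await fetchStatus();
    if (isTerminal(current)) return current;
    // Sleep between polls so the platform is not hammered.
    await new Promise<void>((resolve) => setTimeout(resolve, options.intervalMs));
  }
  throw new Error("scan run did not reach a terminal status in time");
}
```

A bounded attempt count keeps a teardown script from hanging forever if a run is stuck in running.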

Implementation plan

This section follows writing-plans skill conventions: each task is bite-sized (2-5 minutes of focused work), names exact files and exact commands, and ends in a TDD cycle plus a commit. Tasks are grouped into four phases that produce working, testable increments.

Repo legend: (P) = sv0-platform, (C) = sv0-connectors.

Phase 1: Data model + storage

Task 1.1 (P) — Add domain types for new collections.

  • File: src/domain/connector-instances/types.ts (new), src/domain/scan-scopes/types.ts (new), src/domain/scan-runs/types.ts (new).
  • Define the three TS interfaces from §"Data model" above as ConnectorInstanceDoc, ScanScopeDoc, ScanRunDoc. Use exactOptionalPropertyTypes-safe shapes (no undefined assignments — use spread).
  • Add closed-enum as const arrays for status, trigger.type, category_results[X].errors[].category, schedule.cadence.
  • Commit: feat(domain): add ConnectorInstance, ScanScope, ScanRun types.
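
One way to realise the closed-enum bullet in Task 1.1 is the const-assertion pattern, where the runtime array and the union type come from a single declaration. The constant names and the isOneOf guard below are illustrative, not prescribed; the enum values are taken from the data model above.

```typescript
const SCAN_RUN_STATUSES = ["running", "succeeded", "partial", "failed", "timeout", "cancelled"] as const;
type ScanRunStatus = (typeof SCAN_RUN_STATUSES)[number];

const TRIGGER_TYPES = ["scheduled", "manual_api", "iac_lifecycle", "retry"] as const;
type TriggerType = (typeof TRIGGER_TYPES)[number];

const SCHEDULE_CADENCES = ["interval", "cron", "manual"] as const;
type ScheduleCadence = (typeof SCHEDULE_CADENCES)[number];

// Generic runtime guard: narrows an unknown value to one of the allowed literals.
function isOneOf<T extends string>(allowed: readonly T[], value: unknown): value is T {
  return typeof value === "string" && (allowed as readonly string[]).indexOf(value) !== -1;
}

// Example: validating untrusted input before it reaches the storage adapter.
function parseTriggerType(raw: unknown): TriggerType {
  if (!isOneOf(TRIGGER_TYPES, raw)) {
    throw new Error(`unknown trigger type: ${String(raw)}`);
  }
  return raw;
}

const parsedTrigger: TriggerType = parseTriggerType("iac_lifecycle");
const defaultCadence: ScheduleCadence = "interval";
const initialStatus: ScanRunStatus = "running";
```

Deriving the type from the array avoids the two drifting apart when a new status or trigger type is added.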

Task 1.2 (P) — Write failing storage-adapter tests.

  • File: test/storage/control-plane.test.ts (new).
  • Tests for insertConnectorInstance, getConnectorInstance, queryConnectorInstances, insertScanScope, claimDueScopes, insertScanRun, updateScanRunCategoryResult, completeScanRun, queryScanRuns.
  • Each test sets up a tenant, exercises one method, asserts persisted shape and tenant isolation.
  • Run: npm test -- test/storage/control-plane.test.ts and confirm all tests fail with "method not implemented."
  • Commit: test(storage): failing tests for connector-instance / scan-scope / scan-run methods.

Task 1.3 (P) — Extend StorageAdapter interface and Mongo implementation.

  • File: src/storage/storage-adapter.ts — add the 9 method signatures from Task 1.2.
  • File: src/storage/mongo/adapter.ts — implement them. Index creation in the adapter's init().
  • Run: npm test -- test/storage/control-plane.test.ts and confirm green.
  • Commit: feat(storage): implement connector-instance / scan-scope / scan-run methods.

Task 1.4 (P) — Add Mongo indexes from §"Data model".

  • File: src/storage/mongo/adapter.ts. Extend createIndexes() with the eleven new indexes listed under "Indexes" for each collection above (two on connector_instances, three on scan_scopes, six on scan_runs).
  • Run: npm run test:integration -- test/integration/storage/indexes.test.ts to verify indexes are created on a fresh DB.
  • Commit: feat(storage): create indexes for control-plane collections.

Phase 1 deliverable: the three new collections exist, are queryable, are properly indexed, and have full unit-test coverage. No execution behaviour yet.

Phase 2: Scoped scan execution

Task 2.1 (P) — Define the Connector Driver interface.

  • File: src/workers/connector-driver.ts (new).
  • Export interface ConnectorDriver { run(input: ConnectorRunInput): Promise<ConnectorRunOutput>; } where ConnectorRunInput = { instance, scope, runId } and ConnectorRunOutput = { categoryResults, normalizedGraph? }.
  • No implementation yet — just the interface + a stub InProcessSubprocessDriver class with throw new Error("not implemented").
  • Commit: feat(workers): define ConnectorDriver interface.
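
A sketch of what the Task 2.1 file might contain. The interface, the input/output field names and the stub's error message follow the task text; the placeholder document types and the CategoryResult shape are assumptions standing in for the real types from Task 1.1.

```typescript
// Placeholder document types (ASSUMPTION): the real ones come from Task 1.1.
type ConnectorInstanceDoc = { _id: string };
type ScanScopeDoc = { _id: string };

// ASSUMPTION: illustrative per-category shape; the real one is in the data model.
interface CategoryResult {
  status: "succeeded" | "failed";
  items_scanned: number;
}

export interface ConnectorRunInput {
  instance: ConnectorInstanceDoc;
  scope: ScanScopeDoc;
  runId: string;
}

export interface ConnectorRunOutput {
  categoryResults: Record<string, CategoryResult>;
  normalizedGraph?: unknown; // typed as NormalizedGraph in the real code
}

export interface ConnectorDriver {
  run(input: ConnectorRunInput): Promise<ConnectorRunOutput>;
}

// Stub only: the real implementation arrives in Task 2.7.
export class InProcessSubprocessDriver implements ConnectorDriver {
  async run(_input: ConnectorRunInput): Promise<ConnectorRunOutput> {
    throw new Error("not implemented");
  }
}
```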

Task 2.2 (P) — Write failing test for execute_scan handler.

  • File: test/workers/execute-scan.test.ts (new).
  • Mock the StorageAdapter and a fake ConnectorDriver that returns a canned per-category outcome (one succeeded, one failed).
  • Assert: handler creates a scan_runs doc with status running, then updates to partial with the right category_results.
  • Run: npm test -- test/workers/execute-scan.test.ts and confirm fail.
  • Commit: test(workers): failing test for execute_scan handler.

Task 2.3 (P) — Implement execute_scan handler.

  • File: src/workers/handlers/execute-scan.ts (new).
  • File: src/workers/runtime.ts — extend WorkerJobType union with "execute_scan".
  • File: src/workers/index.ts — register the new handler.
  • Handler reads scope + instance, calls driver, writes per-category results, sets terminal status, ends.
  • Run: npm test -- test/workers/execute-scan.test.ts → green.
  • Commit: feat(workers): execute_scan handler with per-category result tracking.
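
The handler flow in Task 2.3 (create the run as running, call the driver, write per-category results, set a terminal status) could look roughly like this. It is a sketch under assumptions: the storage method names come from Task 1.2 but their signatures are invented here, the driver input is simplified, and treating an empty result map as failed is a choice made for the example rather than something the plan specifies.

```typescript
type CategoryStatus = "succeeded" | "failed";
type TerminalStatus = "succeeded" | "partial" | "failed";
interface CategoryResult { status: CategoryStatus; items_scanned: number }

// ASSUMPTION: method names are from Task 1.2; these signatures are illustrative.
interface RunStore {
  insertScanRun(run: { _id: string; scope_id: string; status: "running" }): Promise<void>;
  updateScanRunCategoryResult(runId: string, category: string, result: CategoryResult): Promise<void>;
  completeScanRun(runId: string, status: TerminalStatus): Promise<void>;
}

// Simplified driver surface for the example.
interface Driver {
  run(input: { scopeId: string; runId: string }): Promise<{ categoryResults: Record<string, CategoryResult> }>;
}

// No failures -> succeeded, all failed -> failed, anything in between -> partial.
export function deriveTerminalStatus(results: Record<string, CategoryResult>): TerminalStatus {
  const statuses = Object.values(results).map((r) => r.status);
  if (statuses.length === 0) return "failed"; // ASSUMPTION: nothing ran counts as failure
  const failed = statuses.filter((s) => s === "failed").length;
  if (failed === 0) return "succeeded";
  return failed === statuses.length ? "failed" : "partial";
}

export async function executeScan(
  store: RunStore,
  driver: Driver,
  scopeId: string,
  runId: string,
): Promise<TerminalStatus> {
  await store.insertScanRun({ _id: runId, scope_id: scopeId, status: "running" });
  const { categoryResults } = await driver.run({ scopeId, runId });
  // Persist each category independently so one failure does not hide the others.
  for (const [category, result] of Object.entries(categoryResults)) {
    await store.updateScanRunCategoryResult(runId, category, result);
  }
  const status = deriveTerminalStatus(categoryResults);
  await store.completeScanRun(runId, status);
  return status;
}
```

With the canned outcome from Task 2.2 (one category succeeded, one failed), this sketch ends in partial, which matches the assertion that test is meant to make.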

Task 2.4 (P) — Wire NormalizedGraph submission to scan_runs.

  • File: src/workers/handlers/execute-scan.ts — after a successful driver call, submit the resulting NormalizedGraph to the existing ingest service in-process (call IngestService.submit() directly, not over HTTP) so scan_runs.sync_id can be filled atomically.
  • Test: extend test/workers/execute-scan.test.ts to assert scan_runs.sync_id is set.
  • Commit: feat(workers): link scan_runs to connector_syncs via sync_id.

Task 2.5 (C) — Add per-category extractor wrapping in AWS connector.

  • File: integrations/aws/src/sv0_aws/cli/main.py — wrap each extractor call (lines ~163-232) in a try / except that records to a category_results dict on the AWSConnector instance, instead of letting an exception kill the whole scan.
  • File: integrations/aws/tests/test_category_isolation.py (new) — fixture that simulates IAMExtractor.extract_all_iam_entities raising; asserts category_results["iam"]["status"] == "failed" AND category_results["lambda"]["status"] == "succeeded".
  • Run: cd integrations/aws && uv run pytest tests/test_category_isolation.py → green.
  • Commit: feat(aws): per-category failure isolation in scan loop.
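
The isolation pattern in Task 2.5 is language-independent. The real change lands in the Python AWS connector; the sketch below shows the same idea in TypeScript, to stay consistent with the platform-side examples. Extractor, runCategories and the result shape are hypothetical names for illustration.

```typescript
// An extractor returns the number of items it scanned and may throw.
type Extractor = () => number;

interface CategoryResult {
  status: "succeeded" | "failed";
  items_scanned: number;
  errors: { message: string }[];
}

// Run every extractor inside its own try/catch so that one category's
// exception is recorded against that category instead of ending the scan.
export function runCategories(
  extractors: Record<string, Extractor>,
): Record<string, CategoryResult> {
  const results: Record<string, CategoryResult> = {};
  for (const [category, extract] of Object.entries(extractors)) {
    try {
      results[category] = { status: "succeeded", items_scanned: extract(), errors: [] };
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      results[category] = { status: "failed", items_scanned: 0, errors: [{ message }] };
    }
  }
  return results;
}
```

A call such as runCategories({ iam: failingExtractor, lambda: workingExtractor }) records iam as failed and lambda as succeeded, which is the same pair of assertions the new pytest fixture makes.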

Task 2.6 (C) — Add --scope-id and --run-id CLI flags to AWS connector.

  • File: integrations/aws/src/sv0_aws/cli/main.py — add --scope-id, --run-id, --category-results-out to the scan subparser. When --scope-id is set, the connector emits the category_results dict as JSON to --category-results-out so the platform Driver can consume it.
  • Update tests for the CLI parser.
  • Commit: feat(aws): accept --scope-id/--run-id from platform driver.

Task 2.7 (P) — Implement InProcessSubprocessDriver.

  • File: src/workers/connector-driver.ts — replace stub. Use child_process.spawn to invoke sv0-aws scan --scope-id ... --run-id ... --category-results-out /tmp/cr.json --graph-json /tmp/g.json. Read both files when subprocess exits. Surface non-zero exit as a failed run with a single-error category result.
  • Test: test/workers/connector-driver.test.ts (new) with a fake script in test/fixtures/fake-connector.sh that writes canned outputs.
  • Commit: feat(workers): InProcessSubprocessDriver — spawn connector CLI, read outputs.
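
A minimal sketch of the spawn-and-read flow described in Task 2.7, using Node's child_process.spawn. The CLI flags are the ones named in Tasks 2.6 and 2.7; buildScanArgs, runCli, runConnector, the output file names under outDir, and the "_driver" key used for the single-error result are assumptions made for this example.

```typescript
import { spawn } from "node:child_process";
import { readFile } from "node:fs/promises";

// Build the argv for the connector CLI from the flags named in the plan.
export function buildScanArgs(
  scopeId: string,
  runId: string,
  categoryResultsOut: string,
  graphOut: string,
): string[] {
  return [
    "scan",
    "--scope-id", scopeId,
    "--run-id", runId,
    "--category-results-out", categoryResultsOut,
    "--graph-json", graphOut,
  ];
}

// Spawn a command and resolve with its exit code.
export function runCli(command: string, args: string[]): Promise<number> {
  return new Promise((resolve, reject) => {
    const child = spawn(command, args, { stdio: "inherit" });
    child.once("error", reject); // e.g. the binary was not found
    child.once("close", (code) => resolve(code ?? 1)); // null code: killed by a signal
  });
}

export async function runConnector(
  command: string, // e.g. "sv0-aws"
  scopeId: string,
  runId: string,
  outDir: string,
): Promise<{ categoryResults: Record<string, unknown>; normalizedGraph?: unknown }> {
  const crPath = `${outDir}/cr.json`;
  const graphPath = `${outDir}/g.json`;
  const exitCode = await runCli(command, buildScanArgs(scopeId, runId, crPath, graphPath));
  if (exitCode !== 0) {
    // Non-zero exit: surface a failed run with a single-error category result.
    // ASSUMPTION: the "_driver" key is a placeholder, not a documented category.
    return {
      categoryResults: {
        _driver: {
          status: "failed",
          items_scanned: 0,
          errors: [{ message: `connector exited with code ${exitCode}` }],
        },
      },
    };
  }
  // Read both output files only after the subprocess has exited.
  const categoryResults = JSON.parse(await readFile(crPath, "utf8")) as Record<string, unknown>;
  const normalizedGraph: unknown = JSON.parse(await readFile(graphPath, "utf8"));
  return { categoryResults, normalizedGraph };
}
```

Passing a per-run outDir rather than fixed /tmp paths would avoid two concurrent runs overwriting each other's files; whether that matters depends on how max_concurrent_runs is configured.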

Phase 2 deliverable: an admin can manually insert a ScanScope, enqueue an execute_scan job, and see a scan_runs document populate with per-category results plus a linked sync_id. No scheduling yet.

Phase 3: Scheduling

Task 3.1 (P) — Failing test for scope claim.

  • File: test/workers/scheduler.test.ts (new).
  • Two concurrent calls to claimDueScopes(now, limit=10) against the same due scope should result in exactly one returning the scope.
  • Run: npm test -- test/workers/scheduler.test.ts → fail.
  • Commit: test(workers): failing concurrent claim test for scheduler.

Task 3.2 (P) — Implement scheduler tick.

  • File: src/workers/scheduler.ts (new). Export class Scheduler with start(), stop(), tickOnce() (for tests).
  • tickOnce() queries due scopes, atomically advances schedule.next_run_at, enqueues execute_scan.
  • File: src/storage/mongo/adapter.ts — add claimDueScopes(now, limit) using findOneAndUpdate in a loop bounded by limit (no batch claim — concurrency is controlled per-document).
  • Run: npm test -- test/workers/scheduler.test.ts → green.
  • Commit: feat(workers): in-process scheduler with atomic scope claim.
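
The claim loop in Task 3.2 relies on each claim being a single atomic find-and-update, so two callers can never both receive the same scope. The sketch below shows only the loop shape. AtomicScopeStore is a hypothetical stand-in for the Mongo collection, and the in-memory class simulates the atomicity that findOneAndUpdate provides per document.

```typescript
interface ScopeSketch {
  _id: string;
  next_run_at: Date;
  interval_seconds: number;
}

// ASSUMPTION: hypothetical surface modelled on an atomic find-one-and-update.
interface AtomicScopeStore {
  // Atomically finds one scope with next_run_at <= now, advances its
  // next_run_at, and returns the pre-update snapshot; null when none is due.
  findOneDueAndAdvance(now: Date): Promise<ScopeSketch | null>;
}

// Claim up to `limit` scopes, one atomic claim per iteration (no batch claim).
export async function claimDueScopes(
  store: AtomicScopeStore,
  now: Date,
  limit: number,
): Promise<ScopeSketch[]> {
  const claimed: ScopeSketch[] = [];
  while (claimed.length < limit) {
    const scope = await store.findOneDueAndAdvance(now);
    if (scope === null) break; // nothing else is due
    claimed.push(scope);
  }
  return claimed;
}

// In-memory fake for tests: the find and the update happen in one
// synchronous step, which mimics a per-document atomic update.
export class InMemoryScopeStore implements AtomicScopeStore {
  private readonly scopes: ScopeSketch[];

  constructor(scopes: ScopeSketch[]) {
    this.scopes = scopes;
  }

  async findOneDueAndAdvance(now: Date): Promise<ScopeSketch | null> {
    const scope = this.scopes.find((s) => s.next_run_at.getTime() <= now.getTime());
    if (scope === undefined) return null;
    const snapshot: ScopeSketch = { ...scope };
    scope.next_run_at = new Date(now.getTime() + scope.interval_seconds * 1000);
    return snapshot;
  }
}
```

Because the filter includes next_run_at <= now and the update moves next_run_at forward, the second of two racing callers no longer matches the document, which is the property the Task 3.1 test checks.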

Task 3.3 (P) — Wire scheduler into platform startup.

  • File: src/index.ts (or wherever WorkerRuntime is constructed) — instantiate Scheduler with tickIntervalMs = 30_000, start it after the runtime starts.
  • Add env flag SV0_SCHEDULER_ENABLED=true (default true in prod, false in tests / docker compose dev).
  • Commit: feat(workers): start scheduler at platform boot.

Task 3.4 (P) — Catch-up safety + cooldown enforcement.

  • File: src/workers/scheduler.ts — when claiming, if now - schedule.next_run_at > 2 × interval, log scheduler.catch_up_skipped and set next_run_at = now + interval (do NOT fire backlog).
  • Also enforce cooldown_after_failure_seconds: query the most recent scan_runs document for the scope; if it failed within the cooldown window, skip the run and bump next_run_at.
  • Test: test/workers/scheduler.test.ts — add cases for both behaviours.
  • Commit: feat(workers): scheduler honours catch-up cap and per-scope cooldown.
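
The two guards in Task 3.4 can be expressed as one pure decision function, which keeps them easy to unit-test. This is a sketch: decideClaim and its parameter names are hypothetical, and bumping next_run_at to now + interval in the cooldown case is an assumption (the task only says to bump it).

```typescript
type ClaimAction = "fire" | "skip_catch_up" | "skip_cooldown";

interface ClaimDecision {
  action: ClaimAction;
  nextRunAtMs: number; // new value for schedule.next_run_at, in epoch ms
}

interface ClaimParams {
  nowMs: number;
  nextRunAtMs: number; // the scope's current schedule.next_run_at
  intervalSeconds: number;
  cooldownAfterFailureSeconds: number;
  lastFailedAtMs?: number; // time of the most recent failed run, if any
}

export function decideClaim(p: ClaimParams): ClaimDecision {
  const intervalMs = p.intervalSeconds * 1000;
  const nextRunAtMs = p.nowMs + intervalMs;

  // Catch-up cap: more than two intervals late means the backlog is dropped,
  // not replayed (logged as scheduler.catch_up_skipped in the real scheduler).
  if (p.nowMs - p.nextRunAtMs > 2 * intervalMs) {
    return { action: "skip_catch_up", nextRunAtMs };
  }

  // Cooldown: the last run failed too recently, so skip this slot.
  if (
    p.lastFailedAtMs !== undefined &&
    p.nowMs - p.lastFailedAtMs < p.cooldownAfterFailureSeconds * 1000
  ) {
    return { action: "skip_cooldown", nextRunAtMs };
  }

  return { action: "fire", nextRunAtMs };
}
```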

Task 3.5 (P) — Manual trigger API.

  • File: src/api/routes/scan-runs.ts (new). POST /api/v1/scan-runs body { scope_id, trigger?: { type, iac_correlation_id? } }. GET /api/v1/scan-runs/:id. GET /api/v1/scan-runs?scope_id=X&limit=N.
  • File: src/api/index.ts — register the router.
  • Reject with 409 if an already-running run for the scope means the new run would exceed max_concurrent_runs.
  • Tests: test/api/scan-runs.test.ts.
  • Commit: feat(api): scan-runs routes for manual trigger and history.
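
A framework-free sketch of the POST /api/v1/scan-runs decision in Task 3.5. The request body shape and the 409 rule come from the task; the 201 success status, the default trigger type of "manual", the response fields and the name handleCreateScanRun are assumptions for illustration.

```typescript
interface TriggerSketch {
  type: string;
  iac_correlation_id?: string;
}

interface CreateScanRunBody {
  scope_id: string;
  trigger?: TriggerSketch;
}

interface HttpResult {
  httpStatus: number;
  body: Record<string, unknown>;
}

// Decide the response for a manual trigger given how many runs are already
// running for the scope. Persistence and enqueueing are left out on purpose.
export function handleCreateScanRun(
  body: CreateScanRunBody,
  runningCount: number,
  maxConcurrentRuns: number,
  newRunId: string,
): HttpResult {
  if (runningCount >= maxConcurrentRuns) {
    return {
      httpStatus: 409,
      body: { error: "max_concurrent_runs reached", scope_id: body.scope_id },
    };
  }
  // ASSUMPTION: an omitted trigger defaults to a manual one.
  const trigger: TriggerSketch = body.trigger ?? { type: "manual" };
  // The trigger is stored and echoed as-is, so an iac_correlation_id sent on
  // POST can be read back later from GET /api/v1/scan-runs/:id.
  return {
    httpStatus: 201, // ASSUMPTION: the plan does not name the success status
    body: { _id: newRunId, scope_id: body.scope_id, trigger },
  };
}
```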

Phase 3 deliverable: a scope created via the CLI runs on its cadence without operator intervention; scan_runs history accumulates; an operator can hit the API to trigger an extra run or list history.

Phase 4: CLI provisioning + migration

Task 4.1 (P) — Connector-instance-cli scaffold.

  • File: scripts/connector-instance-cli.ts (new). Use Node's built-in parseArgs. Three subcommands: instance, scope, run. Each with create, list, archive.
  • Wire into the StorageAdapter directly.
  • Add to package.json scripts as "control-plane": "tsx scripts/connector-instance-cli.ts".
  • Commit: feat(scripts): connector-instance-cli scaffold.

Task 4.2 (P) — Migration script for legacy connector_credential_refs.

  • File: scripts/migrate-connector-instances.ts (new).
  • Walks all tenant_configs, for each entry in connector_credential_refs creates one ConnectorInstance with display_name = "<kind> (migrated from tenant config)", copies the credential ref, status onboarding, no scopes.
  • Idempotent: skips if an instance with connector_kind + display_name already exists for that tenant.
  • Test: test/scripts/migrate-connector-instances.test.ts.
  • Commit: feat(scripts): migrate legacy credential refs to ConnectorInstance.
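
The idempotency rule in Task 4.2 can be sketched as a pure planning step that computes which instances still need inserting. The display_name format, the onboarding status and the skip rule are from the task; the shape of connector_credential_refs (assumed here to be a map from connector kind to credential ref) and the names TenantConfigSketch, InstanceSketch and planMigration are assumptions.

```typescript
// ASSUMPTION: connector_credential_refs is a map of connector kind -> ref.
interface TenantConfigSketch {
  tenant_id: string;
  connector_credential_refs: Record<string, string>;
}

interface InstanceSketch {
  tenant_id: string;
  connector_kind: string;
  display_name: string;
  credentials_ref: string;
  status: "onboarding";
}

// Return the instances to insert for one tenant, skipping any whose
// (connector_kind, display_name) pair already exists for that tenant.
export function planMigration(
  config: TenantConfigSketch,
  existing: InstanceSketch[],
): InstanceSketch[] {
  const key = (kind: string, name: string) => `${kind}\u0000${name}`;
  const seen = new Set(
    existing
      .filter((i) => i.tenant_id === config.tenant_id)
      .map((i) => key(i.connector_kind, i.display_name)),
  );

  const toInsert: InstanceSketch[] = [];
  for (const [kind, ref] of Object.entries(config.connector_credential_refs)) {
    const displayName = `${kind} (migrated from tenant config)`;
    if (seen.has(key(kind, displayName))) continue; // already migrated
    seen.add(key(kind, displayName));
    toInsert.push({
      tenant_id: config.tenant_id,
      connector_kind: kind,
      display_name: displayName,
      credentials_ref: ref,
      status: "onboarding",
    });
  }
  return toInsert;
}
```

Running the plan a second time with the first run's output passed as existing yields an empty list, which is the no-op behaviour V5 asks for.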

Task 4.3 (P) — Add connector_instance_id to ConnectorSyncDoc.

  • File: src/domain/syncs/types.ts — add optional connector_instance_id?: string.
  • File: src/workers/handlers/sync-ingestion.ts — when receiving a NormalizedGraph whose connectorId matches an existing _id in connector_instances, write connector_instance_id. Otherwise leave the field unset.
  • Test: existing test/workers/sync-ingestion.test.ts — extend with a case that asserts the field is populated when an instance exists.
  • Commit: feat(syncs): backlink ConnectorSync to ConnectorInstance.

Task 4.4 (C) — Document the new CLI shape in connector READMEs.

  • File: integrations/aws/README.md — add a "Driven by platform scheduler" subsection that documents --scope-id / --run-id / --category-results-out and explains operators no longer need to invoke this from a laptop.
  • Commit: docs(aws): document platform-driven scan invocation.

Task 4.5 (P) — End-to-end happy path test.

  • File: test/integration/control-plane-e2e.test.ts (new).
  • Provision a tenant, create an instance via CLI (programmatically), create a scope with cadence: manual, trigger a run via the API, mock the driver to return a small NormalizedGraph, assert: scan_runs.status == "succeeded", scan_runs.sync_id set, downstream entities collection contains the expected entity.
  • Run: npm run test:integration → green.
  • Commit: test(integration): end-to-end control-plane happy path.

Task 4.6 (P) — ConnectorReport minimal-wrap adapter (per umbrella D15).

  • File: src/api/routes/ingest.ts — extend the existing /api/v1/ingest/connector-report handler.
  • On receipt: look up the ConnectorInstance for (tenant_id, connector_kind), using the most recently created active instance if several exist. If none exists, log a warning, skip the wrap, and proceed as today; this keeps unmigrated tenants working.
  • If found: insert a scan_runs doc with status derived from ConnectorReport.errors (none → succeeded, partial → partial, all-failed → failed), category_results synthesised one entry per ConnectorReport.source_systems, sync_id linked to the resulting connector_syncs._id. Do NOT change anything in the connector code itself; the wrap is platform-side only.
  • Test: test/api/ingest-connector-report-wrap.test.ts — POST a sample entra-servicenow ConnectorReport, assert a scan_runs doc appears with the correct status and category_results.entra.items_scanned > 0.
  • Commit: feat(ingest): minimal-wrap ConnectorReport into ScanRun lifecycle (D15).

Phase 4 deliverable: an existing tenant can be migrated without data loss; an admin can provision a new tenant entirely from the CLI; every connector (AWS via the new flow, entra-servicenow + foundry via the minimal-wrap adapter) lands a scan_runs document on each invocation, making umbrella V2 verifiable across all four; CI validates the full lifecycle.

Validation criteria

| # | Criterion | How to verify |
|---|---|---|
| V1 | After Phase 1: a developer can insert a ConnectorInstance / ScanScope / ScanRun and query them by tenant. | npm test -- test/storage/control-plane.test.ts passes. |
| V2 | After Phase 2: an admin can run npx tsx scripts/connector-instance-cli.ts run trigger --scope <id> and a scan_runs doc transitions running → succeeded with category_results populated. entities collection contains the AWS-account entities for that scope. | Manual happy-path against a dev AWS account. db.scan_runs.findOne({ _id: ... }) shows status: "succeeded", category_results.iam.items_scanned > 0. db.entities.countDocuments({ tenant_id, source_system: "aws" }) > 0. |
| V3 | After Phase 2: when one extractor (e.g. CloudTrail) is forced to fail, the scan_run shows status: "partial", that category shows status: "failed" with a structured error, and other categories still have status: "succeeded" with non-zero items_scanned. The downstream pipeline circuit breaker does NOT remove entities from the failed category's domain. | Integration test in test/integration/circuit-breaker.test.ts extended with a "category-isolation preserves circuit-breaker correctness" case. |
| V4 | After Phase 3: a scope with cadence: interval, interval_seconds: 60 produces N scan_runs documents in N×60 seconds without operator action. last_run_at and next_run_at advance monotonically. | Integration test runs scheduler for 200 seconds, asserts ≥ 3 scan_runs documents, asserts no scope was claimed by two workers concurrently (count of running per scope never exceeds max_concurrent_runs). |
| V5 | After Phase 4: the migration script run against a tenant with two legacy connector_credential_refs entries produces exactly two ConnectorInstance docs, status onboarding, with the credential refs preserved. Idempotent: second run is a no-op. | test/scripts/migrate-connector-instances.test.ts. |
| V6 | Stream 4 contract: POST /api/v1/scan-runs with trigger.iac_correlation_id set, then GET /api/v1/scan-runs/:id returns the same correlation id. | test/api/scan-runs.test.ts IaC correlation case. |

Open questions

  1. Connector Driver model — Phase 1 subprocess vs side-car container? Recommended: subprocess for Phase 1, side-car as Phase 5. Need explicit user sign-off because side-car ties into sv0-connectors#247 Docker work and may want to be done together.
  2. Scheduler leader-election when platform is multi-process? Today's deploy is single-process per environment; the atomic findOneAndUpdate claim is sufficient. When MediaPro pilot expands and we run two API replicas, do we need MongoDB transactions or is the per-document atomic update enough? Recommend: enough for now, revisit when we observe duplicate runs.
  3. Credential broker — extend tenant_configs or new credentials collection? This stream treats credentials as opaque credentials_ref. The actual broker is out of scope but the team needs to decide before MediaPro: 1Password CLI direct (per #247) vs vault-agnostic indirection. Recommend separate research stream after this one lands.
  4. Per-category service-category enum: closed list per connector kind, or open string? Closed feels safer (typo prevention) but means a new AWS service requires a code change. Recommend open string for now, validated against discovered_capabilities.service_categories_available at scope-creation time.
  5. Tenant self-service UI — when? This stream defines a CLI only. The UI is a future workstream, gated on WorkOS being live (see sv0-platform#373) so connector-instance creation can be tied to a real authenticated user. Recommend post-pilot.
  6. Legacy ConnectorReport ingestion path — wrap in scope/run lifecycle now or later? RESOLVED in umbrella D15: ship a minimal platform-side wrap in Phase 4 (Task 4.6 above) — the connector keeps emitting ConnectorReport exactly as today, the platform synthesises a ScanRun doc on receipt. Full CLI migration of entra-servicenow and foundry connectors stays deferred. This satisfies umbrella V2 ("all four ScanRuns succeed") without forcing a two-week non-AWS connector refactor.
  7. scan_runs retention? A tenant scanning every hour produces ~9k runs/year per scope. At ~5KB per doc that's ~45MB/scope/year — fine for now, but propose 1-year TTL on terminal-status runs (configurable per tenant). Open for sign-off.

References

Issues consulted

  • sv0-connectors#78 — epic: scheduled connector scans (the parent epic this design slots into).
  • sv0-platform#309 — research: multi-tenant connector throttling and rate-limit budgets at scale (informs budget field on ScanScope and per-category isolation requirements).
  • sv0-platform#247 — Phase 1 connector automation with systemd + 1Password (the prior shape we're evolving past; informs Open Question #3).
  • sv0-connectors#89 — connector P0s: AWS multi-account, streaming pagination, retry hardening (Stream 2 territory).
  • sv0-documentation#195 — MediaPro pilot readiness umbrella (the customer-facing deadline gating this work).

Architecture docs

  • 02-processing-pipeline.md — defines today's sync handler, circuit breaker, and scan-safety contract this design extends.
  • 03-database.md — defines the existing 14 collections; this design adds three more.
  • 05-connectors.md — defines NormalizedGraph and connector lifecycle; this design adds the scheduling/scoping shell around it.

Source files

  • sv0-platform/src/storage/storage-adapter.ts — interface this design extends (specifically lines 142-338 for the existing method shape).
  • sv0-platform/src/workers/runtime.ts:5,25-147 — in-process queue this design extends with execute_scan.
  • sv0-platform/src/workers/handlers/sync-ingestion.ts:53-97 — where scanScope is consumed today; design preserves this exact contract.
  • sv0-platform/src/ingestion/types.ts:92-105 — existing ScanScope interface; design lifts this from "ingestion-time declaration" to "first-class persisted concept."
  • sv0-platform/src/domain/syncs/types.ts — ConnectorSyncDoc shape this design extends with connector_instance_id.
  • sv0-platform/src/api/middleware/auth.ts:55-118 — tenant isolation contract this design respects.
  • sv0-connectors/integrations/aws/src/sv0_aws/cli/main.py:115-331 — current monolithic scan() method that this design decomposes per-category.
  • sv0-platform/docs/session-notes/2026-04-20-foundry-demo-resolution-session-handoff.md — operational evidence motivating the "scan history in DB" requirement.