Connector Control & Execution Architecture
TL;DR
This stream defines the per-tenant control plane for SecurityV0: how a tenant configures one or more connector instances, how each instance produces N independently scoped, independently observable scans, and how every scan run is recorded as a first-class MongoDB document. It introduces three new collections (`connector_instances`, `scan_scopes`, `scan_runs`), formalises ScanScope (a concept that already exists in the ingestion pipeline) as the identity of a unit of scan work, and pins down the contract that Streams 2, 3, and 4 will consume. The work is a prerequisite for the MediaPro pilot (multiple AWS accounts, multiple service categories, no engineer-on-laptop dependency), for cross-connector stitching (which needs a stable instance/scope identity to anchor entity provenance), and for the Lab 2 IaC demo (which needs an up → scan → teardown API).
Problem
The platform's current connector invocation model breaks down on three independent axes:
- Manual laptop runs. Every scan today is an engineer running `sv0-aws scan --all --submit`, `sv0-entra-servicenow --all --submit`, or `sv0-foundry --all` from a laptop with credentials in `.env` (see `sv0-connectors#78`). The operational consequences enumerated in #78 (data staleness, engineer bottleneck, no SLA, credential sprawl, no multi-tenancy path) have been accepted so far, but they do not survive an enterprise pilot.
- Monolithic scans. A single `sv0-aws scan --all` walks IAM, every Lambda function in every region, every Bedrock agent, every ECS service, every S3 bucket, and every Secret, then tries to extract CloudTrail evidence per Lambda: sequentially, in one process, against one set of API quotas (see `integrations/aws/src/sv0_aws/cli/main.py:115-331`). One throttle on `cloudtrail:LookupEvents` (2 TPS per region; see `sv0-platform#309`) bricks the whole run; one IAM permission gap on `bedrock:list_agents` produces a single `extracted_data` blob with no clean way to say "IAM succeeded, Bedrock failed". Wiz, Snyk, and Datadog all decomposed this years ago. We need scoped scans whose failure modes are independent.
- No DB-level scan history. The 2026-04-20 Foundry demo session (see `docs/session-notes/2026-04-20-foundry-demo-resolution-session-handoff.md`) reconstructed "when did the Foundry connector last run, and what failed" by scraping Sergey's terminal and Cloudflare access logs. The platform has `connector_syncs` records (good), but they are keyed on `connector_type`, not on (instance × scope). An operator cannot answer "when did we last successfully scan tenant X's `staging` AWS account's IAM service category?" because the data model does not represent it.
For MediaPro (target early May 2026, see sv0-documentation#195), this combination is the gating risk. MediaPro will provide multiple AWS accounts and we cannot ask Ezequiel's IAM team for a 4-hour read window on each. We need many small scans, scheduled, observable, with per-scope isolation.
Current state
What exists today.
- A connector outputs a `NormalizedGraph` and POSTs it to `/api/v1/ingest/normalized-graph` (see `src/api/routes/ingest.ts`). The platform validates, dedupes by `syncId`, and enqueues `sync_ingestion` (see `src/workers/runtime.ts`).
- `ScanScope` already exists in the ingestion-side type system (`src/ingestion/types.ts:92`) as `{ mode, sourceSystems[], scannedEntityTypes[], errors }`. It controls deletion detection inside the diff engine and circuit breaker (`02-processing-pipeline.md` §3.2-3.5). It is declared by the connector at submission time, but is not persisted, scheduled, or addressable.
- The `connector_syncs` collection records per-submission run metadata (`src/domain/syncs/types.ts`): `_id`, `tenant_id`, `connector_type`, `started_at`, `completed_at`, `status`, `sync_mode`, `metrics`, `error`, `cursor_state_after`. There is no link to instance, no link to scope, no schedule reference, and no per-category breakdown.
- `tenant_configs` carries an opaque `connector_credential_refs: Record<string, string>` (see `src/domain/tenants/types.ts:24`). This is a single string per connector kind per tenant, which assumes one instance per kind.
- The worker runtime is a single-process in-memory FIFO (`src/workers/runtime.ts:25`). It runs `sync_ingestion`, `evaluate_findings`, `build_evidence_pack`, and `generate_report`. It does not run scheduled triggers, scan execution, or anything outbound.
What is missing.
- A persistent representation of "connector instance" — i.e., a tenant's specific AWS organisation, ServiceNow instance, Foundry project — with credentials, source-system targets, and lifecycle.
- A persistent representation of "scoped scan" — a unit of work smaller than "the whole AWS account" — with a stable identity that scan history hangs off.
- A scheduler that decides "it's time to run scope S of instance I now."
- Connector CLIs that accept `--instance-id` and `--scope-id` and emit a `NormalizedGraph` whose `scanScope` matches.
- An API call Stream 4 (Lab 2 IaC) can hit to say "this scope corresponds to ephemeral terraform state ID X; trigger a scan now and tell me when done."
Constraints.
- MongoDB-only storage (per `03-database.md`).
- Deterministic logic only: no probabilistic scheduling decisions, no ML.
- Read-only connectors: the scheduler and execution layer never write back to source systems.
- TypeScript ESM on Node 24 in `sv0-platform`; Python 3.11+, uv-managed, in `sv0-connectors`.
- The in-process worker queue in `src/workers/runtime.ts` is what we have. Do not introduce Redis, RabbitMQ, or Temporal in this stream; extend the existing runtime.
Comparison: how peers do this
Wiz. Wiz models "Cloud Account" as a first-class object inside a tenant. Each cloud account has its own connector status, its own last-scan timestamp, and its own scoped scans (Wiz scans IAM, compute, storage, container images, and secrets as independent scan jobs per account; the UI shows a per-resource-type "last scanned" timestamp). Their multi-account onboarding flow is essentially "one cross-account IAM role per AWS account, one API call per service category." This is the model we are echoing.
Snyk. Snyk's "Integration" object is the closest analogue to our ConnectorInstance: per-org, per-tenant, with credentials stored as opaque references and a status surface visible in the org settings UI. Snyk separates "Integration" (the connection) from "Project" (the scoped unit of scan); a single GitHub integration produces N projects (one per repo), each scanned on its own cadence. The split is exactly the (ConnectorInstance, ScanScope) split we propose, with their "Project" being our "ScanScope".
Datadog. Datadog's CSPM and CIEM products separate "Cloud Account" (instance) from "Resource Type" (service category) and surface scan health per pair: e.g., "AWS account 123456: IAM scan healthy, Lambda scan healthy, ECS scan failing — last run 14m ago." Their cadences are configurable per account and per resource type. Their UI is the long-term target for ours; for now we mirror only the data shape, not the UX.
Common pattern. All three split (a) the connection identity and credentials from (b) the unit-of-scan-work, and persist scan run history at (b)'s grain. None of them treat a tenant's whole cloud as one scan target. None of them schedule globally — scheduling is per (instance, scope) pair. This validates our design direction.
Design proposal
Data model — new collections
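Three new MongoDB collections live alongside the existing 14 (per `03-database.md`). All three carry `tenant_id`, and `tenant_id` leads every tenant-facing compound index; the exceptions are the cluster-wide indexes used by the scheduler, the stale-run reaper, and the `sync_id` / `iac_correlation_id` lookups listed below. None duplicate state already in `entities`, `findings`, or `connector_syncs`; they are control-plane metadata.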
connector_instances
Per-tenant connector connection. One tenant may have N instances of the same connector kind (two AWS organisations, three ServiceNow instances, etc.).
```js
{
  _id: "ci-uuid", // Stable, deterministic from tenant_id + kind + display key
  tenant_id: "uuid-...",
  connector_kind: "aws", // aws | entra_servicenow | azure_foundry | servicenow | …
  display_name: "MediaPro production AWS org", // Human label, shown in scan history
  status: "active", // active | paused | error | onboarding

  // Credentials are stored OUTSIDE this collection.
  // credentials_ref is an opaque pointer to whatever vault we end up using (1Password
  // for Phase 1, AWS Secrets Manager / Azure Key Vault later).
  credentials_ref: {
    provider: "op", // op | aws_secretsmanager | azure_keyvault | env (dev only)
    ref: "op://sv0-connectors/mediapro-aws/credentials"
  },

  // Source-system targets this instance can scan. Stream 2 (multi-account AWS) defines
  // the AWS-specific shape. The control plane treats this as opaque structured data
  // and passes it through to the connector CLI via the scope.
  source_system_targets: [
    { kind: "aws_account", account_id: "111111111111", role_arn: "arn:aws:iam::111111111111:role/sv0-reader", external_id: "..." },
    { kind: "aws_account", account_id: "222222222222", role_arn: "arn:aws:iam::222222222222:role/sv0-reader", external_id: "..." }
  ],

  // Discovery hints: the connector reports back here after its first run. Lets the
  // scheduler know "you have N targets and M categories; you might want N×M scopes."
  discovered_capabilities: {
    accounts: 2,
    regions: ["us-east-1", "eu-west-1"],
    service_categories_available: ["iam", "lambda", "bedrock", "s3", "ecs", "secrets", "cloudtrail"],
    last_discovered_at: ISODate("2026-04-22T15:00:00Z")
  },

  // Lifecycle
  created_at: ISODate(...),
  created_by_user_id: ObjectId(...), // The user (or "system") who provisioned this
  updated_at: ISODate(...),
  archived_at: null
}
```
Rationale. This separates "what tenant X uses to talk to source system Y instance Z" from credentials (which rotate independently) and from scope (which is finer-grained). discovered_capabilities is the bridge that lets the scheduler suggest reasonable defaults without hard-coding "AWS has 11 service categories" into the platform.
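As a sketch of that bridge, assuming the finest-grained default is one scope per (AWS account × available category) covering every discovered region, the suggestion step could look like the following. `suggestScopes` and its types are hypothetical, not existing platform code, and the real AWS shapes belong to Stream 2.

```typescript
// Hypothetical helper (not existing sv0-platform code): propose one ScanScope
// draft per (AWS account target x available service category).
interface SourceSystemTarget {
  kind: string;
  account_id: string;
}

interface DiscoveredCapabilities {
  regions: string[];
  service_categories_available: string[];
}

interface ScopeDraft {
  display_name: string;
  scope_keys: { account_ids: string[]; regions: string[] };
  service_categories: string[];
}

function suggestScopes(
  targets: SourceSystemTarget[],
  caps: DiscoveredCapabilities,
): ScopeDraft[] {
  const drafts: ScopeDraft[] = [];
  for (const target of targets) {
    if (target.kind !== "aws_account") continue; // this sketch only handles AWS targets
    for (const category of caps.service_categories_available) {
      drafts.push({
        display_name: `AWS ${target.account_id} / ${category}`,
        // Plural arrays even for one account, matching the scope_keys convention.
        scope_keys: { account_ids: [target.account_id], regions: [...caps.regions] },
        service_categories: [category],
      });
    }
  }
  return drafts;
}

const drafts = suggestScopes(
  [
    { kind: "aws_account", account_id: "111111111111" },
    { kind: "aws_account", account_id: "222222222222" },
  ],
  { regions: ["us-east-1", "eu-west-1"], service_categories_available: ["iam", "lambda", "s3"] },
);
console.log(drafts.length); // 6 (2 targets x 3 categories)
```

Tenants would then merge drafts (for example IAM hourly on its own, Bedrock + Lambda together every 6 hours); the sketch only produces the starting point.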
Indexes.
```js
db.connector_instances.createIndex({ tenant_id: 1, connector_kind: 1, status: 1 });
db.connector_instances.createIndex({ tenant_id: 1, _id: 1 }, { unique: true });
```
Tenant scoping. Every query carries tenant_id. Cross-tenant access is impossible at the storage-adapter layer (per the existing platform contract).
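One way that adapter-layer guarantee could be enforced is a single choke point that every filter passes through. This sketch assumes filters are plain `Record<string, unknown>` objects; `tenantScoped` is a hypothetical helper, not the current StorageAdapter API.

```typescript
// Hypothetical guard (not the existing StorageAdapter API): every filter passes
// through tenantScoped() before it reaches MongoDB.
type Filter = Record<string, unknown>;

function tenantScoped(tenantId: string, filter: Filter): Filter {
  if (tenantId.length === 0) {
    throw new Error("tenant_scope: tenant context is required");
  }
  // Reject filters that name a different tenant. Operator objects such as
  // { $in: [...] } are never equal to a plain tenant ID, so they are rejected too.
  if ("tenant_id" in filter && filter["tenant_id"] !== tenantId) {
    throw new Error("tenant_scope: filter targets a different tenant");
  }
  // Spread first, then set tenant_id, so the caller cannot override it.
  return { ...filter, tenant_id: tenantId };
}

const scoped = tenantScoped("tenant-a", { connector_kind: "aws", status: "active" });
console.log(JSON.stringify(scoped));
// {"connector_kind":"aws","status":"active","tenant_id":"tenant-a"}
```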
Integration with existing collections. `connector_instances._id` becomes the new `connector_id` value emitted in `NormalizedGraph.connectorId`, and it is recorded on `connector_syncs` in a new sibling field, `connector_instance_id`, added next to `connector_type` in a non-breaking way (see Phase 4).
scan_scopes
A scan scope is the unit of work the scheduler executes. One ConnectorInstance has 1..N scan scopes. Two scopes never scan overlapping data. The categories within a scope share one run, one schedule, and one budget, but each category records its own outcome (see Per-category failure isolation).
```js
{
  _id: "ss-uuid", // Deterministic from instance_id + hash of scope_keys and service_categories
  tenant_id: "uuid-...",
  instance_id: "ci-uuid",
  connector_kind: "aws",
  display_name: "AWS prod / IAM", // Human label

  // Connector-specific scope keys. Opaque to the platform; meaningful to the connector.
  // Stream 2 (multi-account AWS) defines the AWS shape; an Entra example would be
  // { scope_keys: { entra_tenant_id: "..." } }; ServiceNow would be
  // { scope_keys: { instance_url: "https://corp.service-now.com" } }.
  scope_keys: {
    // AWS-specific keys (defined by Stream 2). Always plural arrays, even when length 1.
    // A single AWS scope addresses 1..N accounts × 1..N regions; the array form is the
    // canonical shape so Stream 1's diff engine and Stream 2's per-(account×category)
    // executor agree on cardinality.
    account_ids: ["111111111111"],
    regions: ["us-east-1"]
  },

  // Service categories included in this scope. These map 1:1 to independently failing
  // units of work inside the connector. Each category emits its own per-category status
  // in the ScanRun. For AWS, these match Stream 2's category list (iam, lambda, bedrock,
  // s3, ecs, ecr, dynamodb, sns, secrets, stepfunctions, eventbridge, cloudtrail).
  service_categories: ["iam", "lambda", "cloudtrail"],

  // Scheduling: see the Scheduling section. A null schedule (or cadence "manual")
  // means manual trigger only.
  schedule: {
    cadence: "interval", // interval | cron | manual
    interval_seconds: 3600, // Required if cadence=interval
    cron_expression: null, // Required if cadence=cron, e.g. "0 */6 * * *"
    timezone: "UTC",
    next_run_at: ISODate("2026-04-22T16:00:00Z"),
    last_run_at: ISODate("2026-04-22T15:00:00Z")
  },

  // Budget / safety knobs the scheduler enforces.
  budget: {
    max_runtime_seconds: 1800, // Hard cap; scan_runs for this scope go to status=timeout if exceeded
    max_concurrent_runs: 1, // Per-scope concurrency cap (default 1)
    cooldown_after_failure_seconds: 600 // After a failed run, don't reschedule sooner than this
  },

  // Lifecycle
  status: "active", // active | paused | archived
  created_at: ISODate(...),
  updated_at: ISODate(...)
}
```
Rationale. This is the single most important new object. It encodes the user's "Wiz scans IAM separately from Bedrock separately from ECS" insight as schema. Putting service_categories on the scope (not on the instance) lets a tenant decide "scope A is IAM only, hourly; scope B is Bedrock + Lambda, every 6 hours; scope C is CloudTrail evidence, nightly."
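Scope IDs are described as deterministic. A minimal sketch of one derivation, assuming canonical JSON (sorted keys) over `instance_id`, `scope_keys`, and the sorted `service_categories`: including the categories keeps scope A and scope B from the rationale above distinct even when they target the same account and region. The helper names are hypothetical, and 32-bit FNV-1a is used only to stay dependency-free; SHA-256 or UUIDv5 over the same canonical string would be the realistic choice.

```typescript
// Hypothetical deterministic scope ID: canonical JSON (sorted object keys),
// hashed with 32-bit FNV-1a. Too small for production; illustration only.
function canonicalJson(value: unknown): string {
  if (Array.isArray(value)) return `[${value.map((v) => canonicalJson(v)).join(",")}]`;
  if (value !== null && typeof value === "object") {
    const entries = Object.entries(value as Record<string, unknown>)
      .sort(([a], [b]) => (a < b ? -1 : a > b ? 1 : 0))
      .map(([k, v]) => `${JSON.stringify(k)}:${canonicalJson(v)}`);
    return `{${entries.join(",")}}`;
  }
  return JSON.stringify(value);
}

// FNV-1a over UTF-16 code units (identical to bytes for ASCII input).
function fnv1a32(input: string): string {
  let hash = 0x811c9dc5; // FNV offset basis
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0; // multiply by the FNV prime, keep 32 bits
  }
  return hash.toString(16).padStart(8, "0");
}

function deriveScopeId(
  instanceId: string,
  scopeKeys: Record<string, unknown>,
  serviceCategories: string[],
): string {
  const payload = canonicalJson({
    instance_id: instanceId,
    scope_keys: scopeKeys,
    // Sorted so ["lambda", "iam"] and ["iam", "lambda"] give the same ID.
    service_categories: [...serviceCategories].sort(),
  });
  return `ss-${fnv1a32(payload)}`;
}
```

Because the key order inside `scope_keys` no longer matters, two operators creating "the same" scope from different tools converge on one document instead of two.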
Indexes.
```js
db.scan_scopes.createIndex({ tenant_id: 1, instance_id: 1, status: 1 });
db.scan_scopes.createIndex({ status: 1, "schedule.next_run_at": 1 }); // Scheduler hot path: cluster-wide scan
db.scan_scopes.createIndex({ tenant_id: 1, connector_kind: 1 });
```
The second index is intentionally NOT prefixed by tenant_id because the scheduler runs cluster-wide, not per-tenant. The query is { status: "active", "schedule.next_run_at": { $lte: now } }.
scan_runs
One document per execution attempt. This is the operator-facing scan history.
```js
{
  _id: "sr-uuid",
  tenant_id: "uuid-...",
  instance_id: "ci-uuid",
  scope_id: "ss-uuid",
  sync_id: "uuid-...", // Links to connector_syncs once the connector POSTs
                       // a NormalizedGraph back. Null until then.

  // Captured at run-creation time. Lets us reconstruct what scope was used even if the
  // ScanScope doc was edited mid-flight.
  scope_snapshot: {
    connector_kind: "aws",
    scope_keys: { account_ids: ["111111111111"], regions: ["us-east-1"] },
    service_categories: ["iam", "lambda", "cloudtrail"]
  },

  // Trigger provenance.
  trigger: {
    type: "scheduled", // scheduled | manual_api | iac_lifecycle | retry
    triggered_by_user_id: null, // Set if type=manual_api
    triggered_by_run_id: null, // Set if type=retry (parent run)
    iac_correlation_id: null // Set if type=iac_lifecycle (Stream 4)
  },

  started_at: ISODate("2026-04-22T15:00:00Z"),
  ended_at: ISODate("2026-04-22T15:04:12Z"),
  status: "partial", // running | succeeded | partial | failed | timeout | cancelled

  // Per-category outcomes. This is the field that lets an analyst answer "did Lambda
  // scan succeed for this scope?" without scraping logs.
  category_results: {
    iam: {
      status: "succeeded",
      items_scanned: 187,
      started_at: ISODate("2026-04-22T15:00:01Z"),
      ended_at: ISODate("2026-04-22T15:00:48Z"),
      errors: []
    },
    lambda: {
      status: "succeeded",
      items_scanned: 42,
      started_at: ISODate("2026-04-22T15:00:48Z"),
      ended_at: ISODate("2026-04-22T15:02:11Z"),
      errors: []
    },
    cloudtrail: {
      status: "failed",
      items_scanned: 0,
      started_at: ISODate("2026-04-22T15:02:11Z"),
      ended_at: ISODate("2026-04-22T15:04:12Z"),
      errors: [
        {
          category: "rate_limit", // auth | rate_limit | api_error | data_error | timeout
          code: "ThrottlingException",
          message: "cloudtrail:LookupEvents throttled at 2 TPS, retries exhausted",
          retryable: true,
          occurred_at: ISODate("2026-04-22T15:04:11Z")
        }
      ]
    }
  },

  // Aggregated counts for quick rendering.
  totals: {
    items_scanned: 229,
    errors: 1,
    categories_succeeded: 2,
    categories_failed: 1
  },

  // Reference back into the existing pipeline (sync_id above is the join key).
  emitted_report_id: null // Reserved for future ConnectorReport linkage
}
```
Rationale. `category_results` is a map keyed by service category name. It is intentionally NOT a separate collection: a scan run is a unit of operator interest, and one Mongo round-trip should give an analyst the full picture. At MVP scale (≤20 categories per scope) this stays far below MongoDB's 16 MB document size limit.
Status semantics:
| status | meaning |
|---|---|
| `running` | Connector started, has not reported back |
| `succeeded` | All categories succeeded |
| `partial` | At least one category succeeded, at least one failed (per-category isolation worked) |
| `failed` | All categories failed, OR the connector failed before any category started |
| `timeout` | Exceeded `budget.max_runtime_seconds` |
| `cancelled` | Operator-initiated cancellation |
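The table reduces to a small pure function over `category_results`, plus the `totals` aggregation. This is a sketch assuming category statuses are only `succeeded` or `failed`; `timeout` and `cancelled` are set by the scheduler or an operator, not derived here. Function names are hypothetical.

```typescript
interface CategoryResult {
  status: "succeeded" | "failed";
  items_scanned: number;
  errors: unknown[];
}

type DerivedStatus = "succeeded" | "partial" | "failed";

// Run-level status from per-category outcomes, following the status table.
function deriveRunStatus(results: Record<string, CategoryResult>): DerivedStatus {
  const outcomes = Object.values(results);
  const succeeded = outcomes.filter((r) => r.status === "succeeded").length;
  // No categories reported (connector died early) or every category failed.
  if (succeeded === 0) return "failed";
  return succeeded === outcomes.length ? "succeeded" : "partial";
}

// Aggregates for the totals sub-document.
function computeTotals(results: Record<string, CategoryResult>) {
  const outcomes = Object.values(results);
  const succeeded = outcomes.filter((r) => r.status === "succeeded").length;
  return {
    items_scanned: outcomes.reduce((sum, r) => sum + r.items_scanned, 0),
    errors: outcomes.reduce((sum, r) => sum + r.errors.length, 0),
    categories_succeeded: succeeded,
    categories_failed: outcomes.length - succeeded,
  };
}

const example: Record<string, CategoryResult> = {
  iam: { status: "succeeded", items_scanned: 187, errors: [] },
  lambda: { status: "succeeded", items_scanned: 42, errors: [] },
  cloudtrail: { status: "failed", items_scanned: 0, errors: [{ category: "rate_limit" }] },
};

console.log(deriveRunStatus(example)); // partial
console.log(computeTotals(example)); // 229 items, 1 error, 2 succeeded, 1 failed
```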
Indexes.
```js
db.scan_runs.createIndex({ tenant_id: 1, scope_id: 1, started_at: -1 });
db.scan_runs.createIndex({ tenant_id: 1, instance_id: 1, started_at: -1 });
db.scan_runs.createIndex({ tenant_id: 1, status: 1, started_at: -1 });
db.scan_runs.createIndex({ status: 1, started_at: 1 }); // For stale-run reaper
db.scan_runs.createIndex({ sync_id: 1 }, { sparse: true }); // Join to connector_syncs
db.scan_runs.createIndex({ "trigger.iac_correlation_id": 1 }, { sparse: true }); // Stream 4
```
Integration with existing 14 collections.
- `connector_syncs` (existing) keeps its current shape, but its `connector_type` field gains a sibling `connector_instance_id`. Existing code that reads `connector_type` continues to work; new code reads `connector_instance_id`. `scan_runs.sync_id` is the join key.
- `entities` and all downstream collections gain no new fields; they continue to be tagged with `source_connector_id` per the cross-connector merge logic in `src/workers/handlers/sync-ingestion.ts:26-37`. `source_connector_id` will become the `connector_instance_id`, giving Stream 3 (graph stitching) a stable per-instance identity to anchor merges on.
- `tenant_configs.connector_credential_refs` is deprecated for new tenants in favour of `connector_instances.credentials_ref`. Existing entries stay readable until migration.
Scoped scan execution
How a single ConnectorInstance produces N independent ScanRuns
The control plane is the source of truth for "what scans should happen." When the scheduler decides scope ss-A is due, it:
1. Atomically claims the scope: `findOneAndUpdate({ _id: "ss-A", "schedule.next_run_at": { $lte: now } }, { $set: { "schedule.next_run_at": now + interval } })`. If the call returns null, another scheduler (or test) already claimed it.
2. Inserts a `scan_runs` document with `status: "running"`, capturing `scope_snapshot` and `trigger`.
3. Enqueues a new worker job type, `execute_scan`, with `{ run_id, instance_id, scope_id }`.
4. The worker handler (new file `src/workers/handlers/execute-scan.ts`) reads the instance's `credentials_ref`, resolves it via the credential broker, and invokes the connector through a Connector Driver (see below).
5. The connector executes one category at a time, reporting each category's outcome back via the Driver.
6. On completion, the handler updates `scan_runs.category_results`, `totals`, `status`, and `ended_at`. If at least one category produced data, the resulting `NormalizedGraph` is submitted through the existing ingest pipeline, and the resulting `sync_id` is written back to `scan_runs.sync_id`.
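The claim in step 1 can be expressed as a filter/update pair. This sketch builds the two documents for an interval cadence (a cron cadence would compute the next fire time from `cron_expression` instead); `buildScopeClaim` is hypothetical, and the extra `status: "active"` condition is an assumption that step 1 does not state.

```typescript
// Hypothetical builder for the step 1 claim. Interval cadence only.
interface ScopeClaim {
  filter: Record<string, unknown>;
  update: { $set: { "schedule.next_run_at": Date } };
}

function buildScopeClaim(scopeId: string, now: Date, intervalSeconds: number): ScopeClaim {
  const nextRunAt = new Date(now.getTime() + intervalSeconds * 1000);
  return {
    filter: {
      _id: scopeId,
      // Assumption: also require an active scope, so a scope paused between the
      // scheduler's find and this claim is not picked up.
      status: "active",
      "schedule.next_run_at": { $lte: now },
    },
    // Advancing next_run_at makes the claim exclusive: a second call with the
    // same filter no longer matches the document.
    update: { $set: { "schedule.next_run_at": nextRunAt } },
  };
}

const claim = buildScopeClaim("ss-A", new Date("2026-04-22T15:00:00Z"), 3600);
console.log(claim.update.$set["schedule.next_run_at"].toISOString());
// 2026-04-22T16:00:00.000Z
```

With the official MongoDB Node.js driver, the pair would be passed as `db.collection("scan_scopes").findOneAndUpdate(filter, update)`. From driver v6 onward this resolves to the matched document or `null` by default; earlier versions wrap the document in a result object, so the null check depends on the driver version in use.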
Per-category failure isolation
Each category is its own try/except boundary inside the connector. The contract:
- A category that fails records `category_results[X].status = "failed"` and a structured `errors[]` entry, but does not raise out of the connector main loop.
- Categories that succeed still produce nodes/edges; the `NormalizedGraph` includes whatever was extracted.
- `ScanScope.mode` is set to `targeted` and `scannedEntityTypes` reflects the categories that actually succeeded, so the existing diff engine and circuit breaker (`02-processing-pipeline.md` §3.2-3.5) only consider deletion within the successful scope and never infer "Lambda is gone" from "Lambda scan failed".
This is a design constraint on connectors, not a platform-side enforcement. We add a fixture-based test in sv0-connectors that asserts: "if extractor X raises, the resulting payload still contains entities from extractor Y."
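The contract and the fixture test above can be illustrated with a small loop. This is a TypeScript sketch of the pattern only (the real extractors are async Python in `sv0-connectors`); it assumes synchronous extractors returning node arrays and uses category names as stand-ins for `scannedEntityTypes`, since the category-to-entity-type mapping is connector-specific.

```typescript
type ErrorCategory = "auth" | "rate_limit" | "api_error" | "data_error" | "timeout";

interface CategoryOutcome {
  status: "succeeded" | "failed";
  items_scanned: number;
  errors: { category: ErrorCategory; message: string; retryable: boolean }[];
}

type Extractor = () => unknown[]; // returns extracted nodes

// Hypothetical per-category loop: each extractor is its own try/catch boundary.
function runCategories(extractors: Record<string, Extractor>) {
  const categoryResults: Record<string, CategoryOutcome> = {};
  const nodes: unknown[] = [];
  for (const [name, extract] of Object.entries(extractors)) {
    try {
      const extracted = extract();
      nodes.push(...extracted);
      categoryResults[name] = { status: "succeeded", items_scanned: extracted.length, errors: [] };
    } catch (err) {
      // Record the failure and move on; one category never aborts the others.
      // A real connector would classify the error (auth, rate_limit, ...).
      categoryResults[name] = {
        status: "failed",
        items_scanned: 0,
        errors: [{ category: "api_error", message: String(err), retryable: true }],
      };
    }
  }
  // Only successful categories are declared as scanned, so deletion detection
  // never reads "Lambda scan failed" as "Lambda is gone".
  const scannedEntityTypes = Object.entries(categoryResults)
    .filter(([, r]) => r.status === "succeeded")
    .map(([name]) => name);
  return { categoryResults, nodes, scanScope: { mode: "targeted" as const, scannedEntityTypes } };
}
```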
Concurrency / parallelism rules
- `ScanScope.budget.max_concurrent_runs` defaults to 1. The scheduler's atomic claim (step 1 above) plus a check that `count(scan_runs where scope_id = X and status = "running") < max_concurrent_runs` enforces this.
- Two scopes for the same instance MAY run concurrently. Two scopes for different instances MAY run concurrently. The platform places no global cap; per-tenant caps live in `tenant_configs.feature_flags` (e.g., `"max_concurrent_scans_per_tenant": 4`) for noisy-neighbour control.
- The existing `WorkerRuntime` is single-threaded (`src/workers/runtime.ts:30`). For Phase 1, that means `execute_scan` jobs run serially within one platform process. This is fine for the MVP because the actual work happens in subprocesses (or out-of-band; see Connector Driver). For multi-tenant production we revisit this when the queue-depth alert (P2 in `02-processing-pipeline.md` §10) starts firing.
How a scan posts results back to the platform
Two options for the Driver, decided in Open Question #1:
Option A — In-process Driver. Worker handler shells out to the Python connector CLI as a subprocess, captures the NormalizedGraph JSON from stdout, and POSTs it to /api/v1/ingest/normalized-graph over loopback. Connector code remains unchanged. Pros: zero migration. Cons: same-host coupling, hard to scale connectors independently.
Option B — Side-car Driver. Connector runs in its own container (per sv0-platform#247), takes --instance-id --scope-id --run-id arguments and calls back to a new platform endpoint POST /api/v1/scan-runs/:id/category per category and POST /api/v1/scan-runs/:id/complete at end. Pros: clean separation, can run on a different host. Cons: needs more new endpoints.
Recommendation: Option A for Phase 1, Option B as Phase 5 (out of this stream's scope). Option A unblocks MediaPro without rewriting the connector framework. The Driver interface is the same in both, so the migration is mechanical when we're ready.
The ingest path is the same under either option: the graph is submitted to `/api/v1/ingest/normalized-graph` with `NormalizedGraph.scanScope` populated from the scope. The diff engine, circuit breaker, materialisers, and finding evaluators all continue to work without modification.
Scheduling
Mechanism: in-process scheduler tick inside the existing platform. A new src/workers/scheduler.ts module runs setInterval(tick, 30_000) inside the platform process and:
- Queries `db.scan_scopes.find({ status: "active", "schedule.next_run_at": { $lte: new Date() } }).limit(50)`.
- For each scope, tries the atomic claim from §"How a single ConnectorInstance produces N independent ScanRuns".
- Enqueues `execute_scan` for each successfully claimed scope.
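A sketch of one tick, including the catch-up rule described below (fire one run, warn with `delay_seconds`). `ScopeStore`, `tick`, and the in-memory store are hypothetical stand-ins for the StorageAdapter methods from Task 1.2; inserting the `scan_runs` document is omitted, and warning only when the delay exceeds one interval is an assumed threshold.

```typescript
interface DueScope {
  id: string;
  intervalSeconds: number;
  nextRunAt: Date;
}

interface ScopeStore {
  findDue(now: Date, limit: number): DueScope[];
  claim(id: string, now: Date, next: Date): boolean; // false if already claimed
}

type Enqueue = (job: { type: "execute_scan"; scope_id: string }) => void;
type Warn = (event: string, data: Record<string, unknown>) => void;

function tick(store: ScopeStore, enqueue: Enqueue, warn: Warn, now: Date): number {
  let claimed = 0;
  for (const scope of store.findDue(now, 50)) {
    const next = new Date(now.getTime() + scope.intervalSeconds * 1000);
    if (!store.claim(scope.id, now, next)) continue; // another process won the claim
    // Catch-up: exactly one run fires, however many intervals were missed.
    const delaySeconds = Math.floor((now.getTime() - scope.nextRunAt.getTime()) / 1000);
    if (delaySeconds > scope.intervalSeconds) {
      warn("scheduler.catch_up_skipped", { scope_id: scope.id, delay_seconds: delaySeconds });
    }
    enqueue({ type: "execute_scan", scope_id: scope.id });
    claimed += 1;
  }
  return claimed;
}

// In-memory stand-in; claim() mimics the conditional findOneAndUpdate.
class InMemoryScopeStore implements ScopeStore {
  private readonly scopes: Map<string, DueScope>;
  constructor(scopes: DueScope[]) {
    this.scopes = new Map(scopes.map((s): [string, DueScope] => [s.id, s]));
  }
  findDue(now: Date, limit: number): DueScope[] {
    return Array.from(this.scopes.values())
      .filter((s) => s.nextRunAt.getTime() <= now.getTime())
      .slice(0, limit);
  }
  claim(id: string, now: Date, next: Date): boolean {
    const scope = this.scopes.get(id);
    if (scope === undefined || scope.nextRunAt.getTime() > now.getTime()) return false;
    this.scopes.set(id, { ...scope, nextRunAt: next });
    return true;
  }
}
```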
Why in-process and not GHA / external cron.
- All other production state lives in MongoDB; scheduler state should too (matches `sv0-connectors#78` decision 6.A: "Use platform's events/jobs collection").
- 30-second tick granularity is sufficient, because the minimum scan interval is much longer (5+ minutes).
- Adding GitHub Actions or Kubernetes CronJobs introduces ops overhead and a second source of truth.
- When we move to a multi-process platform deploy, the scheduler's atomic `findOneAndUpdate` claim provides the coordination: each due scope is claimed by exactly one process, so no leader election or separate coordination layer is needed.
Where scheduler state lives. scan_scopes.schedule.next_run_at is the canonical "when next." scan_scopes.schedule.last_run_at is updated by the worker on completion. No separate scheduler-state collection.
Catch-up behaviour. If the scheduler was down and a scope's next_run_at is hours in the past:
- We do not fire N runs to catch up. We fire one run and set `next_run_at = now + interval`. Connectors are full-state extractors, not delta-event consumers, so replaying missed runs is meaningless (and burns API quota).
- We do log a `scheduler.catch_up_skipped` warning with `delay_seconds` so operators see the gap in metrics.
Manual trigger API. POST /api/v1/scan-runs with body { scope_id, trigger: { type: "manual_api" } }:
- Validates that the user has write access to the tenant.
- Bypasses `next_run_at` (manual triggers ignore the schedule).
- Still respects `max_concurrent_runs`: returns `409 Conflict` if the scope already has `max_concurrent_runs` runs in `running` status.
- Returns `202 Accepted` with the new `run_id`.
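The rules above reduce to a small decision function. This sketch assumes the caller has already resolved write access and counted the scope's running runs; `decideManualTrigger` is hypothetical, and the `403` status for missing write access is an assumption (the text specifies only `409` and `202`).

```typescript
interface ManualTriggerInput {
  userCanWriteTenant: boolean;
  runningRunsForScope: number;
  maxConcurrentRuns: number;
}

type TriggerResponse =
  | { status: 202; body: { run_id: string } }
  | { status: 403 | 409; body: { error: string } };

// Hypothetical decision logic for POST /api/v1/scan-runs; no I/O happens here.
function decideManualTrigger(input: ManualTriggerInput, newRunId: () => string): TriggerResponse {
  if (!input.userCanWriteTenant) {
    return { status: 403, body: { error: "write access to tenant required" } };
  }
  // next_run_at is intentionally not consulted: manual triggers bypass the schedule.
  if (input.runningRunsForScope >= input.maxConcurrentRuns) {
    return { status: 409, body: { error: "scope is at max_concurrent_runs" } };
  }
  return { status: 202, body: { run_id: newRunId() } };
}
```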
Configuration (admin/CLI today, tenant self-service later)
A new CLI in sv0-platform/scripts/connector-instance-cli.ts provides:
```bash
# Create instance
npx tsx scripts/connector-instance-cli.ts instance create \
  --tenant mediapro \
  --kind aws \
  --display-name "MediaPro production AWS org" \
  --credentials-ref "op://sv0-connectors/mediapro-aws/credentials" \
  --target '{"kind":"aws_account","account_id":"111111111111","role_arn":"arn:aws:iam::111111111111:role/sv0-reader","external_id":"abc"}'

# Create scope
npx tsx scripts/connector-instance-cli.ts scope create \
  --tenant mediapro \
  --instance ci-uuid-prod \
  --display-name "AWS prod / IAM hourly" \
  --scope-keys '{"account_ids":["111111111111"],"regions":["us-east-1"]}' \
  --categories iam \
  --cadence interval --interval-seconds 3600

# Trigger a manual run
npx tsx scripts/connector-instance-cli.ts run trigger --scope ss-uuid

# Show scan history for a scope
npx tsx scripts/connector-instance-cli.ts run list --scope ss-uuid --limit 10
```
The CLI writes directly via the StorageAdapter (no API round-trip needed for ops). The future self-service UI is a thin layer over the same StorageAdapter methods (Open Question #5).
Operational behaviour
Scan history view (per scope). An analyst sees these five fields per row:
- `started_at` (relative: "12m ago")
- `status` (running / succeeded / partial / failed / timeout / cancelled)
- Per-category sparkline: `iam ✓ lambda ✓ cloudtrail ✗`
- `totals.items_scanned`
- Top error message (from the first failed category), truncated to 80 chars, with the full message on hover
Error categorisation. Every error in category_results[X].errors[] carries a category field with a closed enum: auth | rate_limit | api_error | data_error | timeout. This drives:
- Auth errors → page the tenant's connector owner (rotate creds); do not retry.
- Rate-limit errors → back off for `cooldown_after_failure_seconds`, then retry.
- API errors (5xx) → retry once on the next scheduled tick.
- Data errors (parse / schema mismatch) → do not retry; file a P1 alert (it's a connector bug).
- Timeout → reduce scope (operator action: split categories into separate scopes).
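Keeping the policy as a table typed by the closed enum lets the compiler flag a missing category. A sketch with hypothetical action labels, where `retry` means an extra attempt beyond the normal schedule; the list does not specify a retry for timeouts, so the sketch assumes none.

```typescript
const ERROR_CATEGORIES = ["auth", "rate_limit", "api_error", "data_error", "timeout"] as const;
type ErrorCategory = (typeof ERROR_CATEGORIES)[number];

interface ErrorPolicy {
  retry: "none" | "after_cooldown" | "once_next_tick";
  escalation: "page_connector_owner" | "p1_alert" | "operator_split_scope" | "none";
}

// Record<ErrorCategory, ...> fails to compile if a category is left out.
const POLICY: Record<ErrorCategory, ErrorPolicy> = {
  auth: { retry: "none", escalation: "page_connector_owner" },
  rate_limit: { retry: "after_cooldown", escalation: "none" }, // waits cooldown_after_failure_seconds
  api_error: { retry: "once_next_tick", escalation: "none" },
  data_error: { retry: "none", escalation: "p1_alert" }, // connector bug
  timeout: { retry: "none", escalation: "operator_split_scope" },
};

function isErrorCategory(value: string): value is ErrorCategory {
  return (ERROR_CATEGORIES as readonly string[]).includes(value);
}

function policyFor(category: string): ErrorPolicy | undefined {
  // An unknown category means the connector broke the closed-enum contract.
  return isErrorCategory(category) ? POLICY[category] : undefined;
}
```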
Partial-success semantics. A partial run is treated as a successful sync for the categories that succeeded. The NormalizedGraph is submitted with scanScope.scannedEntityTypes reflecting only successful categories, so the existing circuit breaker (02-processing-pipeline.md §3.3) does not flag missing data from the failed categories as deletion. The scan_runs document is the operator's truth source for what failed; the pipeline's connector_syncs.metrics reflects what was actually ingested.
Tenant isolation. All three collections carry `tenant_id`, and every tenant-facing index leads with it. The credential broker resolves `credentials_ref` only after validating the requesting worker's tenant context. Two scopes from different tenants cannot collide on `_id`, because scope IDs derive from the instance ID, which in turn derives from `tenant_id`; and the storage adapter never accepts a query without tenant scoping.
Migration / backward compat
- Existing manual scans keep working. A direct POST to `/api/v1/ingest/normalized-graph` with `scanScope` populated still ingests fine; it simply does not produce a `scan_runs` document. Engineers running `sv0-aws scan --all --submit` from a laptop see the same behaviour they do today.
- `connector_syncs` keeps all existing fields. New code reads `connector_instance_id` when present and falls back to `connector_type`. We add `connector_instance_id` as a sibling field, not a replacement.
- `tenant_configs.connector_credential_refs` is read-only legacy. A migration script (Phase 4) walks each tenant, creates one ConnectorInstance per legacy entry, and copies the credential ref. Old entries stay until the next major version.
- `ConnectorReport` (the alternate ingest path used by entra-servicenow and azure-foundry) is unchanged in this stream. It continues to work; future work can wrap it in the same scope/run lifecycle.
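The Phase 4 migration and the transitional read path can be sketched as two pure functions. Field names follow the `connector_instances` example; the ID callback, the `(migrated)` display-name suffix, and flagging non-`op://` refs for manual review (rather than guessing a provider) are assumptions.

```typescript
interface LegacyTenantConfig {
  tenant_id: string;
  connector_credential_refs: Record<string, string>;
}

interface MigratedInstance {
  _id: string;
  tenant_id: string;
  connector_kind: string;
  display_name: string;
  status: "active";
  credentials_ref: { provider: "op"; ref: string };
  source_system_targets: unknown[];
}

// Hypothetical migration step: one ConnectorInstance per legacy entry.
// The legacy map itself is left untouched.
function migrateLegacyRefs(
  cfg: LegacyTenantConfig,
  makeId: (tenantId: string, kind: string) => string,
): { instances: MigratedInstance[]; needsReview: string[] } {
  const instances: MigratedInstance[] = [];
  const needsReview: string[] = [];
  for (const [kind, ref] of Object.entries(cfg.connector_credential_refs)) {
    if (!ref.startsWith("op://")) {
      needsReview.push(kind); // provider cannot be inferred safely; leave for a human
      continue;
    }
    instances.push({
      _id: makeId(cfg.tenant_id, kind),
      tenant_id: cfg.tenant_id,
      connector_kind: kind,
      display_name: `${kind} (migrated)`,
      status: "active",
      credentials_ref: { provider: "op", ref },
      source_system_targets: [], // the legacy field records no targets
    });
  }
  return { instances, needsReview };
}

// Readers prefer the new sibling field and fall back to the legacy one.
function connectorIdOf(sync: { connector_type: string; connector_instance_id?: string }): string {
  return sync.connector_instance_id ?? sync.connector_type;
}
```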
Interface to other streams
- Stream 2 (multi-account AWS): Defines the AWS-specific shape of `ConnectorInstance.source_system_targets[]` entries with `kind = "aws_account"`, and of `ScanScope.scope_keys = { account_ids: string[], regions: string[] }` (always plural arrays). Defines the canonical list of AWS service categories for `ScanScope.service_categories` (which lives outside `scope_keys`, at the top level of the `ScanScope` document, and is validated against `ConnectorInstance.discovered_capabilities.service_categories_available`). Implements per-category extractors that respect the failure-isolation contract above.
- Stream 3 (cross-connector graph stitching): Consumes `connector_instance_id` (the `_id` of `connector_instances`) as the stable provenance tag on every entity and relationship. The cross-connector merge logic in `src/workers/handlers/sync-ingestion.ts:26-37` already keys on `source_connector_id`; that field becomes the instance ID, giving Stream 3 a stable per-(tenant, source-system, instance) identity to merge against.
- Stream 4 (MediaPro Lab 2 IaC demo): Consumes the up → scan → teardown contract: `POST /api/v1/connector-instances` (up); `POST /api/v1/scan-runs` with body `{ scope_id: X, trigger: { type: "iac_lifecycle", iac_correlation_id: Y } }`, then poll `GET /api/v1/scan-runs/:id` until `status` is terminal (scan); `DELETE /api/v1/connector-instances/:id` (teardown). The `iac_correlation_id` field on the run is how Stream 4's terraform state ID flows through scan history for post-demo audit.
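The scan step of that contract is a poll loop. This sketch injects `getRun` (a wrapper around `GET /api/v1/scan-runs/:id`) and `sleep`, so it assumes no particular HTTP client; the 5-second interval and attempt cap are hypothetical values sized to outlast the example `max_runtime_seconds` of 1800.

```typescript
const TERMINAL_STATUSES = new Set(["succeeded", "partial", "failed", "timeout", "cancelled"]);

// Hypothetical Stream 4 client loop: resolves with the terminal status.
async function pollUntilTerminal(
  getRun: (runId: string) => Promise<{ status: string }>,
  runId: string,
  sleep: (ms: number) => Promise<void>,
  intervalMs = 5_000,
  maxAttempts = 400, // 400 x 5 s = 2000 s, longer than the 1800 s example budget
): Promise<string> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const run = await getRun(runId);
    if (TERMINAL_STATUSES.has(run.status)) return run.status;
    await sleep(intervalMs);
  }
  throw new Error(`scan run ${runId} still running after ${maxAttempts} polls`);
}
```

With Node's global `fetch`, `getRun` could be `(id) => fetch(baseUrl + "/api/v1/scan-runs/" + id).then((r) => r.json())`, where `baseUrl` is a placeholder.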
Implementation plan
This section follows writing-plans skill conventions: each task is bite-sized (2-5 minutes of focused work), names exact files and exact commands, and ends in a TDD cycle plus a commit. Tasks are grouped into four phases that produce working, testable increments.
Repo legend: (P) = sv0-platform, (C) = sv0-connectors.
Phase 1: Data model + storage
Task 1.1 (P) — Add domain types for new collections.
- File: `src/domain/connector-instances/types.ts` (new), `src/domain/scan-scopes/types.ts` (new), `src/domain/scan-runs/types.ts` (new).
- Define the three TS interfaces from §"Data model" above as `ConnectorInstanceDoc`, `ScanScopeDoc`, `ScanRunDoc`. Use `exactOptionalPropertyTypes`-safe shapes (no `undefined` assignments; use spread).
- Add closed-enum `as const` arrays for `status`, `trigger.type`, `category_results[X].errors[].category`, and `schedule.cadence`.
- Commit: `feat(domain): add ConnectorInstance, ScanScope, ScanRun types`.
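A sketch of the Task 1.1 enum pattern, assuming the value lists from the data-model section. In the real files these would be exported from the new `types.ts` modules; the guards and the `makeTrigger` helper are hypothetical.

```typescript
const SCAN_RUN_STATUSES = ["running", "succeeded", "partial", "failed", "timeout", "cancelled"] as const;
type ScanRunStatus = (typeof SCAN_RUN_STATUSES)[number];

const TRIGGER_TYPES = ["scheduled", "manual_api", "iac_lifecycle", "retry"] as const;
type TriggerType = (typeof TRIGGER_TYPES)[number];

const CADENCES = ["interval", "cron", "manual"] as const;
type Cadence = (typeof CADENCES)[number];

// Runtime guards for values read back from MongoDB.
function isScanRunStatus(value: string): value is ScanRunStatus {
  return (SCAN_RUN_STATUSES as readonly string[]).includes(value);
}

function isCadence(value: string): value is Cadence {
  return (CADENCES as readonly string[]).includes(value);
}

interface ScanRunTrigger {
  type: TriggerType;
  triggered_by_user_id?: string;
  triggered_by_run_id?: string;
  iac_correlation_id?: string;
}

// exactOptionalPropertyTypes-safe: optional keys are added with spread and never
// assigned `undefined`. Omitting unset keys (instead of storing null, as the
// example document does) is a choice this sketch makes.
function makeTrigger(type: TriggerType, userId?: string): ScanRunTrigger {
  return { type, ...(userId !== undefined ? { triggered_by_user_id: userId } : {}) };
}
```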
Task 1.2 (P) — Write failing storage-adapter tests.
- File: `test/storage/control-plane.test.ts` (new).
- Tests for `insertConnectorInstance`, `getConnectorInstance`, `queryConnectorInstances`, `insertScanScope`, `claimDueScopes`, `insertScanRun`, `updateScanRunCategoryResult`, `completeScanRun`, `queryScanRuns`.
- Each test sets up a tenant, exercises one method, and asserts the persisted shape and tenant isolation.
- Run: `npm test -- test/storage/control-plane.test.ts` and confirm all tests fail with "method not implemented."
- Commit: `test(storage): failing tests for connector-instance / scan-scope / scan-run methods`.
Task 1.3 (P) — Extend StorageAdapter interface and Mongo implementation.
- File: `src/storage/storage-adapter.ts`: add the 9 method signatures from Task 1.2.
- File: `src/storage/mongo/adapter.ts`: implement them. Indexes are created in the adapter's `init()`.
- Run: `npm test -- test/storage/control-plane.test.ts` and confirm green.
- Commit: `feat(storage): implement connector-instance / scan-scope / scan-run methods`.
Task 1.4 (P) — Add Mongo indexes from §"Data model".
- File: `src/storage/mongo/adapter.ts`: extend `createIndexes()` with the eleven new indexes listed under "Indexes" for each collection above.
- Run: `npm run test:integration -- test/integration/storage/indexes.test.ts` to verify the indexes are created on a fresh DB.
- Commit: `feat(storage): create indexes for control-plane collections`.
Phase 1 deliverable: the three new collections exist, are queryable, are properly indexed, and have full unit-test coverage. No execution behaviour yet.
Phase 2: Scoped scan execution
Task 2.1 (P) — Define the Connector Driver interface.
- File: `src/workers/connector-driver.ts` (new).
- Export `interface ConnectorDriver { run(input: ConnectorRunInput): Promise<ConnectorRunOutput>; }`, where `ConnectorRunInput = { instance, scope, runId }` and `ConnectorRunOutput = { categoryResults, normalizedGraph? }`.
- No implementation yet: just the interface plus a stub `InProcessSubprocessDriver` class whose `run()` does `throw new Error("not implemented")`.
- Commit: `feat(workers): define ConnectorDriver interface`.
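A minimal sketch of what `src/workers/connector-driver.ts` could contain after Task 2.1. The design names the `ConnectorRunInput` and `ConnectorRunOutput` fields but not their types, so the `CategoryResult` shape (`status`, `items_scanned`, a structured `error`) and the `instance` / `scope` sub-fields, including `service_categories`, are illustrative assumptions; the real `normalizedGraph` would use the existing NormalizedGraph type.

```typescript
// Hypothetical shapes: the design names these fields but not their types.
export type CategoryStatus = "succeeded" | "failed";

export interface CategoryResult {
  status: CategoryStatus;
  items_scanned: number;
  error?: { code: string; message: string }; // structured error for failed categories
}

export interface ConnectorRunInput {
  instance: { _id: string; tenant_id: string; connector_kind: string; credentials_ref: string };
  scope: { _id: string; service_categories: string[] };
  runId: string;
}

export interface ConnectorRunOutput {
  categoryResults: Record<string, CategoryResult>;
  normalizedGraph?: unknown; // the existing NormalizedGraph type in the real code
}

export interface ConnectorDriver {
  run(input: ConnectorRunInput): Promise<ConnectorRunOutput>;
}

// Task 2.1 stub: satisfies the interface and fails loudly until Task 2.7.
export class InProcessSubprocessDriver implements ConnectorDriver {
  async run(_input: ConnectorRunInput): Promise<ConnectorRunOutput> {
    throw new Error("not implemented");
  }
}
```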
Task 2.2 (P) — Write failing test for execute_scan handler.
- File: `test/workers/execute-scan.test.ts` (new).
- Mock the StorageAdapter and a fake `ConnectorDriver` that returns a canned per-category outcome (one category succeeded, one failed).
- Assert: the handler creates a `scan_runs` doc with status `running`, then updates it to `partial` with the right `category_results`.
- Run: `npm test -- test/workers/execute-scan.test.ts` and confirm it fails.
- Commit: `test(workers): failing test for execute_scan handler`.
Task 2.3 (P) — Implement execute_scan handler.
- File: `src/workers/handlers/execute-scan.ts` (new).
- File: `src/workers/runtime.ts`: extend the `WorkerJobType` union with `"execute_scan"`.
- File: `src/workers/index.ts`: register the new handler.
- The handler reads the scope and instance, calls the driver, writes per-category results, and sets the terminal status.
- Run: `npm test -- test/workers/execute-scan.test.ts` → green.
- Commit: `feat(workers): execute_scan handler with per-category result tracking`.
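The terminal status the handler writes follows from the per-category results: all succeeded gives `succeeded`, all failed gives `failed`, and any mix gives `partial` (the case Task 2.2's test pins down). A sketch of that rule, assuming a two-valued category status; the name `deriveTerminalStatus` is hypothetical, and treating an empty result set as `failed` is a choice this sketch makes rather than one stated in the design.

```typescript
type CategoryStatus = "succeeded" | "failed";
type TerminalStatus = "succeeded" | "partial" | "failed";

interface CategoryResult {
  status: CategoryStatus;
  items_scanned: number;
}

// Maps per-category outcomes to the scan_runs terminal status.
export function deriveTerminalStatus(results: Record<string, CategoryResult>): TerminalStatus {
  const statuses = Object.values(results).map((r) => r.status);
  // Assumption: a run that reports no categories scanned nothing, so it failed.
  if (statuses.length === 0) return "failed";
  const failed = statuses.filter((s) => s === "failed").length;
  if (failed === 0) return "succeeded";
  if (failed === statuses.length) return "failed";
  return "partial";
}
```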
Task 2.4 (P) — Wire NormalizedGraph submission to scan_runs.
- File: `src/workers/handlers/execute-scan.ts`: after a successful driver call, submit the resulting NormalizedGraph to the existing ingest service in-process (call `IngestService.submit()` directly rather than POSTing over HTTP) so `scan_runs.sync_id` can be filled atomically.
- Test: extend `test/workers/execute-scan.test.ts` to assert `scan_runs.sync_id` is set.
- Commit: `feat(workers): link scan_runs to connector_syncs via sync_id`.
Task 2.5 (C) — Add per-category extractor wrapping in AWS connector.
- File: `integrations/aws/src/sv0_aws/cli/main.py`: wrap each extractor call (lines ~163-232) in a `try` / `except` that records the outcome in a `category_results` dict on the `AWSConnector` instance, instead of letting an exception kill the whole scan.
- File: `integrations/aws/tests/test_category_isolation.py` (new): a fixture that simulates `IAMExtractor.extract_all_iam_entities` raising; assert `category_results["iam"]["status"] == "failed"` AND `category_results["lambda"]["status"] == "succeeded"`.
- Run: `cd integrations/aws && uv run pytest tests/test_category_isolation.py` → green.
- Commit: `feat(aws): per-category failure isolation in scan loop`.
Task 2.6 (C) — Add --scope-id and --run-id CLI flags to AWS connector.
- File: `integrations/aws/src/sv0_aws/cli/main.py`: add `--scope-id`, `--run-id`, and `--category-results-out` to the `scan` subparser. When `--scope-id` is set, the connector writes the `category_results` dict as JSON to the `--category-results-out` path so the platform Driver can consume it.
- Update tests for the CLI parser.
- Commit: `feat(aws): accept --scope-id/--run-id from platform driver`.
Task 2.7 (P) — Implement InProcessSubprocessDriver.
- File: `src/workers/connector-driver.ts`: replace the stub. Use `child_process.spawn` to invoke `sv0-aws scan --scope-id ... --run-id ... --category-results-out /tmp/cr.json --graph-json /tmp/g.json`. Read both files when the subprocess exits. Surface a non-zero exit as a `failed` run with a single-error category result.
- Test: `test/workers/connector-driver.test.ts` (new), with a fake script in `test/fixtures/fake-connector.sh` that writes canned outputs.
- Commit: `feat(workers): InProcessSubprocessDriver (spawn connector CLI, read outputs)`.
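A sketch of the spawn-and-read step, assuming the flag names from Tasks 2.6 and 2.7. Two deliberate deviations from the task text, both assumptions: outputs go to a fresh per-run temp directory instead of fixed `/tmp/cr.json` / `/tmp/g.json` paths (which concurrent runs would overwrite), and the synthetic category key for a process-level failure is the hypothetical `_driver`. The command and base arguments are parameters so a test can substitute a fake connector for `sv0-aws scan`; `runConnectorSubprocess` is a hypothetical name.

```typescript
import { spawn } from "node:child_process";
import { mkdtemp, readFile, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";

interface CategoryResult {
  status: "succeeded" | "failed";
  items_scanned: number;
  error?: { code: string; message: string };
}

interface DriverOutput {
  categoryResults: Record<string, CategoryResult>;
  normalizedGraph?: unknown;
}

// Hypothetical synthetic category key for failures of the process itself.
const DRIVER_CATEGORY = "_driver";

function driverFailure(message: string): DriverOutput {
  return {
    categoryResults: {
      [DRIVER_CATEGORY]: { status: "failed", items_scanned: 0, error: { code: "driver_failure", message } },
    },
  };
}

export async function runConnectorSubprocess(
  command: string,
  baseArgs: string[],
  scopeId: string,
  runId: string,
): Promise<DriverOutput> {
  // Per-run directory (assumption) so concurrent runs never share output paths.
  const dir = await mkdtemp(join(tmpdir(), "sv0-run-"));
  const resultsPath = join(dir, "cr.json");
  const graphPath = join(dir, "g.json");
  const args = [
    ...baseArgs,
    "--scope-id", scopeId,
    "--run-id", runId,
    "--category-results-out", resultsPath,
    "--graph-json", graphPath,
  ];
  try {
    // Resolve (never reject) so spawn errors and non-zero exits share one path.
    const outcome = await new Promise<{ code: number | null; detail: string }>((resolve) => {
      const child = spawn(command, args, { stdio: ["ignore", "inherit", "inherit"] });
      child.on("error", (err) => resolve({ code: null, detail: err.message }));
      child.on("close", (code, signal) => resolve({ code, detail: `exit code ${code}, signal ${signal}` }));
    });
    if (outcome.code !== 0) return driverFailure(`connector failed: ${outcome.detail}`);

    let categoryResults: Record<string, CategoryResult>;
    try {
      categoryResults = JSON.parse(await readFile(resultsPath, "utf8")) as Record<string, CategoryResult>;
    } catch (err) {
      return driverFailure(`unreadable category results: ${(err as Error).message}`);
    }
    // The graph is optional: a run where every category failed may not write one.
    let normalizedGraph: unknown;
    try {
      normalizedGraph = JSON.parse(await readFile(graphPath, "utf8"));
    } catch {
      normalizedGraph = undefined;
    }
    return { categoryResults, normalizedGraph };
  } finally {
    await rm(dir, { recursive: true, force: true });
  }
}
```

In the driver itself this would be called roughly as `runConnectorSubprocess("sv0-aws", ["scan"], scope._id, runId)`.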
Phase 2 deliverable: an admin can manually insert a ScanScope, enqueue an execute_scan job, and see a scan_runs document populate with per-category results plus a linked sync_id. No scheduling yet.
Phase 3: Scheduling
Task 3.1 (P) — Failing test for scope claim.
- File: `test/workers/scheduler.test.ts` (new).
- Two concurrent calls to `claimDueScopes(now, limit=10)` against the same due scope should result in exactly one of them returning the scope.
- Run: `npm test -- test/workers/scheduler.test.ts` → fail.
- Commit: `test(workers): failing concurrent claim test for scheduler`.
Task 3.2 (P) — Implement scheduler tick.
- File: `src/workers/scheduler.ts` (new). Export `class Scheduler` with `start()`, `stop()`, and `tickOnce()` (for tests). `tickOnce()` queries due scopes, atomically advances `schedule.next_run_at`, and enqueues `execute_scan`.
- File: `src/storage/mongo/adapter.ts`: add `claimDueScopes(now, limit)` using `findOneAndUpdate` in a loop bounded by `limit` (no batch claim; concurrency is controlled per document).
- Run: `npm test -- test/workers/scheduler.test.ts` → green.
- Commit: `feat(workers): in-process scheduler with atomic scope claim`.
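One way to get the single-winner behaviour Task 3.1 tests for is a compare-and-set claim: read candidate scopes with a due-scope filter, then issue one `findOneAndUpdate` per candidate whose filter pins the exact `schedule.next_run_at` value that was read. MongoDB applies a single-document update atomically, so once one claimer advances `next_run_at`, a second claimer's identical filter no longer matches and it gets no document back. This is a variant of the loop in Task 3.2, not the settled design; the scope `status` field, the placement of `cadence` under `schedule`, setting `last_run_at` at claim time, and the helper names are assumptions.

```typescript
// Hypothetical ScanScope subset. schedule.next_run_at, last_run_at and
// interval_seconds follow this design; `status` and `schedule.cadence`
// placement are assumptions.
interface ScopeSchedule {
  cadence: "interval" | "manual";
  interval_seconds: number;
  next_run_at: Date;
  last_run_at?: Date;
}

interface DueScope {
  _id: string;
  tenant_id: string;
  schedule: ScopeSchedule;
}

// Candidate query: active, interval-cadence scopes whose next run is due.
export function dueScopesFilter(now: Date) {
  return {
    status: "active",
    "schedule.cadence": "interval",
    "schedule.next_run_at": { $lte: now },
  };
}

// Compare-and-set claim for one candidate. Pinning the observed next_run_at
// means only the first concurrent findOneAndUpdate can match; the loser gets null.
export function claimOne(scope: DueScope, now: Date) {
  const next = new Date(now.getTime() + scope.schedule.interval_seconds * 1000);
  return {
    filter: {
      _id: scope._id,
      tenant_id: scope.tenant_id,
      "schedule.next_run_at": scope.schedule.next_run_at,
    },
    update: {
      $set: { "schedule.next_run_at": next, "schedule.last_run_at": now },
    },
  };
}
```

Each `{ filter, update }` pair would go to the Node driver's `collection.findOneAndUpdate(filter, update, { returnDocument: "after" })`; check the return shape for the driver version in use, since v6 returns the document (or `null`) by default while earlier majors wrap it in a result object. Because `next_run_at` is computed in application code, the catch-up and cooldown rules of Task 3.4 can feed the same `$set`.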
Task 3.3 (P) — Wire scheduler into platform startup.
- File: `src/index.ts` (or wherever `WorkerRuntime` is constructed): instantiate `Scheduler` with `tickIntervalMs = 30_000` and start it after the runtime starts.
- Add the env flag `SV0_SCHEDULER_ENABLED=true` (default `true` in prod, `false` in tests and docker compose dev).
- Commit: `feat(workers): start scheduler at platform boot`.
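A sketch of the boot wiring under two assumptions: "prod" is detected via `NODE_ENV === "production"`, and an explicit `SV0_SCHEDULER_ENABLED` always wins over that default. The `Scheduler` takes the tick work as a callback (in the real class it would query due scopes, claim them, and enqueue `execute_scan`) and coalesces overlapping ticks, so a slow tick on a 30-second interval does not start a second concurrent claim pass. The function name `schedulerEnabled` is hypothetical.

```typescript
// Assumption: "prod" means NODE_ENV === "production"; an explicit
// SV0_SCHEDULER_ENABLED always overrides that default.
export function schedulerEnabled(env: Record<string, string | undefined>): boolean {
  const flag = env.SV0_SCHEDULER_ENABLED;
  if (flag !== undefined) return flag.trim().toLowerCase() === "true";
  return env.NODE_ENV === "production";
}

export class Scheduler {
  private timer: ReturnType<typeof setInterval> | undefined;
  private inFlight: Promise<void> | undefined;
  private readonly tick: () => Promise<void>;
  private readonly tickIntervalMs: number;

  constructor(tick: () => Promise<void>, tickIntervalMs = 30_000) {
    this.tick = tick;
    this.tickIntervalMs = tickIntervalMs;
  }

  start(): void {
    if (this.timer !== undefined) return; // already running
    this.timer = setInterval(() => void this.tickOnce(), this.tickIntervalMs);
  }

  stop(): void {
    if (this.timer !== undefined) clearInterval(this.timer);
    this.timer = undefined;
  }

  // Coalesces overlapping ticks: while a tick is still in flight, callers get
  // the same promise instead of starting a second claim pass.
  tickOnce(): Promise<void> {
    if (this.inFlight === undefined) {
      this.inFlight = this.tick()
        .catch((err: unknown) => console.error("scheduler.tick_failed", err))
        .finally(() => {
          this.inFlight = undefined;
        });
    }
    return this.inFlight;
  }
}
```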
Task 3.4 (P) — Catch-up safety + cooldown enforcement.
- File: `src/workers/scheduler.ts`: when claiming, if `now - schedule.next_run_at > 2 × interval`, log `scheduler.catch_up_skipped` and set `next_run_at = now + interval` (do NOT fire the backlog).
- Also enforce `cooldown_after_failure_seconds`: query the last `scan_runs` doc for the scope; if it failed within the cooldown window, skip and bump `next_run_at`.
- Test: `test/workers/scheduler.test.ts`: add cases for both behaviours.
- Commit: `feat(workers): scheduler honours catch-up cap and per-scope cooldown`.
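A sketch of the per-scope decision Task 3.4 adds, written as a pure function so both test cases reduce to plain assertions. It reads "do NOT fire the backlog" as skipping this tick entirely, bumps a cooled-down scope's `next_run_at` to the end of the cooldown window, and applies the cooldown only after a `failed` (not `partial`) run. All three readings, the `completed_at` field, and the name `decideFiring` are assumptions.

```typescript
type RunStatus = "running" | "succeeded" | "partial" | "failed";

interface LastRun {
  status: RunStatus;
  completed_at?: Date; // hypothetical field name
}

type Decision =
  | { action: "fire"; nextRunAt: Date }
  | { action: "skip"; reason: "scheduler.catch_up_skipped" | "cooldown"; nextRunAt: Date };

export function decideFiring(
  now: Date,
  nextRunAt: Date,
  intervalSeconds: number,
  cooldownAfterFailureSeconds: number,
  lastRun: LastRun | null,
): Decision {
  const intervalMs = intervalSeconds * 1000;
  const oneIntervalFromNow = new Date(now.getTime() + intervalMs);

  // Catch-up cap: more than two intervals late means the platform was down or
  // stalled; skip the backlog and reschedule one interval from now.
  if (now.getTime() - nextRunAt.getTime() > 2 * intervalMs) {
    return { action: "skip", reason: "scheduler.catch_up_skipped", nextRunAt: oneIntervalFromNow };
  }

  // Cooldown: a recent failure pushes the next attempt to the end of the window.
  if (lastRun !== null && lastRun.status === "failed" && lastRun.completed_at !== undefined) {
    const cooldownEndsMs = lastRun.completed_at.getTime() + cooldownAfterFailureSeconds * 1000;
    if (now.getTime() < cooldownEndsMs) {
      return { action: "skip", reason: "cooldown", nextRunAt: new Date(cooldownEndsMs) };
    }
  }

  return { action: "fire", nextRunAt: oneIntervalFromNow };
}
```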
Task 3.5 (P) — Manual trigger API.
- File: `src/api/routes/scan-runs.ts` (new), with three routes:
  - `POST /api/v1/scan-runs`, body `{ scope_id, trigger?: { type, iac_correlation_id? } }`.
  - `GET /api/v1/scan-runs/:id`.
  - `GET /api/v1/scan-runs?scope_id=X&limit=N`.
- File: `src/api/index.ts`: register the router.
- Reject with 409 if starting another run would exceed the scope's `max_concurrent_runs`, given its existing `running` runs.
- Tests: `test/api/scan-runs.test.ts`.
- Commit: `feat(api): scan-runs routes for manual trigger and history`.
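A sketch of the request checks behind `POST /api/v1/scan-runs`, kept framework-free. The caller is assumed to have already counted the scope's `running` runs; `checkManualTrigger` and its return shape are hypothetical.

```typescript
interface TriggerBody {
  scope_id: string;
  trigger?: { type: string; iac_correlation_id?: string };
}

type TriggerDecision =
  | { ok: true; body: TriggerBody }
  | { ok: false; status: 400 | 409; error: string };

function reject(status: 400 | 409, error: string): TriggerDecision {
  return { ok: false, status, error };
}

export function checkManualTrigger(raw: unknown, runningCount: number, maxConcurrentRuns: number): TriggerDecision {
  if (typeof raw !== "object" || raw === null) return reject(400, "body must be a JSON object");
  const body = raw as Record<string, unknown>;
  if (typeof body.scope_id !== "string" || body.scope_id === "") return reject(400, "scope_id is required");

  if (body.trigger !== undefined) {
    const trigger = body.trigger as Record<string, unknown> | null;
    if (typeof trigger !== "object" || trigger === null) return reject(400, "trigger must be an object");
    if (typeof trigger.type !== "string") return reject(400, "trigger.type must be a string");
    if (trigger.iac_correlation_id !== undefined && typeof trigger.iac_correlation_id !== "string") {
      return reject(400, "trigger.iac_correlation_id must be a string");
    }
  }

  // One more run must still fit under the scope's max_concurrent_runs.
  if (runningCount + 1 > maxConcurrentRuns) return reject(409, "max_concurrent_runs reached for this scope");

  return { ok: true, body: body as unknown as TriggerBody };
}
```

Count-then-insert is not atomic: two simultaneous POSTs can both see a count below the limit, so a strict guarantee would need an atomic guard on the database side.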
Phase 3 deliverable: a scope created via the CLI runs on its cadence without operator intervention; scan_runs history accumulates; an operator can hit the API to trigger an extra run or list history.
Phase 4: CLI provisioning + migration
Task 4.1 (P) — Connector-instance-cli scaffold.
- File: `scripts/connector-instance-cli.ts` (new). Use Node's built-in `parseArgs`. Three subcommands, `instance`, `scope`, and `run`, each with `create`, `list`, and `archive`.
- Wire it into the StorageAdapter directly.
- Add it to the `package.json` scripts as `"control-plane": "tsx scripts/connector-instance-cli.ts"`.
- Commit: `feat(scripts): connector-instance-cli scaffold`.
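A sketch of the subcommand parsing with `parseArgs` from `node:util` (Node 18.3+). Only the noun/verb pairs come from Task 4.1; the option names `--tenant`, `--id`, `--kind`, and `--display-name`, and the function `parseCommand`, are hypothetical. With `strict` left at its default, `parseArgs` rejects unknown options on its own.

```typescript
import { parseArgs } from "node:util";

const NOUNS = ["instance", "scope", "run"] as const;
const VERBS = ["create", "list", "archive"] as const;
type Noun = (typeof NOUNS)[number];
type Verb = (typeof VERBS)[number];

export interface ParsedCommand {
  noun: Noun;
  verb: Verb;
  tenant: string;
  id?: string;
  kind?: string;
  displayName?: string;
}

function isNoun(s: string | undefined): s is Noun {
  return (NOUNS as readonly string[]).includes(s ?? "");
}

function isVerb(s: string | undefined): s is Verb {
  return (VERBS as readonly string[]).includes(s ?? "");
}

export function parseCommand(argv: string[]): ParsedCommand {
  // strict mode (the default) throws on unknown options.
  const { values, positionals } = parseArgs({
    args: argv,
    allowPositionals: true,
    options: {
      tenant: { type: "string" },
      id: { type: "string" },
      kind: { type: "string" },
      "display-name": { type: "string" },
    },
  });
  const [noun, verb] = positionals;
  if (!isNoun(noun)) throw new Error(`expected one of ${NOUNS.join(", ")}, got "${noun}"`);
  if (!isVerb(verb)) throw new Error(`expected one of ${VERBS.join(", ")}, got "${verb}"`);
  if (values.tenant === undefined) throw new Error("--tenant is required");
  return { noun, verb, tenant: values.tenant, id: values.id, kind: values.kind, displayName: values["display-name"] };
}
```

The script's entry point would call `parseCommand(process.argv.slice(2))`, for example via `npm run control-plane -- instance create --tenant t-1 --kind aws --display-name "AWS prod"`, and dispatch on `noun` and `verb` to the StorageAdapter.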
Task 4.2 (P) — Migration script for legacy connector_credential_refs.
- File: `scripts/migrate-connector-instances.ts` (new).
- Walks all `tenant_configs`; for each entry in `connector_credential_refs` it creates one `ConnectorInstance` with `display_name = "<kind> (migrated from tenant config)"`, the copied credential ref, status `onboarding`, and no scopes.
- Idempotent: skips if an instance with the same `connector_kind` + `display_name` already exists for that tenant.
- Test: `test/scripts/migrate-connector-instances.test.ts`.
- Commit: `feat(scripts): migrate legacy credential refs to ConnectorInstance`.
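The idempotency rule reduces to a planning step that can be tested without a database: given one tenant config and the instances that already exist, return only the instances still to create. This sketch assumes `connector_credential_refs` maps connector kind to credential ref, which this section does not specify; `planMigration` is a hypothetical name.

```typescript
// Assumption: connector_credential_refs maps connector kind -> credential ref.
interface TenantConfig {
  tenant_id: string;
  connector_credential_refs: Record<string, string>;
}

interface InstanceKey {
  tenant_id: string;
  connector_kind: string;
  display_name: string;
}

interface PlannedInstance extends InstanceKey {
  credentials_ref: string;
  status: "onboarding";
}

const keyOf = (i: InstanceKey) => JSON.stringify([i.tenant_id, i.connector_kind, i.display_name]);

// Returns only the instances still to create, so re-running is a no-op.
export function planMigration(config: TenantConfig, existing: InstanceKey[]): PlannedInstance[] {
  const seen = new Set(existing.map(keyOf));
  const plan: PlannedInstance[] = [];
  for (const [kind, ref] of Object.entries(config.connector_credential_refs)) {
    const candidate: PlannedInstance = {
      tenant_id: config.tenant_id,
      connector_kind: kind,
      display_name: `${kind} (migrated from tenant config)`,
      credentials_ref: ref,
      status: "onboarding", // no scopes are created for migrated instances
    };
    if (!seen.has(keyOf(candidate))) plan.push(candidate);
  }
  return plan;
}
```

Feeding the first plan back in as `existing` yields an empty plan, which is the V5 no-op property.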
Task 4.3 (P) — Add connector_instance_id to ConnectorSyncDoc.
- File: `src/domain/syncs/types.ts`: add an optional `connector_instance_id?: string`.
- File: `src/workers/handlers/sync-ingestion.ts`: when receiving a NormalizedGraph whose `connectorId` matches an existing `_id` in `connector_instances`, write `connector_instance_id`; otherwise leave it unset.
- Test: extend the existing `test/workers/sync-ingestion.test.ts` with a case asserting the field is populated when an instance exists.
- Commit: `feat(syncs): backlink ConnectorSync to ConnectorInstance`.
Task 4.4 (C) — Document the new CLI shape in connector READMEs.
- File: `integrations/aws/README.md`: add a "Driven by platform scheduler" subsection that documents `--scope-id` / `--run-id` / `--category-results-out` and explains that operators no longer need to invoke the connector from a laptop.
- Commit: `docs(aws): document platform-driven scan invocation`.
Task 4.5 (P) — End-to-end happy path test.
- File: `test/integration/control-plane-e2e.test.ts` (new).
- Provision a tenant, create an instance via the CLI (programmatically), create a scope with `cadence: manual`, trigger a run via the API, and mock the driver to return a small NormalizedGraph. Assert that `scan_runs.status == "succeeded"`, that `scan_runs.sync_id` is set, and that the downstream `entities` collection contains the expected entity.
- Run: `npm run test:integration` → green.
- Commit: `test(integration): end-to-end control-plane happy path`.
Task 4.6 (P) — ConnectorReport minimal-wrap adapter (per umbrella D15).
- File: `src/api/routes/ingest.ts`: extend the existing `/api/v1/ingest/connector-report` handler.
- On receipt, look up the `ConnectorInstance` for `(tenant_id, connector_kind)`. If several exist, use the most recently created `active` one. If none exists, log a warning, skip the wrap, and proceed as today, so unmigrated tenants keep working.
- If found, insert a `scan_runs` doc with `status` derived from `ConnectorReport.errors` (none → succeeded, partial → partial, all-failed → failed), `category_results` synthesised with one entry per `ConnectorReport.source_systems` entry, and `sync_id` linked to the resulting `connector_syncs._id`. Do NOT change anything in the connector code itself; the wrap is platform-side only.
- Test: `test/api/ingest-connector-report-wrap.test.ts`: POST a sample entra-servicenow ConnectorReport and assert that a `scan_runs` doc appears with the correct `status` and `category_results.entra.items_scanned > 0`.
- Commit: `feat(ingest): minimal-wrap ConnectorReport into ScanRun lifecycle (D15)`.
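A sketch of the synthesis rule. The `ConnectorReport` fields used here (`source_systems`, `errors` tagged with a `source_system`, and per-system item counts in `items_by_source_system`) are assumptions about the report's shape, not its actual type, and `wrapReport` is a hypothetical name. A system counts as failed if any error names it; errors that leave at least one system healthy give `partial`.

```typescript
// Assumed subset of ConnectorReport; the real type lives in the ingest code.
interface ConnectorReport {
  source_systems: string[];
  errors: { source_system: string; message: string }[];
  items_by_source_system?: Record<string, number>;
}

interface CategoryResult {
  status: "succeeded" | "failed";
  items_scanned: number;
  error?: { message: string };
}

interface WrappedRun {
  status: "succeeded" | "partial" | "failed";
  category_results: Record<string, CategoryResult>;
}

export function wrapReport(report: ConnectorReport): WrappedRun {
  const category_results: Record<string, CategoryResult> = {};
  for (const system of report.source_systems) {
    const items_scanned = report.items_by_source_system?.[system] ?? 0;
    const messages = report.errors.filter((e) => e.source_system === system).map((e) => e.message);
    category_results[system] =
      messages.length === 0
        ? { status: "succeeded", items_scanned }
        : { status: "failed", items_scanned, error: { message: messages.join("; ") } };
  }
  const failed = Object.values(category_results).filter((r) => r.status === "failed").length;
  // none -> succeeded, all failed -> failed, anything in between -> partial.
  let status: WrappedRun["status"];
  if (report.errors.length === 0) status = "succeeded";
  else if (failed === report.source_systems.length) status = "failed";
  else status = "partial";
  return { status, category_results };
}
```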
Phase 4 deliverable: an existing tenant can be migrated without data loss; an admin can provision a new tenant entirely from the CLI; every connector (AWS via the new flow, entra-servicenow + foundry via the minimal-wrap adapter) lands a scan_runs document on each invocation, making umbrella V2 verifiable across all four; CI validates the full lifecycle.
Validation criteria
| # | Criterion | How to verify |
|---|---|---|
| V1 | After Phase 1: a developer can insert a ConnectorInstance / ScanScope / ScanRun and query them by tenant. | npm test -- test/storage/control-plane.test.ts passes. |
| V2 | After Phase 2: an admin can run npx tsx scripts/connector-instance-cli.ts run trigger --scope <id> and a scan_runs doc transitions running → succeeded with category_results populated. entities collection contains the AWS-account entities for that scope. | Manual happy-path against a dev AWS account. db.scan_runs.findOne({ _id: ... }) shows status: "succeeded", category_results.iam.items_scanned > 0. db.entities.countDocuments({ tenant_id, source_system: "aws" }) > 0. |
| V3 | After Phase 2: when one extractor (e.g. CloudTrail) is forced to fail, the scan_run shows status: "partial", that category shows status: "failed" with a structured error, and other categories still have status: "succeeded" with non-zero items_scanned. The downstream pipeline circuit breaker does NOT remove entities from the failed category's domain. | Integration test in test/integration/circuit-breaker.test.ts extended with a "category-isolation preserves circuit-breaker correctness" case. |
| V4 | After Phase 3: a scope with cadence: interval, interval_seconds: 60 produces N scan_runs documents in N×60 seconds without operator action. last_run_at and next_run_at advance monotonically. | Integration test runs scheduler for 200 seconds, asserts ≥ 3 scan_runs documents, asserts no scope was claimed by two workers concurrently (count of running per scope never exceeds max_concurrent_runs). |
| V5 | After Phase 4: the migration script run against a tenant with two legacy connector_credential_refs entries produces exactly two ConnectorInstance docs, status onboarding, with the credential refs preserved. Idempotent: second run is a no-op. | test/scripts/migrate-connector-instances.test.ts. |
| V6 | Stream 4 contract: POST /api/v1/scan-runs with trigger.iac_correlation_id set, then GET /api/v1/scan-runs/:id returns the same correlation id. | test/api/scan-runs.test.ts IaC correlation case. |
Open questions
- Connector Driver model: Phase 1 subprocess vs side-car container? Recommended: subprocess for Phase 1, side-car as Phase 5. Needs explicit user sign-off because the side-car ties into the `sv0-connectors#247` Docker work and may want to be done together.
- Scheduler leader election when the platform is multi-process? Today's deploy is single-process per environment, so the atomic `findOneAndUpdate` claim is sufficient. When the MediaPro pilot expands and we run two API replicas, do we need MongoDB transactions, or is the per-document atomic update enough? Recommend: enough for now; revisit when we observe duplicate runs.
- Credential broker: extend `tenant_configs` or add a new `credentials` collection? This stream treats credentials as an opaque `credentials_ref`. The actual broker is out of scope, but the team needs to decide before MediaPro: 1Password CLI direct (per #247) vs vault-agnostic indirection. Recommend a separate research stream after this one lands.
- Per-category service-category enum: closed list per connector kind, or open string? Closed feels safer (typo prevention) but means a new AWS service requires a code change. Recommend an open string for now, validated against `discovered_capabilities.service_categories_available` at scope-creation time.
- Tenant self-service UI: when? This stream defines a CLI only. The UI is a future workstream, gated on WorkOS being live (see `sv0-platform#373`) so that connector-instance creation can be tied to a real authenticated user. Recommend post-pilot.
- Legacy `ConnectorReport` ingestion path: wrap in scope/run lifecycle now or later? RESOLVED in umbrella D15: ship a minimal platform-side wrap in Phase 4 (Task 4.6 above). The connector keeps emitting `ConnectorReport` exactly as today, and the platform synthesises a `ScanRun` doc on receipt. Full CLI migration of the entra-servicenow and foundry connectors stays deferred. This satisfies umbrella V2 ("all four ScanRuns succeed") without forcing a two-week non-AWS connector refactor.
- `scan_runs` retention? A tenant scanning every hour produces ~9k runs/year per scope (24 × 365 = 8,760). At ~5KB per doc that is ~45MB/scope/year, which is fine for now, but we propose a 1-year TTL on terminal-status runs (configurable per tenant). Open for sign-off.
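The retention numbers and the proposed per-tenant TTL can be made concrete. At an hourly cadence a scope produces 8,760 runs a year, and at 5 KB each that is about 44 MB, in line with the ~45MB estimate. Because a MongoDB TTL index has a single `expireAfterSeconds`, per-tenant retention is simplest with a per-document `expire_at` date and `expireAfterSeconds: 0`, which expires each document at its own timestamp; setting `expire_at` only when a run reaches a terminal status leaves running runs alone, since TTL ignores documents without a date in the indexed field. The field, index, and function names below are hypothetical.

```typescript
const SECONDS_PER_YEAR = 365 * 24 * 3600;
const MS_PER_DAY = 24 * 3600 * 1000;

// Runs per scope per year at a fixed cadence, and the storage they occupy.
export function yearlyFootprint(intervalSeconds: number, bytesPerRun: number) {
  const runs = Math.floor(SECONDS_PER_YEAR / intervalSeconds);
  return { runs, megabytes: (runs * bytesPerRun) / 1_000_000 };
}

// TTL index keyed on a per-document expiry date. With expireAfterSeconds: 0,
// each document expires at its own expire_at; documents without the field
// (e.g. runs still in progress) are never removed by the TTL monitor.
export const scanRunsTtlIndex = {
  key: { expire_at: 1 },
  name: "scan_runs_expire_at_ttl",
  expireAfterSeconds: 0,
};

// Set expire_at only when a run reaches a terminal status; retentionDays
// would come from tenant config (365 under the 1-year proposal).
export function expireAt(completedAt: Date, retentionDays: number): Date {
  return new Date(completedAt.getTime() + retentionDays * MS_PER_DAY);
}
```

The spec would be applied with something like `db.collection("scan_runs").createIndex(scanRunsTtlIndex.key, { name: scanRunsTtlIndex.name, expireAfterSeconds: 0 })`. Deletion happens when MongoDB's background TTL task runs (roughly every 60 seconds), not at the exact instant.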
References
Issues consulted
- `sv0-connectors#78`: epic for scheduled connector scans (the parent epic this design slots into).
- `sv0-platform#309`: research on multi-tenant connector throttling and rate-limit budgets at scale (informs the `budget` field on ScanScope and the per-category isolation requirements).
- `sv0-platform#247`: Phase 1 connector automation with systemd + 1Password (the prior shape we're evolving past; informs Open Question #3).
- `sv0-connectors#89`: connector P0s (AWS multi-account, streaming pagination, retry hardening); Stream 2 territory.
- `sv0-documentation#195`: MediaPro pilot readiness umbrella (the customer-facing deadline gating this work).
Architecture docs
- `02-processing-pipeline.md`: defines today's sync handler, circuit breaker, and scan-safety contract, which this design extends.
- `03-database.md`: defines the existing 14 collections; this design adds three more.
- `05-connectors.md`: defines NormalizedGraph and the connector lifecycle; this design adds the scheduling/scoping shell around it.
Source files
- `sv0-platform/src/storage/storage-adapter.ts`: interface this design extends (specifically lines 142-338 for the existing method shape).
- `sv0-platform/src/workers/runtime.ts:5,25-147`: in-process queue this design extends with `execute_scan`.
- `sv0-platform/src/workers/handlers/sync-ingestion.ts:53-97`: where `scanScope` is consumed today; the design preserves this exact contract.
- `sv0-platform/src/ingestion/types.ts:92-105`: existing `ScanScope` interface; the design lifts it from "ingestion-time declaration" to "first-class persisted concept."
- `sv0-platform/src/domain/syncs/types.ts`: `ConnectorSyncDoc` shape this design extends with `connector_instance_id`.
- `sv0-platform/src/api/middleware/auth.ts:55-118`: tenant isolation contract this design respects.
- `sv0-connectors/integrations/aws/src/sv0_aws/cli/main.py:115-331`: current monolithic `scan()` method that this design decomposes per category.
- `sv0-platform/docs/session-notes/2026-04-20-foundry-demo-resolution-session-handoff.md`: operational evidence motivating the "scan history in DB" requirement.