Connector Framework
Context
SecurityV0 must ingest identity, permission, and execution data from heterogeneous enterprise systems (Entra ID, ServiceNow, GitHub, cloud providers, SaaS platforms). Each source has different APIs, different permission models, and different data structures.
The connector framework provides a standardized interface for extracting, normalizing, and loading this data — enabling the integration team to build connectors independently of the platform team's database and API work.
Scope of "read-only": the connector framework is read-only with respect to source systems — connectors never modify the systems they ingest from. This document defines that ingestion contract. Platform-issued tickets to external systems (Jira / GitHub / ServiceNow / Linear) initiated by the user as part of a remediation action are a separate outbound capability that does not run through this connector framework; see ADR-019 for that carve-out, including the URL-redaction policy that prevents PII leakage in outbound deeplinks.
Design Influences
- Veza OAA (Open Authorization API): 3-step pattern (extract → transform → load) with standardized JSON schema. 250+ integrations via pluggable connectors.
- SecurityV0 differentiator: We normalize to an execution authority model — not just "what can this identity access" but "what execution paths exist, who owns them, and how have they changed."
Decision
A connector interface contract with four phases: Extract → Transform → Diff → Load. Connectors produce a NormalizedGraph that is database-agnostic. The Diff Engine and Storage Adapter handle database-specific operations.
Rationale
Why Interface-Based
- Independent developability: Integration team builds connectors against the interface contract. Platform team builds the diff engine and storage adapter. No coordination needed beyond the schema.
- Testability: Connectors can be tested with mock API responses. Platform can be tested with mock NormalizedGraph inputs.
- Database agnosticism: Same connector works regardless of storage backend (MongoDB for MVP, Neo4j addition for future scale).
Why Not Direct API-to-Database Writes
- Coupling: connector changes require database schema knowledge
- No diff computation: can't detect what changed without comparing to previous state
- No normalization: each connector would need to understand the full schema
Design Detail
Connector Interface
// =============================================================
// CONNECTOR INTERFACE CONTRACT
// =============================================================
interface Connector {
// --- Metadata ---
readonly id: string; // Unique connector identifier (e.g., "entra_id_v1")
readonly name: string; // Human-readable name
readonly sourceSystem: SourceSystem; // Enum: which system this connects to
readonly version: string; // Semantic version of the connector
readonly description: string;
// --- Configuration ---
readonly configSchema: JSONSchema7; // JSON Schema for required configuration
readonly requiredPermissions: PermissionSpec[]; // What permissions needed in source system
// --- Supported entity types ---
readonly entityTypes: NormalizedNodeType[]; // What this connector discovers
readonly relationshipTypes: NormalizedEdgeType[]; // What relationships it maps
// --- Core operations ---
healthCheck(config: ConnectorConfig): Promise<HealthCheckResult>;
extract(config: ConnectorConfig, options: ExtractOptions): Promise<RawExtraction>;
transform(raw: RawExtraction): Promise<NormalizedGraph>;
// --- Optional: incremental sync support ---
supportsIncremental: boolean;
extractIncremental?(config: ConnectorConfig, since: DateTime, deltaToken?: string): Promise<RawExtraction>;
// --- Audit log extraction (preferred for temporal tracking) ---
supportsAuditLog: boolean;
auditLogConfig?: {
retentionDays: number; // How long source retains audit data (e.g., 90 for Azure)
supportedOperations: string[]; // Operations this connector can extract
rateLimit: {
requestsPerSecond: number;
batchSize: number;
};
};
extractAuditLogs?(config: ConnectorConfig, options: AuditLogExtractOptions): Promise<AuditLogExtraction>;
}
// =============================================================
// CONFIGURATION
// =============================================================
interface ConnectorConfig {
tenantId: string;
connectorId: string;
// Source system credentials (retrieved from vault at runtime)
credentials: {
type: 'oauth2_client_credentials' | 'api_key' | 'pat' | 'certificate';
// Actual secrets injected at runtime, never stored in config
};
// Source-specific configuration
sourceConfig: Record<string, unknown>; // Validated against configSchema
// Sync behavior
syncMode: 'full' | 'incremental';
timeout: number; // Max sync duration in seconds
rateLimitConfig?: {
maxRequestsPerSecond: number;
retryBackoffMs: number;
maxRetries: number;
};
}
interface ExtractOptions {
syncId: string; // Unique ID for this sync run
syncMode: 'full' | 'incremental' | 'audit_log';
since?: DateTime; // For incremental: last successful sync timestamp
deltaToken?: string; // For incremental: source-specific continuation token
entityFilter?: NormalizedNodeType[]; // Optional: only extract specific entity types
}
// =============================================================
// AUDIT LOG EXTRACTION (preferred for temporal tracking)
// =============================================================
interface AuditLogExtractOptions {
syncId: string;
since: DateTime; // Fetch audit logs since this time
until?: DateTime; // Optional end time (default: now)
cursor?: SyncCursor; // Resume from previous cursor
operations?: string[]; // Filter to specific operations
}
interface SyncCursor {
tenantId: string;
sourceSystem: SourceSystem;
cursorState: {
// Azure Entra ID
lastActivityDatetime?: DateTime;
skipToken?: string;
// ServiceNow
lastSysUpdatedOn?: DateTime;
lastSysId?: string;
};
lastSuccessfulSync: DateTime;
}
interface AuditLogExtraction {
syncId: string;
connectorId: string;
tenantId: string;
extractedAt: DateTime;
// Audit records from source system
auditRecords: AuditRecord[];
// Cursor for next fetch
nextCursor: SyncCursor;
// Metadata
metadata: {
recordsFetched: number;
oldestRecord: DateTime;
newestRecord: DateTime;
hasMore: boolean;
rateLimitRemaining?: number;
sourceApi: string; // e.g., 'directoryAudits', 'sys_audit'
};
}
interface AuditRecord {
sourceRecordId: string; // ID in source audit log
timestamp: DateTime; // When the change occurred
operation: string; // Source-specific operation name
// Actor who made the change
actor: {
id: string;
type: 'user' | 'service_principal' | 'system' | 'unknown';
displayName?: string;
};
// What was affected
targetResources: Array<{
id: string;
type: string;
displayName?: string;
modifiedProperties?: Array<{
name: string;
oldValue: unknown;
newValue: unknown;
}>;
}>;
// Additional context
result: 'success' | 'failure' | 'unknown';
additionalDetails?: Record<string, unknown>;
// Original record for debugging
rawRecord: Record<string, unknown>;
}
// =============================================================
// EXTRACTION OUTPUT
// =============================================================
interface RawExtraction {
syncId: string;
connectorId: string;
tenantId: string;
extractedAt: DateTime;
// Raw entities from source API
entities: RawEntity[];
relationships: RawRelationship[];
// Sync metadata
metadata: ExtractionMetadata;
// For incremental sync continuation
deltaToken?: string;
}
interface RawEntity {
sourceId: string; // ID in the source system
sourceType: string; // Source-specific type name
properties: Record<string, unknown>; // Raw API response fields
extractedAt: DateTime;
}
interface RawRelationship {
sourceId: string;
targetId: string;
type: string; // Source-specific relationship name
properties: Record<string, unknown>;
extractedAt: DateTime;
}
interface ExtractionMetadata {
apiCallCount: number;
totalEntitiesDiscovered: number;
paginationComplete: boolean;
rateLimitRemaining?: number;
errors: ExtractionError[]; // Non-fatal errors (e.g., individual entity fetch failed)
warnings: string[];
}
interface ExtractionError {
entitySourceId?: string;
operation: string;
message: string;
retryable: boolean;
}
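The rateLimitConfig fields in ConnectorConfig name the retry parameters but leave the retry policy to the connector. A minimal sketch of one possible policy, assuming exponential backoff; the doubling factor and the helper names backoffDelaysMs and minRequestIntervalMs are illustrative assumptions, not part of the contract:

```typescript
// Mirrors ConnectorConfig.rateLimitConfig from the contract above.
interface RateLimitConfig {
  maxRequestsPerSecond: number;
  retryBackoffMs: number;
  maxRetries: number;
}

// Hypothetical helper: the delay to wait before each retry attempt.
// Doubling per attempt is an assumption; the contract only names the fields.
function backoffDelaysMs(config: RateLimitConfig): number[] {
  const delays: number[] = [];
  for (let attempt = 0; attempt < config.maxRetries; attempt++) {
    delays.push(config.retryBackoffMs * Math.pow(2, attempt));
  }
  return delays;
}

// Minimum spacing between requests implied by maxRequestsPerSecond.
function minRequestIntervalMs(config: RateLimitConfig): number {
  return Math.ceil(1000 / config.maxRequestsPerSecond);
}
```

With retryBackoffMs = 500 and maxRetries = 3, a connector following this sketch would wait 500 ms, 1000 ms, and 2000 ms before giving up and recording a retryable ExtractionError.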
Normalized Schema
The transform phase maps source-specific data to a common vocabulary. This is the contract between connectors and the platform.
// =============================================================
// NORMALIZED GRAPH (output of transform, input to diff engine)
// =============================================================
interface NormalizedGraph {
syncId: string;
connectorId: string;
tenantId: string;
transformedAt: DateTime;
nodes: NormalizedNode[];
edges: NormalizedEdge[];
// Temporal markers: what this connector knows changed
temporalMarkers: TemporalMarker[];
// Evidence availability report: what evidence sources this connector probed
// and their status. Propagates to findings via evidence_completeness.
evidenceCompleteness: EvidenceCompletenessReport;
}
interface EvidenceCompletenessReport {
sources: Record<string, EvidenceSourceStatus>;
}
interface EvidenceSourceStatus {
sourceTable: string; // e.g., "syslog_transaction", "sys_audit_role"
status: 'available' | 'unavailable_not_enabled' | 'unavailable_no_access' | 'unavailable_not_applicable';
recordCount?: number; // How many records retrieved (if available)
oldestRecord?: DateTime; // Oldest record timestamp (if available)
notes?: string; // Human-readable note about limitations
}
// =============================================================
// NORMALIZED NODE TYPES
// =============================================================
type NormalizedNodeType =
| 'identity' // Service principal, OAuth app, machine account (authenticating entities)
| 'automation' // Business rule, script include, flow, scheduled job (execution logic)
| 'connection' // REST message, SOAP message, HTTP connection (outbound integration)
| 'human_identity' // User accounts (owners, approvers) — platform maps to internal 'owner'
| 'role' // Any grouping of permissions
| 'permission' // Individual capability
| 'resource' // Thing being acted upon
| 'credential' // Authentication material (OAuth profile, API key, certificate)
| 'execution_evidence'; // Proof of execution (log entry, API call record)
// MIGRATION COMPATIBILITY: During the migration window, the platform ingestion
// normalizer also accepts the legacy type 'autonomous_identity' and remaps it:
// - autonomous_identity with identitySubtype in (service_principal, oauth_app, machine_account, integration_user) → 'identity'
// - autonomous_identity with identitySubtype in (business_rule, script_include, flow_designer_flow, scheduled_job, event_script, transform_map) → 'automation'
// - autonomous_identity with identitySubtype in (oauth_provider, oauth_profile) → 'credential'
// Connectors should migrate to the new types. Legacy type acceptance will be removed in v3.
// NORMALIZATION NOTE: `human_identity` and Owners
//
// Connectors extract `human_identity` nodes for user accounts they discover
// in source systems (Entra users, ServiceNow sys_user records). The platform's
// normalizer layer maps human_identity to internal entity_type: "owner", based on:
// 1. OWNED_BY edges emitted by the connector (explicit ownership)
// 2. BELONGS_TO edges (group/team membership implying team-level ownership)
// 3. APPROVED_BY edges (approval relationships)
//
// Connectors emit facts (human_identity + OWNED_BY edge); the normalizer
// creates the Owner entity with appropriate owner_type (human, team, business_unit).
//
// Do NOT add 'owner' as a NormalizedNodeType. Ownership is a platform concept
// derived from connector-provided relationship data, not a source-system entity type.
interface NormalizedNode {
// Identity
nodeId: string; // Connector-generated stable ID: `${sourceSystem}:${sourceId}`
nodeType: NormalizedNodeType;
sourceSystem: SourceSystem;
sourceId: string; // ID in source system
// Common properties (all node types)
displayName: string;
status: NodeStatus;
createdAt?: DateTime;
lastModifiedAt?: DateTime;
// Type-specific properties (see union type below)
properties: NodeProperties;
}
// Union of all type-specific property interfaces.
// The correct interface depends on nodeType:
// identity → IdentityProperties
// automation → AutomationProperties
// connection → ConnectionProperties
// credential → CredentialProperties
// human_identity → HumanIdentityProperties
// role → RoleProperties
// permission → PermissionProperties
// resource → ResourceProperties
// execution_evidence → ExecutionEvidenceProperties
type NodeProperties =
| IdentityProperties
| AutomationProperties
| ConnectionProperties
| CredentialProperties
| HumanIdentityProperties
| RoleProperties
| PermissionProperties
| ResourceProperties
| ExecutionEvidenceProperties;
// --- Type-specific property interfaces ---
interface IdentityProperties {
identitySubtype:
| 'service_principal' | 'oauth_app' | 'github_app' | 'agent' | 'machine_account' | 'bot'
| 'integration_user' | 'system_execution';
// Note: PATs are credentials (credentialSubtype: 'pat'), not identities.
// A PAT authenticates *as* an identity but is itself authentication material.
// See ADR-006 entity classification decision tree.
executionMode: 'autonomous' | 'operator_assisted' | 'human_triggered' | 'unknown';
securityRelevance?: 'active_external' | 'dormant_authority' | 'internal_inventory';
lastActivityAt?: DateTime;
ownershipState: OwnershipState;
}
interface AutomationProperties {
automationSubtype:
| 'business_rule' | 'script_include' | 'flow_designer_flow'
| 'scheduled_job' | 'event_script' | 'transform_map';
executionMode: 'autonomous' | 'operator_assisted' | 'human_triggered' | 'unknown';
securityRelevance: 'active_external' | 'dormant_authority' | 'internal_inventory';
lastActivityAt?: DateTime;
ownershipState: OwnershipState;
}
interface ConnectionProperties {
connectionSubtype: 'rest_message' | 'rest_method' | 'soap_message' | 'http_connection';
targetUrl?: string;
authMethod?: string;
description?: string;
}
interface HumanIdentityProperties {
email?: string;
orgUnit?: string;
jobTitle?: string;
accountEnabled: boolean;
disabledAt?: DateTime;
deletedAt?: DateTime;
departedAt?: DateTime;
}
interface RoleProperties {
roleName: string;
roleSubtype: 'application' | 'directory' | 'cloud_iam' | 'custom';
description?: string;
isPrivileged: boolean; // Elevated/admin-level
}
interface PermissionProperties {
permissionName: string;
normalizedAction: NormalizedAction;
scope: string; // What it applies to
description?: string;
}
interface ResourceProperties {
resourceSubtype: 'table' | 'module' | 'api_endpoint' | 'repository' | 'secret' | 'workflow' | 'storage' | 'compute';
businessDomain: BusinessDomain;
sensitivity: SensitivityLevel;
containsPii?: boolean;
containsFinancialData?: boolean;
}
interface CredentialProperties {
credentialSubtype: 'oauth_client_secret' | 'certificate' | 'pat' | 'api_key' | 'oidc_token' | 'ssh_key'
| 'oauth_provider' | 'oauth_profile' | 'client_secret';
expiresAt?: DateTime;
lastUsedAt?: DateTime;
rotatedAt?: DateTime;
}
interface ExecutionEvidenceProperties {
executionType: string; // 'api_call', 'workflow_run', 'sign_in', 'audit_log_entry'
executedAt: DateTime;
action?: string; // What was done
targetResource?: string; // What was acted upon
outcome: 'success' | 'failure' | 'unknown';
}
// =============================================================
// NORMALIZED EDGE TYPES
// =============================================================
type NormalizedEdgeType =
| 'OWNED_BY' // any → human_identity (ownership)
| 'HAS_ROLE' // identity → role (role assignment)
| 'GRANTS' // role → permission (role includes permission)
| 'APPLIES_TO' // permission → resource (permission scopes to resource)
| 'AUTHENTICATES_VIA' // DEPRECATED: accepted for backward compat, remapped to USES on ingest. Will be removed in a future version.
| 'EXECUTES_ON' // automation → resource (execution evidence, narrowed from identity→resource)
| 'APPROVED_BY' // identity → human (approval/authorization)
| 'MEMBER_OF' // identity/human → role (group/team membership)
| 'BELONGS_TO' // owner → parent owner (ownership hierarchy)
| 'AUTHENTICATES_TO' // identity → identity (cross-system auth chain)
| 'DELEGATES_TO' // identity → identity (delegation chain)
| 'RUNS_AS' // automation → identity | human_identity (which identity the automation executes as)
| 'TRIGGERS_ON' // automation → resource/event (what triggers the automation)
| 'CREATED_BY' // entity → human (who created this, distinct from ownership)
// New execution chain edge types (ADR-007)
| 'CALLS' // automation → automation (BR invokes SI)
| 'INVOKES' // automation → connection (SI uses REST Message)
| 'USES' // connection → credential (REST Message uses OAuth Profile)
| 'AUTHENTICATES_AS'; // credential → identity (OAuth Profile represents SP)
// EXECUTION CHAIN PATTERN
// The canonical 4-hop execution chain from automation to identity:
//
// automation --CALLS--> automation --INVOKES--> connection --USES--> credential --AUTHENTICATES_AS--> identity
//
// Not all chains have every hop. A simple chain may be:
// automation --RUNS_AS--> identity
//
// AUTHENTICATES_AS direction: credential → identity (credential represents the identity).
// This is the reverse of the legacy AUTHENTICATES_VIA direction.
interface NormalizedEdge {
edgeId: string; // Connector-generated stable ID
edgeType: NormalizedEdgeType;
sourceNodeId: string; // References NormalizedNode.nodeId
targetNodeId: string; // References NormalizedNode.nodeId
// Temporal properties (critical for drift detection)
since?: DateTime; // When this relationship was established
until?: DateTime; // When it ended (absent = still active)
// Edge-specific properties
properties: EdgeProperties;
}
interface EdgeProperties {
// Common
grantedBy?: string; // Who/what established this relationship
sourceEvidence?: string; // Source system record ID proving this edge exists
// For OWNED_BY
ownershipStatus?: OwnershipState;
// For HAS_ROLE
inherited?: boolean; // Via group membership vs direct assignment
// For EXECUTES_ON
executionCount?: number; // Observed execution count in window
lastExecution?: DateTime;
// For AUTHENTICATES_TO — cross-system linkage proof
// Must include issuer/tenant and target instance context to disambiguate multi-tenant/multi-instance setups
evidenceReferences?: {
issuingSystemId: string; // e.g., Entra SP appId (required)
issuingTenantId: string; // e.g., Entra tenant ID "72f988bf-..." (required)
targetSystemId: string; // e.g., ServiceNow oauth_entity.client_id (required)
targetInstanceId: string; // e.g., "https://corp.service-now.com" (required)
targetRecordSysId?: string; // e.g., oauth_entity sys_id (optional, for direct verification)
matchingField: string; // e.g., "client_id" (required)
matchingValue: string; // The actual matched value (required)
targetUserBinding?: string; // e.g., "oauth_entity.user -> sys_user.user_name" (optional)
};
// For APPROVED_BY
approvalType?: 'initial_setup' | 'role_addition' | 'scope_expansion' | 'reapproval';
approvalEvidence?: string; // Change request ID, ticket number, etc.
// For RUNS_AS
runAsType?: 'configured' | 'inherited' | 'system';
// For TRIGGERS_ON
triggerType?: 'schedule' | 'event' | 'manual' | 'api_call';
schedule?: string; // Cron expression or interval if schedule-triggered
// For CREATED_BY
createdAt?: DateTime;
creationContext?: string; // e.g., "sys_created_by" or "app registration creator"
// Maintainer hints (e.g., sys_updated_by) are supplemental evidence and
// MUST NOT be treated as authoritative ownership without explicit OWNED_BY proof.
// For CALLS (automation → automation)
callType?: 'direct' | 'include' | 'delegate'; // Direct script call, script include, or delegation
// For INVOKES (automation → connection)
httpMethod?: string; // GET, POST, etc. if known
// For USES (connection → credential)
// (no additional properties needed — relationship is structural)
// For AUTHENTICATES_AS (credential → identity)
// (no additional properties needed — relationship is structural)
}
// =============================================================
// CHAIN HINTS (Optional — connector-provided execution chain metadata)
// =============================================================
// Connectors MAY include chainMembership in automation node properties
// to provide hints for platform-side chain assembly. This is optional —
// the platform can also discover chains via BFS from entry points.
interface ChainMembershipHint {
role: 'entry_point' | 'code_component' | 'outbound_target' | 'auth_credential' | 'destination_identity';
anchorEntityId: string; // Entry point entity source_id
chainSemanticHash?: string; // e.g., "trigger-incident-dest-graph" (human-readable chain summary)
}
// Usage in NormalizedNode.properties:
// properties: {
// ...automationProperties,
// chainMembership: {
// role: "entry_point",
// anchorEntityId: "br-sys-id-abc",
// chainSemanticHash: "trigger-incident-dest-graph"
// }
// }
// =============================================================
// ENUMS
// =============================================================
type SourceSystem =
| 'entra_id'
| 'servicenow'
| 'github'
| 'aws'
| 'azure'
| 'gcp'
| 'snowflake'
| 'pagerduty'
| 'okta'
| 'custom';
type NormalizedAction =
| 'create'
| 'read'
| 'update'
| 'delete'
| 'execute' // Run workflows, trigger actions
| 'admin' // Manage the system itself
| 'delegate'; // Grant authority to others
type OwnershipState =
| 'owned' // Has active, accountable human
| 'orphaned' // Owner departed/disabled/deleted
| 'unattributed' // Never had clear owner
| 'disputed'; // Multiple conflicting signals
type BusinessDomain =
| 'hr'
| 'finance'
| 'customer'
| 'it_ops'
| 'security'
| 'engineering'
| 'legal'
| 'executive'
| 'unknown';
type SensitivityLevel =
| 'public'
| 'internal'
| 'confidential'
| 'restricted';
type NodeStatus =
| 'active'
| 'disabled'
| 'deleted'
| 'expired';
// =============================================================
// TEMPORAL MARKERS
// =============================================================
interface TemporalMarker {
nodeId: string;
markerType: 'created' | 'modified' | 'deleted' | 'relationship_changed';
timestamp: DateTime;
details?: string; // Human-readable description of what changed
}
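To make the transform contract concrete, here is a minimal sketch that maps one raw entity to an identity node. It uses trimmed copies of the contract types. The function names and the raw field names (displayName, accountEnabled) are assumptions about what an Entra service principal extraction might contain, and the default values chosen for executionMode and ownershipState are illustrative only.

```typescript
// Trimmed copies of the contract types, just enough for this sketch.
type SourceSystem = 'entra_id' | 'servicenow';
type NodeStatus = 'active' | 'disabled' | 'deleted' | 'expired';

interface RawEntity {
  sourceId: string;
  sourceType: string;
  properties: Record<string, unknown>;
}

interface IdentityNode {
  nodeId: string;
  nodeType: 'identity';
  sourceSystem: SourceSystem;
  sourceId: string;
  displayName: string;
  status: NodeStatus;
  properties: {
    identitySubtype: 'service_principal';
    executionMode: 'autonomous' | 'operator_assisted' | 'human_triggered' | 'unknown';
    ownershipState: 'owned' | 'orphaned' | 'unattributed' | 'disputed';
  };
}

// Stable ID format from the NormalizedNode contract: `${sourceSystem}:${sourceId}`.
function makeNodeId(sourceSystem: SourceSystem, sourceId: string): string {
  return `${sourceSystem}:${sourceId}`;
}

// Hypothetical mapping for an Entra service principal.
function toIdentityNode(raw: RawEntity): IdentityNode {
  const name = raw.properties['displayName'];
  const enabled = raw.properties['accountEnabled'];
  return {
    nodeId: makeNodeId('entra_id', raw.sourceId),
    nodeType: 'identity',
    sourceSystem: 'entra_id',
    sourceId: raw.sourceId,
    // Fall back to the source ID when the payload carries no usable name.
    displayName: typeof name === 'string' ? name : raw.sourceId,
    status: enabled === false ? 'disabled' : 'active',
    properties: {
      identitySubtype: 'service_principal',
      executionMode: 'unknown',       // not derivable from this record alone
      ownershipState: 'unattributed', // until an OWNED_BY edge says otherwise
    },
  };
}
```

Because nodeId is derived only from the source system and source ID, repeated syncs of the same entity produce the same ID, which is what lets the Diff Engine match nodes across runs.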
OAA Canonical Permission Mapping
The overview references OAA's 10-type canonical permission taxonomy for interoperability. Internally, SecurityV0 uses 7 normalized actions (simpler, sufficient for blast radius and drift analysis). The OAA exporter maps from internal actions to OAA types during export:
| SecurityV0 NormalizedAction | OAA Canonical Type(s) | Notes |
|---|---|---|
| create | DataCreate | |
| read | DataRead, MetadataRead | Connector sets scope to disambiguate data vs config |
| update | DataWrite, MetadataWrite | Connector sets scope to disambiguate data vs config |
| delete | DataDelete | |
| execute | NonData | Trigger workflows, invoke functions |
| admin | AppAdmin, SystemAdmin | Scope determines app-level vs system-level |
| delegate | GlobalAdmin | Granting access to others is a global-admin concern |
The mapping is intentionally lossy inbound (10→7) — SecurityV0 does not need OAA's full granularity for internal analysis. The OAA exporter reverses the mapping using scope and permissionName to recover the original OAA type where possible.
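A minimal sketch of the export direction follows. The OAA type names are copied from the mapping table as written. How scope separates data from config, or app-level from system-level, is not specified here, so the two scope sets and the function name toOaaType are placeholder assumptions.

```typescript
type NormalizedAction =
  | 'create' | 'read' | 'update' | 'delete' | 'execute' | 'admin' | 'delegate';

// Placeholder assumptions: which scopes count as configuration (metadata)
// and which count as system-level. A real exporter would define these per source.
const CONFIG_SCOPES = new Set<string>(['applications', 'directory']);
const SYSTEM_SCOPES = new Set<string>(['all', 'account', 'directory']);

// Maps an internal action plus scope to one OAA type name from the table.
function toOaaType(action: NormalizedAction, scope: string): string {
  switch (action) {
    case 'create':
      return 'DataCreate';
    case 'read':
      return CONFIG_SCOPES.has(scope) ? 'MetadataRead' : 'DataRead';
    case 'update':
      return CONFIG_SCOPES.has(scope) ? 'MetadataWrite' : 'DataWrite';
    case 'delete':
      return 'DataDelete';
    case 'execute':
      return 'NonData';
    case 'admin':
      return SYSTEM_SCOPES.has(scope) ? 'SystemAdmin' : 'AppAdmin';
    case 'delegate':
      return 'GlobalAdmin';
  }
}
```

Actions with a single candidate type export without loss; read, update, and admin are where scope has to carry the information that the 10→7 reduction dropped.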
Permission Normalization Guide
Connectors must map source-specific permissions to NormalizedAction. This table provides guidance:
| Source System | Source Permission/Role | Normalized Action | Scope |
|---|---|---|---|
| Entra ID | Application.Read.All | read | applications |
| Entra ID | Application.ReadWrite.All | admin | applications |
| Entra ID | User.Read.All | read | users |
| Entra ID | Directory.ReadWrite.All | admin | directory |
| ServiceNow | itil role | update | incident,problem,change |
| ServiceNow | admin role | admin | all |
| ServiceNow | catalog_admin | admin | service_catalog |
| ServiceNow | hr_read role | read | hr_tables |
| ServiceNow | custom role with create on table X | create | table_x |
| GitHub | contents: read | read | repository_contents |
| GitHub | contents: write | update | repository_contents |
| GitHub | actions: write | execute | workflows |
| GitHub | admin on repo | admin | repository |
| GitHub | members: write on org | delegate | org_membership |
| AWS | s3:GetObject | read | s3_bucket/prefix |
| AWS | s3:PutObject | create | s3_bucket/prefix |
| AWS | iam:CreateRole | delegate | iam_roles |
| AWS | sts:AssumeRole | execute | iam_role_arn |
| Snowflake | SELECT on schema | read | database.schema |
| Snowflake | INSERT on table | create | database.schema.table |
| Snowflake | ACCOUNTADMIN role | admin | account |
Normalization rules:
- If a permission allows modifying system configuration → admin
- If a permission allows granting access to others → delegate
- If a permission allows triggering automation/workflows → execute
- If a permission allows writing data → create or update (based on whether it's new or existing)
- If a permission only allows viewing → read
- If a permission allows removing data → delete
- When in doubt, use the most permissive classification (prefer admin over update)
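These rules can be sketched as an ordered classifier. The Capabilities flags are a hypothetical intermediate form that a connector would derive from each source permission. The precedence used when several rules match, and the fallback to admin when nothing is known, are one reading of "most permissive" and should be treated as assumptions.

```typescript
type NormalizedAction =
  | 'create' | 'read' | 'update' | 'delete' | 'execute' | 'admin' | 'delegate';

// Hypothetical capability flags derived from a source permission.
interface Capabilities {
  modifiesSystemConfig?: boolean;
  grantsAccessToOthers?: boolean;
  triggersAutomation?: boolean;
  removesData?: boolean;
  writesExistingData?: boolean;
  writesNewData?: boolean;
  viewsOnly?: boolean;
}

// Checks run from broadest to narrowest, so a permission that matches
// several rules receives the most permissive label.
function classify(c: Capabilities): NormalizedAction {
  if (c.modifiesSystemConfig) return 'admin';
  if (c.grantsAccessToOthers) return 'delegate';
  if (c.triggersAutomation) return 'execute';
  if (c.removesData) return 'delete';
  if (c.writesExistingData) return 'update';
  if (c.writesNewData) return 'create';
  if (c.viewsOnly) return 'read';
  // Nothing known about the permission: err on the permissive side.
  return 'admin';
}
```

Over-classifying is the safer failure mode for blast-radius analysis: a read permission labelled admin produces a false positive, while an admin permission labelled read hides real authority.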
Business Domain Classification
Connectors should classify resources by business domain where possible. When the source system provides classification (e.g., ServiceNow modules), use it. Otherwise, use name-based heuristics:
| Resource Name Pattern | Business Domain |
|---|---|
| hr_*, employee_*, payroll_*, benefits_* | hr |
| fin_*, invoice_*, payment_*, budget_*, gl_* | finance |
| customer_*, account_*, opportunity_*, case_* | customer |
| incident_*, problem_*, change_*, cmdb_* | it_ops |
| sec_*, audit_*, compliance_*, policy_* | security |
| repo_*, pipeline_*, build_*, deploy_* | engineering |
| Everything else | unknown |
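The heuristic can be sketched as a prefix lookup, with a source-provided classification taking priority. The prefixes are copied from the table. The function name classifyDomain, the case-insensitive match, and the first-match-wins ordering are assumptions.

```typescript
type BusinessDomain =
  | 'hr' | 'finance' | 'customer' | 'it_ops' | 'security'
  | 'engineering' | 'legal' | 'executive' | 'unknown';

// Prefixes copied from the heuristics table; the first matching row wins.
const DOMAIN_PREFIXES: Array<[BusinessDomain, string[]]> = [
  ['hr', ['hr_', 'employee_', 'payroll_', 'benefits_']],
  ['finance', ['fin_', 'invoice_', 'payment_', 'budget_', 'gl_']],
  ['customer', ['customer_', 'account_', 'opportunity_', 'case_']],
  ['it_ops', ['incident_', 'problem_', 'change_', 'cmdb_']],
  ['security', ['sec_', 'audit_', 'compliance_', 'policy_']],
  ['engineering', ['repo_', 'pipeline_', 'build_', 'deploy_']],
];

function classifyDomain(resourceName: string, sourceProvided?: BusinessDomain): BusinessDomain {
  // A classification supplied by the source system takes priority over heuristics.
  if (sourceProvided && sourceProvided !== 'unknown') return sourceProvided;
  const name = resourceName.toLowerCase();
  for (const [domain, prefixes] of DOMAIN_PREFIXES) {
    if (prefixes.some((prefix) => name.startsWith(prefix))) return domain;
  }
  return 'unknown';
}
```

Note that legal and executive have no name patterns in the table, so under this sketch they can only come from a source-provided classification.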
Diff Engine
The Diff Engine compares the current NormalizedGraph against the previously known state and emits typed change events.
// =============================================================
// DIFF ENGINE
// =============================================================
interface DiffEngine {
/**
* Compute changes between current extraction and previous known state.
* Emits events to the event store and applies changes to the graph store.
*/
computeAndApply(
tenantId: string,
syncId: string,
currentGraph: NormalizedGraph,
storageAdapter: StorageAdapter
): Promise<DiffResult>;
}
interface DiffResult {
syncId: string;
tenantId: string;
computedAt: DateTime;
// Counts
nodesCreated: number;
nodesUpdated: number;
nodesDeleted: number; // Tombstoned (present in previous, absent in current)
edgesCreated: number;
edgesUpdated: number;
edgesRemoved: number;
// Events emitted
events: GraphEvent[];
// Entities that should trigger re-evaluation
entitiesToEvaluate: string[]; // Node IDs that changed in ways relevant to triggers
}
interface GraphEvent {
eventId: string;
tenantId: string;
timestamp: DateTime;
entityType: NormalizedNodeType;
entityId: string;
eventType: 'created' | 'updated' | 'deleted' | 'relationship_added' | 'relationship_removed';
relationshipType?: NormalizedEdgeType;
targetEntityId?: string;
beforeState: Record<string, unknown> | null;
afterState: Record<string, unknown> | null;
changedProperties?: string[];
sourceConnector: string;
syncId: string;
}
Diff rules:
- Node present in current but not in previous → created event
- Node present in both but properties differ → updated event (with before/after)
- Node present in previous but not in current → deleted event (tombstone in graph)
- Edge present in current but not in previous → relationship_added event
- Edge present in previous but not in current → relationship_removed event
- Edge present in both but properties differ → relationship_removed + relationship_added (treat as replacement)
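The three node rules can be sketched as a comparison of two snapshots keyed by nodeId. This is an in-memory illustration with trimmed types, not the Diff Engine itself. The shallow, key-order-independent comparison in sameProperties is an assumption that only holds for flat property bags; nested values would need a deep-equality helper.

```typescript
interface NodeLike {
  nodeId: string;
  properties: Record<string, unknown>;
}

interface NodeChange {
  nodeId: string;
  eventType: 'created' | 'updated' | 'deleted';
  beforeState: Record<string, unknown> | null;
  afterState: Record<string, unknown> | null;
}

// Shallow comparison that ignores key order.
function sameProperties(a: Record<string, unknown>, b: Record<string, unknown>): boolean {
  const keysA = Object.keys(a).sort();
  const keysB = Object.keys(b).sort();
  if (keysA.length !== keysB.length) return false;
  return keysA.every((key, i) => key === keysB[i] && a[key] === b[key]);
}

function diffNodes(previous: NodeLike[], current: NodeLike[]): NodeChange[] {
  const prevById = new Map<string, NodeLike>();
  for (const node of previous) prevById.set(node.nodeId, node);

  const changes: NodeChange[] = [];
  const seen = new Set<string>();

  for (const node of current) {
    seen.add(node.nodeId);
    const before = prevById.get(node.nodeId);
    if (!before) {
      // Present in current only.
      changes.push({ nodeId: node.nodeId, eventType: 'created', beforeState: null, afterState: node.properties });
    } else if (!sameProperties(before.properties, node.properties)) {
      // Present in both, properties differ.
      changes.push({ nodeId: node.nodeId, eventType: 'updated', beforeState: before.properties, afterState: node.properties });
    }
  }
  for (const node of previous) {
    if (!seen.has(node.nodeId)) {
      // Present in previous only: tombstone.
      changes.push({ nodeId: node.nodeId, eventType: 'deleted', beforeState: node.properties, afterState: null });
    }
  }
  return changes;
}
```

The deleted rule is only safe when the current snapshot is complete; a diff run on a partial extraction would tombstone every node the extraction missed.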
Trigger-relevant changes (entities added to entitiesToEvaluate):
- Any identity with ownership change
- Any identity with role addition/removal
- Any identity with activity change (dormant detection)
- Any automation with execution mode or security relevance change
- Any human_identity with status change (disabled/deleted)
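A minimal sketch of how these rules might be applied to emitted events. The event shape is a trimmed GraphEvent. The property names looked up in changedProperties (ownershipState, lastActivityAt, executionMode, securityRelevance, status) are taken from the schema, but treating them as the exact strings the Diff Engine records is an assumption, as is counting OWNED_BY edge changes as ownership changes.

```typescript
type NodeType = 'identity' | 'automation' | 'human_identity' | 'role' | 'resource';

// Trimmed GraphEvent: only the fields the rules need.
interface ChangeEvent {
  entityType: NodeType;
  entityId: string;
  eventType: 'created' | 'updated' | 'deleted' | 'relationship_added' | 'relationship_removed';
  relationshipType?: string;
  changedProperties?: string[];
}

function isTriggerRelevant(e: ChangeEvent): boolean {
  const changed = e.changedProperties ?? [];
  const isEdgeChange =
    e.eventType === 'relationship_added' || e.eventType === 'relationship_removed';
  switch (e.entityType) {
    case 'identity':
      return (
        changed.includes('ownershipState') ||
        changed.includes('lastActivityAt') ||
        (isEdgeChange && (e.relationshipType === 'HAS_ROLE' || e.relationshipType === 'OWNED_BY'))
      );
    case 'automation':
      return changed.includes('executionMode') || changed.includes('securityRelevance');
    case 'human_identity':
      return changed.includes('status');
    default:
      return false;
  }
}

// Deduplicated node IDs, in the order they first became relevant.
function entitiesToEvaluate(events: ChangeEvent[]): string[] {
  const ids = new Set<string>();
  for (const e of events) {
    if (isTriggerRelevant(e)) ids.add(e.entityId);
  }
  return Array.from(ids);
}
```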
Storage Adapter
The storage adapter is the abstraction layer between the diff engine and the actual database.
// =============================================================
// STORAGE ADAPTER (database-agnostic interface)
// =============================================================
interface StorageAdapter {
// --- Graph operations ---
getNode(tenantId: string, nodeId: string): Promise<NormalizedNode | null>;
getNodesByType(tenantId: string, nodeType: NormalizedNodeType): Promise<NormalizedNode[]>;
getEdgesForNode(tenantId: string, nodeId: string, direction: 'incoming' | 'outgoing' | 'both'): Promise<NormalizedEdge[]>;
upsertNode(tenantId: string, node: NormalizedNode): Promise<void>;
deleteNode(tenantId: string, nodeId: string): Promise<void>;
upsertEdge(tenantId: string, edge: NormalizedEdge): Promise<void>;
deleteEdge(tenantId: string, edgeId: string): Promise<void>;
// --- Path queries ---
queryPaths(tenantId: string, query: PathQuery): Promise<ExecutionPath[]>;
// --- Event operations ---
appendEvents(tenantId: string, events: GraphEvent[]): Promise<void>;
queryEvents(tenantId: string, query: EventQuery): Promise<GraphEvent[]>;
// --- Bulk operations (for full sync) ---
bulkUpsertNodes(tenantId: string, nodes: NormalizedNode[]): Promise<void>;
bulkUpsertEdges(tenantId: string, edges: NormalizedEdge[]): Promise<void>;
// --- State queries ---
getLastSyncState(tenantId: string, connectorId: string): Promise<SyncState | null>;
getGraphForConnector(tenantId: string, connectorId: string): Promise<NormalizedGraph>;
}
interface PathQuery {
startNodeId?: string;
startNodeType?: NormalizedNodeType;
startNodeFilters?: Record<string, unknown>;
traverseEdgeTypes: NormalizedEdgeType[];
endNodeType?: NormalizedNodeType;
endNodeFilters?: Record<string, unknown>;
maxDepth: number; // Safety limit
}
interface ExecutionPath {
nodes: NormalizedNode[];
edges: NormalizedEdge[];
depth: number;
startNode: NormalizedNode;
endNode: NormalizedNode;
}
interface EventQuery {
entityId?: string;
entityType?: NormalizedNodeType;
eventType?: string;
relationshipType?: NormalizedEdgeType;
since?: DateTime;
until?: DateTime;
limit?: number;
orderBy?: 'asc' | 'desc';
}
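To illustrate what queryPaths is expected to honor, here is an in-memory sketch of a breadth-first traversal restricted by traverseEdgeTypes and bounded by maxDepth. It uses a simplified query and returns node ID lists rather than ExecutionPath objects. The function name findPaths and the cycle-skipping behavior are assumptions; a real adapter would translate the query for its backend instead of walking edges in memory.

```typescript
interface Edge {
  edgeType: string;
  sourceNodeId: string;
  targetNodeId: string;
}

interface SimplePathQuery {
  startNodeId: string;
  traverseEdgeTypes: string[];
  maxDepth: number; // Safety limit, as in PathQuery
}

// Returns each path (as node IDs) that either runs out of matching
// outgoing edges or reaches maxDepth.
function findPaths(edges: Edge[], query: SimplePathQuery): string[][] {
  const allowed = new Set(query.traverseEdgeTypes);
  const outgoing = new Map<string, Edge[]>();
  for (const edge of edges) {
    if (!allowed.has(edge.edgeType)) continue;
    const list = outgoing.get(edge.sourceNodeId) ?? [];
    list.push(edge);
    outgoing.set(edge.sourceNodeId, list);
  }

  const complete: string[][] = [];
  let frontier: string[][] = [[query.startNodeId]];
  while (frontier.length > 0) {
    const next: string[][] = [];
    for (const path of frontier) {
      const tail = path[path.length - 1];
      const depth = path.length - 1;
      // Skip edges that would revisit a node already on this path.
      const steps = (outgoing.get(tail) ?? []).filter((e) => !path.includes(e.targetNodeId));
      if (steps.length === 0 || depth >= query.maxDepth) {
        if (depth > 0) complete.push(path);
        continue;
      }
      for (const step of steps) next.push([...path, step.targetNodeId]);
    }
    frontier = next;
  }
  return complete;
}
```

Run against the canonical chain with CALLS, INVOKES, USES, and AUTHENTICATES_AS as the traversable types, this follows the four hops from the entry-point automation to the destination identity and ignores unrelated edges such as OWNED_BY.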
Connector Lifecycle
┌─────────────────────────────────────────────────────────────┐
│ CONNECTOR SYNC LIFECYCLE │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. SCHEDULE / TRIGGER │
│ ├── Periodic (cron-based, configurable per connector) │
│ ├── Manual (API call: POST /connectors/{id}/sync) │
│ └── Event-driven (future: webhook from source) │
│ │
│ 2. INITIALIZE │
│ ├── Generate sync_id │
│ ├── Load connector config from store │
│ ├── Retrieve credentials from vault │
│ ├── Record sync start (connector_syncs table) │
│ └── Acquire tenant-scoped execution context │
│ │
│ 3. HEALTH CHECK │
│ ├── Verify source API connectivity │
│ ├── Verify credential validity │
│ └── Abort if unhealthy (record failure) │
│ │
│ 4. EXTRACT │
│ ├── Call source APIs (paginated, rate-limited) │
│ ├── Handle API errors (retry with backoff) │
│ ├── Collect raw entities and relationships │
│ └── Produce RawExtraction │
│ │
│ 5. TRANSFORM │
│ ├── Map raw entities to NormalizedNodes │
│ ├── Map raw relationships to NormalizedEdges │
│ ├── Normalize permissions to standard actions │
│ ├── Classify resources by business domain │
│ ├── Compute ownership state │
│ └── Produce NormalizedGraph │
│ │
│ 6. DIFF │
│ ├── Load previous known state from StorageAdapter │
│ ├── Compare nodes: created / updated / deleted │
│ ├── Compare edges: added / removed / modified │
│ ├── Emit GraphEvents (immutable, append-only) │
│ └── Identify entities needing trigger re-evaluation │
│ │
│ 7. LOAD │
│ ├── Append events to event store │
│ ├── Upsert/delete nodes in graph store │
│ ├── Upsert/delete edges in graph store │
│ └── Update sync_version on all affected entities │
│ │
│ 8. POST-SYNC │
│ ├── Update connector_syncs record (completed/failed) │
│ ├── Emit "sync completed" event │
│ ├── Trigger evaluator runs for changed entities │
│ └── Store deltaToken for next incremental sync │
│ │
└─────────────────────────────────────────────────────────────┘
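The DIFF phase in the lifecycle above can be pictured as a keyed comparison between the previously stored nodes and the freshly transformed ones. The following is a minimal sketch rather than the Diff Engine's actual implementation: SketchNode, NodeDiff, diffNodes, and the JSON-based property comparison are all assumptions made for illustration.

```typescript
// Hypothetical, simplified node shape; the real NormalizedNode carries more fields.
interface SketchNode {
  nodeId: string;
  properties: Record<string, unknown>;
}

interface NodeDiff {
  created: SketchNode[];
  updated: SketchNode[];
  deleted: SketchNode[];
}

// Compare two snapshots keyed by nodeId.
// Assumption: property equality is checked via JSON serialization, which is
// key-order-sensitive and only adequate for a sketch.
function diffNodes(previous: SketchNode[], current: SketchNode[]): NodeDiff {
  const prevById = new Map<string, SketchNode>();
  for (const node of previous) prevById.set(node.nodeId, node);

  const seen = new Set<string>();
  const diff: NodeDiff = { created: [], updated: [], deleted: [] };

  for (const node of current) {
    seen.add(node.nodeId);
    const before = prevById.get(node.nodeId);
    if (before === undefined) {
      diff.created.push(node);
    } else if (JSON.stringify(before.properties) !== JSON.stringify(node.properties)) {
      diff.updated.push(node);
    }
  }

  // Anything present before but absent now is treated as deleted.
  for (const node of previous) {
    if (!seen.has(node.nodeId)) diff.deleted.push(node);
  }
  return diff;
}
```

Edges could be compared the same way with a composite key. Each entry in the resulting diff would then become one immutable GraphEvent in step 6.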
Audit Log Sync Lifecycle
When syncMode is 'audit_log', the connector queries source-system audit logs instead of extracting full entity state. This is the preferred mode for temporal tracking.
┌─────────────────────────────────────────────────────────────┐
│ AUDIT LOG SYNC LIFECYCLE │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. CHECK BASELINE STATE │
│ ├── Has initial baseline been taken? │
│ │ ├── No → Trigger FULL sync first to establish │
│ │ │ baseline, then switch to audit_log mode │
│ │ └── Yes → Continue to audit log sync │
│ └── Is retention window approaching? │
│ └── Yes → Schedule protective baseline │
│ │
│ 2. LOAD SYNC CURSOR │
│ ├── Get last successful sync timestamp from cursor │
│ ├── Calculate fetch window: cursor → now │
│ └── Validate cursor not older than retention window │
│ │
│ 3. EXTRACT AUDIT LOGS │
│ ├── Azure Entra ID: │
│ │ └── GET /auditLogs/directoryAudits │
│ │ ?$filter=activityDateTime ge {since} │
│ │ &$orderby=activityDateTime asc │
│ │ │
│ ├── ServiceNow: │
│ │ └── GET /api/now/table/sys_audit │
│ │ ?sysparm_query=sys_updated_on>{since} │
│ │ ^ORDERBYsys_updated_on │
│ │ │
│ └── Handle pagination, rate limiting │
│ │
│ 4. TRANSFORM AUDIT RECORDS TO EVENTS │
│ ├── Map source operations to EventType │
│ ├── Extract actor information │
│ ├── Extract change details (old/new values) │
│ └── Generate audit_source provenance fields │
│ │
│ 5. CORRELATE TO ENTITIES │
│ ├── Match target resources to existing entities │
│ ├── Create new entities if referenced but missing    │
│ ├── Update entity relationships based on events │
│ └── Mark identities needing path recomputation │
│ │
│ 6. PERSIST │
│ ├── Append events to events collection (canonical) │
│ ├── Update affected entities │
│ ├── Recompute execution paths for changed identities │
│ └── Update sync cursor with new position │
│ │
│ 7. POST-SYNC │
│ ├── Check if baseline is due (weekly schedule) │
│ ├── Trigger evaluator for changed entities │
│ ├── Generate evidence packs for new findings │
│ └── Record sync metrics (audit_records_fetched, etc.) │
│ │
└─────────────────────────────────────────────────────────────┘
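Steps 1 and 2 above (baseline check, fetch window, cursor validation) reduce to a small decision function. The sketch below is illustrative only: SketchCursor, SyncPlan, and planAuditLogSync are hypothetical names, and falling back to a full sync when the cursor is older than the retention window is an assumption, since the lifecycle only says the cursor must be validated.

```typescript
const MS_PER_DAY = 24 * 60 * 60 * 1000;

// Hypothetical cursor shape for this sketch; the real SyncCursor may differ.
interface SketchCursor {
  lastSuccessfulSync: Date | null; // null means no baseline has been taken yet
}

type SyncPlan =
  | { mode: 'audit_log'; since: Date; until: Date }
  | { mode: 'full'; reason: 'no_baseline' | 'cursor_outside_retention' };

function planAuditLogSync(cursor: SketchCursor, now: Date, retentionDays: number): SyncPlan {
  // Step 1: without a baseline there is nothing for audit events to apply to.
  if (cursor.lastSuccessfulSync === null) {
    return { mode: 'full', reason: 'no_baseline' };
  }

  // Step 2: a cursor older than the retention window implies purged records,
  // so the audit trail from the cursor onward would have gaps.
  const ageMs = now.getTime() - cursor.lastSuccessfulSync.getTime();
  if (ageMs > retentionDays * MS_PER_DAY) {
    return { mode: 'full', reason: 'cursor_outside_retention' };
  }

  // Fetch window: cursor → now.
  return { mode: 'audit_log', since: cursor.lastSuccessfulSync, until: now };
}
```

The returned since value is what would be substituted for {since} in the source queries shown in step 3.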
Audit Operation Mapping
Connectors map source-specific audit operations to SecurityV0 EventType. The tables below show the standard mappings for each source:
Azure Entra ID (/auditLogs/directoryAudits)
| Activity Display Name | EventType | Entity Type Affected |
|---|---|---|
| Add service principal | created | identity |
| Update service principal | updated | identity |
| Delete service principal | deleted | identity |
| Add member to role | role_assigned | identity / human_identity |
| Remove member from role | role_revoked | identity / human_identity |
| Add owner to service principal | owner_assigned | identity |
| Remove owner from service principal | owner_removed | identity |
| Add app role assignment to service principal | permission_granted | identity |
| Remove app role assignment from service principal | permission_revoked | identity |
| Update user (accountEnabled → false) | status_changed | human_identity |
| Delete user | status_changed (decayed) | human_identity |
| Add service principal credentials | credential_created | credential |
| Remove service principal credentials | credential_deleted | credential |
| Consent to application | permission_granted | identity |
ServiceNow (sys_audit table)
| Table + Action | EventType | Entity Type Affected |
|---|---|---|
| sys_user insert | created | human_identity / identity |
| sys_user.active → false | status_changed | human_identity / identity |
| sys_user_has_role insert | role_assigned | identity / human_identity |
| sys_user_has_role delete | role_revoked | identity / human_identity |
| sys_user_group insert (for user) | relationship_added | human_identity |
| sys_user_group.active → false | status_changed | Owner (team) |
| oauth_entity insert | created | credential |
| oauth_entity.active → false | status_changed | credential |
| oauth_entity_scope insert | permission_granted | credential |
| oauth_entity_scope delete | permission_revoked | credential |
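In connector code, a mapping table like the ones above typically becomes a plain lookup. The sketch below covers only a handful of the Entra ID rows. SketchEventType, ENTRA_ACTIVITY_TO_EVENT, and mapEntraActivity are hypothetical names, and returning undefined for unmapped activities is an assumption (the tables do not say how unknown operations are handled).

```typescript
// Subset of the EventType values that appear in the tables above.
type SketchEventType = 'created' | 'updated' | 'deleted' | 'role_assigned' | 'role_revoked';

// Keys are the "Activity Display Name" values from the Entra ID table.
const ENTRA_ACTIVITY_TO_EVENT = new Map<string, SketchEventType>([
  ['Add service principal', 'created'],
  ['Update service principal', 'updated'],
  ['Delete service principal', 'deleted'],
  ['Add member to role', 'role_assigned'],
  ['Remove member from role', 'role_revoked'],
]);

// Returns undefined for activities with no mapping so the caller can decide
// whether to skip or log them.
function mapEntraActivity(activityDisplayName: string): SketchEventType | undefined {
  return ENTRA_ACTIVITY_TO_EVENT.get(activityDisplayName);
}
```

Rows that depend on a field value rather than the operation name alone (for example, Update user with accountEnabled → false) would need a predicate on the change details in addition to this lookup.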
Retention Management
Source systems have limited audit log retention:
- Azure Entra ID: ~90 days (can be extended with Azure AD Premium P2)
- ServiceNow: Configurable, typically 1+ year
The connector framework handles retention by:
- Tracking retention window: SyncCursor.retentionExpiresAt is computed from the source retention policy
- Daily retention check: Background job checks all cursors for approaching expiry
- Protective baseline: When cursor age approaches retention limit (e.g., 83 days for Azure), trigger full sync + baseline before audit logs are purged
- Alerting: Notify tenant admin if sync hasn't run and audit data may be lost
// Retention check pseudocode
async function dailyRetentionCheck(tenantId: string) {
const cursors = await db.sync_cursors.find({ tenant_id: tenantId });
for (const cursor of cursors) {
const retentionDays = getRetentionDays(cursor.source_system); // 90 for Azure
const daysSinceLastSync = daysBetween(cursor.last_successful_sync, now());
const warningThreshold = retentionDays - 7; // 7 days warning
if (daysSinceLastSync >= warningThreshold) {
// Trigger immediate sync and baseline
await triggerSync(tenantId, cursor.source_system, {
syncMode: 'full',
takeBaseline: true,
reason: 'approaching_retention_limit'
});
// Alert tenant admin
await notifyRetentionWarning(tenantId, cursor.source_system);
}
}
}
Error Handling
// =============================================================
// ERROR HANDLING STRATEGY
// =============================================================
/**
* Errors are classified into three categories:
* 1. RETRIABLE: Transient failures (rate limits, timeouts, 5xx) — retry with backoff
* 2. PARTIAL: Some entities failed but others succeeded — continue, log errors
* 3. FATAL: Cannot proceed (auth failure, config error) — abort sync, alert
*/
interface SyncErrorPolicy {
// Rate limiting
maxRetries: number; // Default: 3
retryBackoffMs: number; // Default: 1000 (exponential)
maxRetryBackoffMs: number; // Default: 60000
// Partial failure
maxPartialErrors: number; // Default: 50 (abort if more than this many entities fail)
continueOnPartialError: boolean; // Default: true
// Timeout
syncTimeoutSeconds: number; // Default: 3600 (1 hour)
apiCallTimeoutSeconds: number; // Default: 30
}
// Error types
type ConnectorErrorType =
| 'AUTH_FAILURE' // Credentials invalid/expired → FATAL
| 'PERMISSION_DENIED' // Insufficient permissions → FATAL
| 'RATE_LIMITED' // API rate limit hit → RETRIABLE
| 'TIMEOUT' // API call timed out → RETRIABLE
| 'SERVER_ERROR' // Source system 5xx → RETRIABLE
| 'ENTITY_NOT_FOUND' // Specific entity missing → PARTIAL (skip entity)
| 'PARSE_ERROR' // Cannot parse API response → PARTIAL (skip entity)
| 'CONFIG_ERROR' // Invalid configuration → FATAL
| 'NETWORK_ERROR'; // Connectivity issue → RETRIABLE
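To make the three categories concrete, the sketch below pairs each ConnectorErrorType with its category (taken from the comments above) and computes a capped exponential retry delay from the SyncErrorPolicy defaults. ErrorCategory, ERROR_CATEGORY, and backoffDelayMs are hypothetical names. The doubling factor and the absence of jitter are assumptions, since the policy only says "exponential".

```typescript
type ErrorCategory = 'RETRIABLE' | 'PARTIAL' | 'FATAL';

// Mirrors ConnectorErrorType as defined above.
type SketchErrorType =
  | 'AUTH_FAILURE' | 'PERMISSION_DENIED' | 'RATE_LIMITED' | 'TIMEOUT' | 'SERVER_ERROR'
  | 'ENTITY_NOT_FOUND' | 'PARSE_ERROR' | 'CONFIG_ERROR' | 'NETWORK_ERROR';

// Category per error type, following the inline comments on ConnectorErrorType.
const ERROR_CATEGORY: Record<SketchErrorType, ErrorCategory> = {
  AUTH_FAILURE: 'FATAL',
  PERMISSION_DENIED: 'FATAL',
  CONFIG_ERROR: 'FATAL',
  RATE_LIMITED: 'RETRIABLE',
  TIMEOUT: 'RETRIABLE',
  SERVER_ERROR: 'RETRIABLE',
  NETWORK_ERROR: 'RETRIABLE',
  ENTITY_NOT_FOUND: 'PARTIAL',
  PARSE_ERROR: 'PARTIAL',
};

// Delay before retry number `attempt` (1-based): base * 2^(attempt - 1), capped.
// Assumption: plain doubling with no jitter.
function backoffDelayMs(
  attempt: number,
  retryBackoffMs: number = 1000,
  maxRetryBackoffMs: number = 60000
): number {
  return Math.min(retryBackoffMs * Math.pow(2, attempt - 1), maxRetryBackoffMs);
}
```

With the defaults (maxRetries: 3), the waits would be 1000 ms, 2000 ms, and 4000 ms, so maxRetryBackoffMs only matters when the retry count or base delay is raised.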
Priority Connectors
| Priority | Connector | Source System | Entity Types | Status |
|---|---|---|---|---|
| 1 | entra_id | Microsoft Entra ID | service_principals, users, app_roles, credentials | Design ready |
| 2 | servicenow | ServiceNow | integration_users, roles, tables, acls, audit_events | Design ready |
| 3 | github | GitHub | apps, oauth_apps, pats, actions, org_members, secrets | Proven in v0.1 |
| 4 | aws_iam | AWS | iam_roles, policies, trust_relationships | Future |
| 5 | azure_managed_id | Azure | managed_identities, enterprise_apps | Future |
| 6 | snowflake | Snowflake | service_accounts, roles, grants | Future |
| 7 | pagerduty | PagerDuty | service_integrations, api_keys | Future |
Connector Configuration Examples
Entra ID Connector Config
{
"connectorId": "entra_id_v1",
"tenantId": "tenant-xyz",
"credentials": {
"type": "oauth2_client_credentials",
"tenantId": "entra-tenant-id",
"clientId": "app-registration-client-id"
// clientSecret retrieved from vault at runtime
},
"sourceConfig": {
"graphApiVersion": "v1.0",
"includeSignInLogs": true,
"signInLogDays": 30,
"filterServicePrincipalTypes": ["Application"],
"excludeFirstPartyApps": true
},
"syncMode": "full",
"timeout": 3600,
"rateLimitConfig": {
"maxRequestsPerSecond": 10,
"retryBackoffMs": 1000,
"maxRetries": 3
}
}
ServiceNow Connector Config
{
"connectorId": "servicenow_v1",
"tenantId": "tenant-xyz",
"credentials": {
"type": "api_key",
"instanceUrl": "https://instance.service-now.com"
// username + password or OAuth token retrieved from vault
},
"sourceConfig": {
"includeAuditHistory": true,
"auditHistoryDays": 365,
"integrationUserPatterns": ["svc_*", "int_*", "api_*"],
"tableClassification": {
"hr_case": { "domain": "hr", "sensitivity": "confidential", "pii": true },
"incident": { "domain": "it_ops", "sensitivity": "internal" },
"customer_account": { "domain": "customer", "sensitivity": "confidential" }
}
},
"syncMode": "full",
"timeout": 3600,
"rateLimitConfig": {
"maxRequestsPerSecond": 5,
"retryBackoffMs": 2000,
"maxRetries": 3
}
}
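The integrationUserPatterns entries in the ServiceNow config look like simple wildcard patterns. One possible interpretation is sketched below, assuming that * matches any run of characters and that a pattern must match the whole user name case-sensitively. Those semantics, and the names patternToRegExp and isIntegrationUser, are assumptions rather than documented connector behavior.

```typescript
// Convert a wildcard pattern such as "svc_*" into an anchored regular expression.
function patternToRegExp(pattern: string): RegExp {
  // Escape regex metacharacters except "*", which is expanded afterwards.
  const escaped = pattern.replace(/[.+?^${}()|[\]\\]/g, '\\$&');
  return new RegExp('^' + escaped.replace(/\*/g, '.*') + '$');
}

// True if the user name matches at least one configured pattern.
function isIntegrationUser(userName: string, patterns: string[]): boolean {
  return patterns.some((pattern) => patternToRegExp(pattern).test(userName));
}

const examplePatterns = ['svc_*', 'int_*', 'api_*'];
const flagged = ['svc_backup', 'jane.doe', 'api_reporting'].filter((name) =>
  isIntegrationUser(name, examplePatterns)
);
```

Here flagged would contain svc_backup and api_reporting but not jane.doe.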
Connector Testing Strategy
Each connector should be testable in isolation with mock data:
// =============================================================
// TESTING SUPPORT
// =============================================================
interface MockSourceApi {
/**
* Given a connector config, return canned API responses
* that exercise all entity types and relationship types.
*/
getResponses(config: ConnectorConfig): Map<string, ApiResponse>;
}
interface ConnectorTestHarness {
/**
* Run the connector against mock APIs and verify:
* 1. All expected entity types are extracted
* 2. Normalization produces valid NormalizedGraph
* 3. Temporal markers are correctly set
* 4. Error handling works (inject failures)
*/
runConnectorTest(
connector: Connector,
mockApi: MockSourceApi,
expectations: TestExpectations
): Promise<TestResult>;
}
interface TestExpectations {
expectedNodeCount: number;
expectedEdgeCount: number;
expectedNodeTypes: NormalizedNodeType[];
expectedEdgeTypes: NormalizedEdgeType[];
expectedOrphanedIdentities: number; // For trigger testing
expectedDriftEvents: number; // For temporal testing
}
Mock data scenarios (each connector should have these test cases):
- Happy path: All entities extracted and normalized correctly
- Orphaned owner: Identity with disabled/deleted owner → ownershipState: 'orphaned'
- Scope drift: Role additions over time without re-approval
- Credential expiry: Credential approaching or past expiration
- Partial failure: Some API calls fail, connector continues with available data
- Rate limiting: Connector backs off and retries correctly
- Empty response: Source system returns no entities (should not delete existing graph)
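A harness could verify the count and type fields of TestExpectations with a check along the following lines. This is a sketch under assumptions: SketchGraph, SketchExpectations, and checkExpectations are hypothetical, and the nodeType and edgeType field names are placeholders for whatever the real NormalizedNode and NormalizedEdge expose.

```typescript
// Hypothetical, reduced graph shape used only for this example.
interface SketchGraph {
  nodes: { nodeType: string }[];
  edges: { edgeType: string }[];
}

// The count and type fields of TestExpectations, with types simplified to strings.
interface SketchExpectations {
  expectedNodeCount: number;
  expectedEdgeCount: number;
  expectedNodeTypes: string[];
  expectedEdgeTypes: string[];
}

// Returns human-readable mismatches; an empty array means the graph passed.
function checkExpectations(graph: SketchGraph, exp: SketchExpectations): string[] {
  const failures: string[] = [];
  if (graph.nodes.length !== exp.expectedNodeCount) {
    failures.push(`node count: expected ${exp.expectedNodeCount}, got ${graph.nodes.length}`);
  }
  if (graph.edges.length !== exp.expectedEdgeCount) {
    failures.push(`edge count: expected ${exp.expectedEdgeCount}, got ${graph.edges.length}`);
  }
  const nodeTypes = new Set(graph.nodes.map((n) => n.nodeType));
  for (const t of exp.expectedNodeTypes) {
    if (!nodeTypes.has(t)) failures.push(`missing node type: ${t}`);
  }
  const edgeTypes = new Set(graph.edges.map((e) => e.edgeType));
  for (const t of exp.expectedEdgeTypes) {
    if (!edgeTypes.has(t)) failures.push(`missing edge type: ${t}`);
  }
  return failures;
}
```

Collecting all mismatches instead of failing on the first one makes a failing scenario easier to diagnose in a single run.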
Import-by-Type Pattern (Track 2)
See ADR-004 for the full decision record.
The Entra-ServiceNow connector uses import-by-type instead of pre-linked entity chains. This is also the recommended pattern for future connectors.
Data flow
Source APIs → entity dicts by type → EdgeResolver → DiscoveredEntities → Transformer → NormalizedGraph
DiscoveredEntities
Flat container holding discovered entities grouped by type:
@dataclass
class DiscoveredEntities:
business_rules: list[dict]
script_includes: list[dict]
scheduled_jobs: list[dict]
flows: list[dict]
rest_messages: list[dict]
oauth_entities: list[dict]
azure_sps: list[dict]
azure_users: list[dict]
sn_users: list[dict]
execution_data: dict[str, dict]
auth_edges: list[ResolvedEdge] # Cross-system auth matches
caller_edges: list[ResolvedEdge] # Automation → REST message calls
EdgeResolver
Explicit resolution of cross-entity relationships:
- resolve_auth_edges(): Matches OAuth entities to Azure SPs by client_id (case-insensitive)
- resolve_caller_edges(): Matches automations to REST messages by script-text search
- resolve_indirect_caller_edges(): Matches BR/Job → Script Include by calls_script_include field
Each ResolvedEdge carries provenance properties for evidence packs:
@dataclass
class ResolvedEdge:
source_id: str
target_id: str
edge_type: str # e.g. AUTHENTICATES_TO, CALLS
properties: dict # Evidence references, matching fields
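The case-insensitive client_id match performed by resolve_auth_edges() can be illustrated as follows, written in TypeScript to match the interface definitions earlier in this document. The record shapes (OAuthEntity, AzureSp), the use of appId as the service principal's client identifier, the edge direction, and the matched_on / matched_value property names are assumptions made for this sketch.

```typescript
// TypeScript rendering of the ResolvedEdge dataclass above.
interface SketchResolvedEdge {
  source_id: string;
  target_id: string;
  edge_type: string;
  properties: Record<string, unknown>;
}

// Assumed record shapes: only the fields needed for matching.
interface OAuthEntity { sys_id: string; client_id: string; }
interface AzureSp { id: string; appId: string; }

function resolveAuthEdges(oauthEntities: OAuthEntity[], azureSps: AzureSp[]): SketchResolvedEdge[] {
  // Index service principals by lower-cased client ID for O(1) lookup.
  const spByClientId = new Map<string, AzureSp>();
  for (const sp of azureSps) spByClientId.set(sp.appId.toLowerCase(), sp);

  const edges: SketchResolvedEdge[] = [];
  for (const entity of oauthEntities) {
    const key = entity.client_id.toLowerCase();
    const sp = spByClientId.get(key);
    if (sp === undefined) continue; // no cross-system match for this entity
    edges.push({
      source_id: entity.sys_id,
      target_id: sp.id,
      edge_type: 'AUTHENTICATES_TO',
      // Provenance: record which field produced the match.
      properties: { matched_on: 'client_id', matched_value: key },
    });
  }
  return edges;
}
```

Recording the matching field in properties is what lets an evidence pack later explain why two entities from different systems were linked.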
Connector responsibility boundary
Connectors produce:
- Entity nodes with classification properties (egress_category, origin, ownership_status, risk_group, identity_binding_status)
- Relationship edges with provenance (RUNS_AS, AUTHENTICATES_TO, OWNED_BY, TRIGGERS_ON, etc.)
- Execution evidence nodes (sign-in data, flow execution counts)
Connectors do NOT produce findings. All detection and evaluation happens in the platform evaluator. See ADR-005.
Dependencies
- Depends on: 03-database.md (StorageAdapter implementations reference the schema)
- Depended on by: 05-reference-impl-entra-servicenow.md (concrete connector implementations)
Open Questions
- Connector SDK language: TypeScript (consistent with potential Node.js API layer) or Python (richer API client libraries for Microsoft Graph, ServiceNow)? Could support both via language-agnostic interface (connector outputs JSON, platform ingests).
- Incremental sync: Not all source APIs support delta queries. For those that don't, should we do full extraction and rely on the diff engine, or implement application-level change tracking?
- Connector isolation model: Separate container per connector per tenant (strongest isolation, higher cost) or shared worker pool with credential injection (simpler ops, weaker isolation)?
- Schema evolution: How to handle connector schema changes (e.g., new node properties, new edge types) without breaking existing data?
- Connector marketplace: In the future, should third parties be able to publish connectors? If so, what security/sandboxing is needed?
- Multi-instance source scope: When the same connector type is deployed across multiple instances (e.g., two AWS accounts, two ServiceNow instances within one tenant), the current nodeId format (${sourceSystem}:${sourceId}) may collide. A source_scope field (scope_type + scope_id) on NormalizedNode and a corresponding update to the database unique index (tenant_id, source_system, source_scope, source_id) would be needed. Not required for current single-instance MVP. Revisit when multi-account AWS or multi-instance connectors enter scope.
- IAM policy conditionality: Cloud IAM (especially AWS) uses conditions, permission boundaries, session policies, and explicit deny semantics that materially change effective permissions. The current PermissionProperties has scope and normalizedAction but no structure for conditions. When the AWS IAM connector enters development, evaluate adding a policyCondition structure (conditionLanguage, rawCondition, normalizedKeys) to PermissionProperties. Design constraint: the platform is deterministic, so store conditions as evidence and let the connector compute effective permissions during transform, not the platform at query time.
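The collision described in the multi-instance source scope question can be shown in a few lines. The sketch below is purely illustrative and does not decide the question: SourceScope, scopedNodeId, and the string encoding of the scope are hypothetical, and the instance names are made up.

```typescript
// Current format described above: `${sourceSystem}:${sourceId}`.
function nodeId(sourceSystem: string, sourceId: string): string {
  return `${sourceSystem}:${sourceId}`;
}

// Hypothetical scope descriptor (scope_type + scope_id).
interface SourceScope {
  scopeType: string;
  scopeId: string;
}

// One possible scoped variant; the encoding is an assumption, not a decided format.
function scopedNodeId(sourceSystem: string, scope: SourceScope, sourceId: string): string {
  return `${sourceSystem}:${scope.scopeType}=${scope.scopeId}:${sourceId}`;
}

// Two instances of the same source system that both contain a record "user-123".
const prod: SourceScope = { scopeType: 'instance', scopeId: 'prod' };
const dev: SourceScope = { scopeType: 'instance', scopeId: 'dev' };

const collides = nodeId('servicenow', 'user-123') === nodeId('servicenow', 'user-123');
const distinct = scopedNodeId('servicenow', prod, 'user-123') !== scopedNodeId('servicenow', dev, 'user-123');
```

Both collides and distinct evaluate to true: the unscoped ID cannot tell the two instances apart, while the scoped one can.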