
Connector Framework


Context

SecurityV0 must ingest identity, permission, and execution data from heterogeneous enterprise systems (Entra ID, ServiceNow, GitHub, cloud providers, SaaS platforms). Each source has different APIs, different permission models, and different data structures.

The connector framework provides a standardized interface for extracting, normalizing, and loading this data — enabling the integration team to build connectors independently of the platform team's database and API work.

Scope of "read-only": the connector framework is read-only with respect to source systems — connectors never modify the systems they ingest from. This document defines that ingestion contract. Platform-issued tickets to external systems (Jira / GitHub / ServiceNow / Linear) initiated by the user as part of a remediation action are a separate outbound capability that does not run through this connector framework; see ADR-019 for that carve-out, including the URL-redaction policy that prevents PII leakage in outbound deeplinks.

Design Influences

  • Veza OAA (Open Authorization API): 3-step pattern (extract → transform → load) with standardized JSON schema. 250+ integrations via pluggable connectors.
  • SecurityV0 differentiator: We normalize to an execution authority model — not just "what can this identity access" but "what execution paths exist, who owns them, and how have they changed."

Decision

A connector interface contract with four phases: Extract → Transform → Diff → Load. Connectors produce a NormalizedGraph that is database-agnostic. The Diff Engine and Storage Adapter handle database-specific operations.


Rationale

Why Interface-Based

  • Independent developability: Integration team builds connectors against the interface contract. Platform team builds the diff engine and storage adapter. No coordination needed beyond the schema.
  • Testability: Connectors can be tested with mock API responses. Platform can be tested with mock NormalizedGraph inputs.
  • Database agnosticism: The same connector works regardless of storage backend (MongoDB for the MVP, with Neo4j as a planned addition for future scale).

Why Not Direct API-to-Database Writes

  • Coupling: connector changes require database schema knowledge
  • No diff computation: can't detect what changed without comparing to previous state
  • No normalization: each connector would need to understand the full schema

Design Detail

Connector Interface

// =============================================================
// CONNECTOR INTERFACE CONTRACT
// =============================================================

interface Connector {
// --- Metadata ---
readonly id: string; // Unique connector identifier (e.g., "entra_id_v1")
readonly name: string; // Human-readable name
readonly sourceSystem: SourceSystem; // Enum: which system this connects to
readonly version: string; // Semantic version of the connector
readonly description: string;

// --- Configuration ---
readonly configSchema: JSONSchema7; // JSON Schema for required configuration
readonly requiredPermissions: PermissionSpec[]; // What permissions needed in source system

// --- Supported entity types ---
readonly entityTypes: NormalizedNodeType[]; // What this connector discovers
readonly relationshipTypes: NormalizedEdgeType[]; // What relationships it maps

// --- Core operations ---
healthCheck(config: ConnectorConfig): Promise<HealthCheckResult>;
extract(config: ConnectorConfig, options: ExtractOptions): Promise<RawExtraction>;
transform(raw: RawExtraction): Promise<NormalizedGraph>;

// --- Optional: incremental sync support ---
supportsIncremental: boolean;
extractIncremental?(config: ConnectorConfig, since: DateTime, deltaToken?: string): Promise<RawExtraction>;

// --- Audit log extraction (preferred for temporal tracking) ---
supportsAuditLog: boolean;
auditLogConfig?: {
retentionDays: number; // How long source retains audit data (e.g., 90 for Azure)
supportedOperations: string[]; // Operations this connector can extract
rateLimit: {
requestsPerSecond: number;
batchSize: number;
};
};
extractAuditLogs?(config: ConnectorConfig, options: AuditLogExtractOptions): Promise<AuditLogExtraction>;
}

// =============================================================
// CONFIGURATION
// =============================================================

interface ConnectorConfig {
tenantId: string;
connectorId: string;

// Source system credentials (retrieved from vault at runtime)
credentials: {
type: 'oauth2_client_credentials' | 'api_key' | 'pat' | 'certificate';
// Actual secrets injected at runtime, never stored in config
};

// Source-specific configuration
sourceConfig: Record<string, unknown>; // Validated against configSchema

// Sync behavior
syncMode: 'full' | 'incremental';
timeout: number; // Max sync duration in seconds
rateLimitConfig?: {
maxRequestsPerSecond: number;
retryBackoffMs: number;
maxRetries: number;
};
}

interface ExtractOptions {
syncId: string; // Unique ID for this sync run
syncMode: 'full' | 'incremental' | 'audit_log';
since?: DateTime; // For incremental: last successful sync timestamp
deltaToken?: string; // For incremental: source-specific continuation token
entityFilter?: NormalizedNodeType[]; // Optional: only extract specific entity types
}

// =============================================================
// AUDIT LOG EXTRACTION (preferred for temporal tracking)
// =============================================================

interface AuditLogExtractOptions {
syncId: string;
since: DateTime; // Fetch audit logs since this time
until?: DateTime; // Optional end time (default: now)
cursor?: SyncCursor; // Resume from previous cursor
operations?: string[]; // Filter to specific operations
}

interface SyncCursor {
tenantId: string;
sourceSystem: SourceSystem;
cursorState: {
// Azure Entra ID
lastActivityDatetime?: DateTime;
skipToken?: string;

// ServiceNow
lastSysUpdatedOn?: DateTime;
lastSysId?: string;
};
lastSuccessfulSync: DateTime;
}

interface AuditLogExtraction {
syncId: string;
connectorId: string;
tenantId: string;
extractedAt: DateTime;

// Audit records from source system
auditRecords: AuditRecord[];

// Cursor for next fetch
nextCursor: SyncCursor;

// Metadata
metadata: {
recordsFetched: number;
oldestRecord: DateTime;
newestRecord: DateTime;
hasMore: boolean;
rateLimitRemaining?: number;
sourceApi: string; // e.g., 'directoryAudits', 'sys_audit'
};
}

interface AuditRecord {
sourceRecordId: string; // ID in source audit log
timestamp: DateTime; // When the change occurred
operation: string; // Source-specific operation name

// Actor who made the change
actor: {
id: string;
type: 'user' | 'service_principal' | 'system' | 'unknown';
displayName?: string;
};

// What was affected
targetResources: Array<{
id: string;
type: string;
displayName?: string;
modifiedProperties?: Array<{
name: string;
oldValue: unknown;
newValue: unknown;
}>;
}>;

// Additional context
result: 'success' | 'failure' | 'unknown';
additionalDetails?: Record<string, unknown>;

// Original record for debugging
rawRecord: Record<string, unknown>;
}

// =============================================================
// EXTRACTION OUTPUT
// =============================================================

interface RawExtraction {
syncId: string;
connectorId: string;
tenantId: string;
extractedAt: DateTime;

// Raw entities from source API
entities: RawEntity[];
relationships: RawRelationship[];

// Sync metadata
metadata: ExtractionMetadata;

// For incremental sync continuation
deltaToken?: string;
}

interface RawEntity {
sourceId: string; // ID in the source system
sourceType: string; // Source-specific type name
properties: Record<string, unknown>; // Raw API response fields
extractedAt: DateTime;
}

interface RawRelationship {
sourceId: string;
targetId: string;
type: string; // Source-specific relationship name
properties: Record<string, unknown>;
extractedAt: DateTime;
}

interface ExtractionMetadata {
apiCallCount: number;
totalEntitiesDiscovered: number;
paginationComplete: boolean;
rateLimitRemaining?: number;
errors: ExtractionError[]; // Non-fatal errors (e.g., individual entity fetch failed)
warnings: string[];
}

interface ExtractionError {
entitySourceId?: string;
operation: string;
message: string;
retryable: boolean;
}
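
The audit-log fields above (auditLogConfig.retentionDays and SyncCursor.lastSuccessfulSync) imply a check that a cursor has not aged out of the source's retention window. The following is a minimal sketch of that check, not the platform's implementation. It assumes DateTime can be represented by the built-in Date type; checkCursor, CursorCheck, and safetyMarginDays are hypothetical names introduced only for illustration.

```typescript
// Assumption: DateTime is represented here by the built-in Date type.
const MS_PER_DAY = 24 * 60 * 60 * 1000;

interface CursorCheck {
  ageDays: number;                  // Days since the last successful sync
  withinRetention: boolean;         // false: audit logs alone cannot close the gap
  needsProtectiveBaseline: boolean; // true: cursor is close to the retention edge
}

// safetyMarginDays is a hypothetical tuning knob, not part of the contract.
function checkCursor(
  lastSuccessfulSync: Date,
  currentTime: Date,
  retentionDays: number,
  safetyMarginDays: number = 7,
): CursorCheck {
  const ageDays =
    (currentTime.getTime() - lastSuccessfulSync.getTime()) / MS_PER_DAY;
  const withinRetention = ageDays <= retentionDays;
  const needsProtectiveBaseline =
    withinRetention && ageDays >= retentionDays - safetyMarginDays;
  return { ageDays, withinRetention, needsProtectiveBaseline };
}

// Example with a 90-day retention window (the Azure figure quoted above).
const checkTime = new Date("2025-06-30T00:00:00Z");
const freshCursor = checkCursor(new Date("2025-06-29T00:00:00Z"), checkTime, 90);
const nearEdgeCursor = checkCursor(new Date("2025-04-05T00:00:00Z"), checkTime, 90);
const expiredCursor = checkCursor(new Date("2025-03-01T00:00:00Z"), checkTime, 90);
```

A cursor that fails withinRetention would presumably force a full sync, since the audit log can no longer cover the gap; a cursor that sets needsProtectiveBaseline is still usable but is a candidate for the protective baseline described in the audit log lifecycle.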

Normalized Schema

The transform phase maps source-specific data to a common vocabulary. This is the contract between connectors and the platform.

// =============================================================
// NORMALIZED GRAPH (output of transform, input to diff engine)
// =============================================================

interface NormalizedGraph {
syncId: string;
connectorId: string;
tenantId: string;
transformedAt: DateTime;

nodes: NormalizedNode[];
edges: NormalizedEdge[];

// Temporal markers: what this connector knows changed
temporalMarkers: TemporalMarker[];

// Evidence availability report: what evidence sources this connector probed
// and their status. Propagates to findings via evidence_completeness.
evidenceCompleteness: EvidenceCompletenessReport;
}

interface EvidenceCompletenessReport {
sources: Record<string, EvidenceSourceStatus>;
}

interface EvidenceSourceStatus {
sourceTable: string; // e.g., "syslog_transaction", "sys_audit_role"
status: 'available' | 'unavailable_not_enabled' | 'unavailable_no_access' | 'unavailable_not_applicable';
recordCount?: number; // How many records retrieved (if available)
oldestRecord?: DateTime; // Oldest record timestamp (if available)
notes?: string; // Human-readable note about limitations
}

// =============================================================
// NORMALIZED NODE TYPES
// =============================================================

type NormalizedNodeType =
| 'identity' // Service principal, OAuth app, machine account (authenticating entities)
| 'automation' // Business rule, script include, flow, scheduled job (execution logic)
| 'connection' // REST message, SOAP message, HTTP connection (outbound integration)
| 'human_identity' // User accounts (owners, approvers) — platform maps to internal 'owner'
| 'role' // Any grouping of permissions
| 'permission' // Individual capability
| 'resource' // Thing being acted upon
| 'credential' // Authentication material (OAuth profile, API key, certificate)
| 'execution_evidence'; // Proof of execution (log entry, API call record)

// MIGRATION COMPATIBILITY: During the migration window, the platform ingestion
// normalizer also accepts the legacy type 'autonomous_identity' and remaps it:
// - autonomous_identity with identitySubtype in (service_principal, oauth_app, machine_account, integration_user) → 'identity'
// - autonomous_identity with identitySubtype in (business_rule, script_include, flow_designer_flow, scheduled_job, event_script, transform_map) → 'automation'
// - autonomous_identity with identitySubtype in (oauth_provider, oauth_profile) → 'credential'
// Connectors should migrate to the new types. Legacy type acceptance will be removed in v3.

// NORMALIZATION NOTE: `human_identity` and Owners
//
// Connectors extract `human_identity` nodes for user accounts they discover
// in source systems (Entra users, ServiceNow sys_user records). The platform's
// normalizer layer maps human_identity to internal entity_type: "owner", based on:
// 1. OWNED_BY edges emitted by the connector (explicit ownership)
// 2. BELONGS_TO edges (group/team membership implying team-level ownership)
// 3. APPROVED_BY edges (approval relationships)
//
// Connectors emit facts (human_identity + OWNED_BY edge); the normalizer
// creates the Owner entity with appropriate owner_type (human, team, business_unit).
//
// Do NOT add 'owner' as a NormalizedNodeType. Ownership is a platform concept
// derived from connector-provided relationship data, not a source-system entity type.

interface NormalizedNode {
// Identity
nodeId: string; // Connector-generated stable ID: `${sourceSystem}:${sourceId}`
nodeType: NormalizedNodeType;
sourceSystem: SourceSystem;
sourceId: string; // ID in source system

// Common properties (all node types)
displayName: string;
status: NodeStatus;
createdAt?: DateTime;
lastModifiedAt?: DateTime;

// Type-specific properties (see union type below)
properties: NodeProperties;
}

// Union of all type-specific property interfaces.
// The correct interface depends on nodeType:
// identity → IdentityProperties
// automation → AutomationProperties
// connection → ConnectionProperties
// credential → CredentialProperties
// human_identity → HumanIdentityProperties
// role → RoleProperties
// permission → PermissionProperties
// resource → ResourceProperties
// execution_evidence → ExecutionEvidenceProperties
type NodeProperties =
| IdentityProperties
| AutomationProperties
| ConnectionProperties
| CredentialProperties
| HumanIdentityProperties
| RoleProperties
| PermissionProperties
| ResourceProperties
| ExecutionEvidenceProperties;

// --- Type-specific property interfaces ---

interface IdentityProperties {
identitySubtype:
| 'service_principal' | 'oauth_app' | 'github_app' | 'agent' | 'machine_account' | 'bot'
| 'integration_user' | 'system_execution';
// Note: PATs are credentials (credentialSubtype: 'pat'), not identities.
// A PAT authenticates *as* an identity but is itself authentication material.
// See ADR-006 entity classification decision tree.
executionMode: 'autonomous' | 'operator_assisted' | 'human_triggered' | 'unknown';
securityRelevance?: 'active_external' | 'dormant_authority' | 'internal_inventory';
lastActivityAt?: DateTime;
ownershipState: OwnershipState;
}

interface AutomationProperties {
automationSubtype:
| 'business_rule' | 'script_include' | 'flow_designer_flow'
| 'scheduled_job' | 'event_script' | 'transform_map';
executionMode: 'autonomous' | 'operator_assisted' | 'human_triggered' | 'unknown';
securityRelevance: 'active_external' | 'dormant_authority' | 'internal_inventory';
lastActivityAt?: DateTime;
ownershipState: OwnershipState;
}

interface ConnectionProperties {
connectionSubtype: 'rest_message' | 'rest_method' | 'soap_message' | 'http_connection';
targetUrl?: string;
authMethod?: string;
description?: string;
}

interface HumanIdentityProperties {
email?: string;
orgUnit?: string;
jobTitle?: string;
accountEnabled: boolean;
disabledAt?: DateTime;
deletedAt?: DateTime;
departedAt?: DateTime;
}

interface RoleProperties {
roleName: string;
roleSubtype: 'application' | 'directory' | 'cloud_iam' | 'custom';
description?: string;
isPrivileged: boolean; // Elevated/admin-level
}

interface PermissionProperties {
permissionName: string;
normalizedAction: NormalizedAction;
scope: string; // What it applies to
description?: string;
}

interface ResourceProperties {
resourceSubtype: 'table' | 'module' | 'api_endpoint' | 'repository' | 'secret' | 'workflow' | 'storage' | 'compute';
businessDomain: BusinessDomain;
sensitivity: SensitivityLevel;
containsPii?: boolean;
containsFinancialData?: boolean;
}

interface CredentialProperties {
credentialSubtype: 'oauth_client_secret' | 'certificate' | 'pat' | 'api_key' | 'oidc_token' | 'ssh_key'
| 'oauth_provider' | 'oauth_profile' | 'client_secret';
expiresAt?: DateTime;
lastUsedAt?: DateTime;
rotatedAt?: DateTime;
}

interface ExecutionEvidenceProperties {
executionType: string; // 'api_call', 'workflow_run', 'sign_in', 'audit_log_entry'
executedAt: DateTime;
action?: string; // What was done
targetResource?: string; // What was acted upon
outcome: 'success' | 'failure' | 'unknown';
}

// =============================================================
// NORMALIZED EDGE TYPES
// =============================================================

type NormalizedEdgeType =
| 'OWNED_BY' // any → human_identity (ownership)
| 'HAS_ROLE' // identity → role (role assignment)
| 'GRANTS' // role → permission (role includes permission)
| 'APPLIES_TO' // permission → resource (permission scopes to resource)
| 'AUTHENTICATES_VIA' // DEPRECATED: accepted for backward compat, remapped to USES on ingest. Will be removed in a future version.
| 'EXECUTES_ON' // automation → resource (execution evidence, narrowed from identity→resource)
| 'APPROVED_BY' // identity → human (approval/authorization)
| 'MEMBER_OF' // identity/human → role (group/team membership)
| 'BELONGS_TO' // owner → parent owner (ownership hierarchy)
| 'AUTHENTICATES_TO' // identity → identity (cross-system auth chain)
| 'DELEGATES_TO' // identity → identity (delegation chain)
| 'RUNS_AS' // automation → identity | human_identity (which identity the automation executes as)
| 'TRIGGERS_ON' // automation → resource/event (what triggers the automation)
| 'CREATED_BY' // entity → human (who created this, distinct from ownership)
// New execution chain edge types (ADR-007)
| 'CALLS' // automation → automation (BR invokes SI)
| 'INVOKES' // automation → connection (SI uses REST Message)
| 'USES' // connection → credential (REST Message uses OAuth Profile)
| 'AUTHENTICATES_AS'; // credential → identity (OAuth Profile represents SP)

// EXECUTION CHAIN PATTERN
// The canonical 4-hop execution chain from automation to identity:
//
// automation --CALLS--> automation --INVOKES--> connection --USES--> credential --AUTHENTICATES_AS--> identity
//
// Not all chains have every hop. A simple chain may be:
// automation --RUNS_AS--> identity
//
// AUTHENTICATES_AS direction: credential → identity (credential represents the identity).
// This is the reverse of the legacy AUTHENTICATES_VIA direction.

interface NormalizedEdge {
edgeId: string; // Connector-generated stable ID
edgeType: NormalizedEdgeType;
sourceNodeId: string; // References NormalizedNode.nodeId
targetNodeId: string; // References NormalizedNode.nodeId

// Temporal properties (critical for drift detection)
since?: DateTime; // When this relationship was established
until?: DateTime; // When it ended (omitted = still active)

// Edge-specific properties
properties: EdgeProperties;
}

interface EdgeProperties {
// Common
grantedBy?: string; // Who/what established this relationship
sourceEvidence?: string; // Source system record ID proving this edge exists

// For OWNED_BY
ownershipStatus?: OwnershipState;

// For HAS_ROLE
inherited?: boolean; // Via group membership vs direct assignment

// For EXECUTES_ON
executionCount?: number; // Observed execution count in window
lastExecution?: DateTime;

// For AUTHENTICATES_TO — cross-system linkage proof
// Must include issuer/tenant and target instance context to disambiguate multi-tenant/multi-instance setups
evidenceReferences?: {
issuingSystemId: string; // e.g., Entra SP appId (required)
issuingTenantId: string; // e.g., Entra tenant ID "72f988bf-..." (required)
targetSystemId: string; // e.g., ServiceNow oauth_entity.client_id (required)
targetInstanceId: string; // e.g., "https://corp.service-now.com" (required)
targetRecordSysId?: string; // e.g., oauth_entity sys_id (optional, for direct verification)
matchingField: string; // e.g., "client_id" (required)
matchingValue: string; // The actual matched value (required)
targetUserBinding?: string; // e.g., "oauth_entity.user -> sys_user.user_name" (optional)
};

// For APPROVED_BY
approvalType?: 'initial_setup' | 'role_addition' | 'scope_expansion' | 'reapproval';
approvalEvidence?: string; // Change request ID, ticket number, etc.

// For RUNS_AS
runAsType?: 'configured' | 'inherited' | 'system';

// For TRIGGERS_ON
triggerType?: 'schedule' | 'event' | 'manual' | 'api_call';
schedule?: string; // Cron expression or interval if schedule-triggered

// For CREATED_BY
createdAt?: DateTime;
creationContext?: string; // e.g., "sys_created_by" or "app registration creator"
// Maintainer hints (e.g., sys_updated_by) are supplemental evidence and
// MUST NOT be treated as authoritative ownership without explicit OWNED_BY proof.

// For CALLS (automation → automation)
callType?: 'direct' | 'include' | 'delegate'; // Direct script call, script include, or delegation

// For INVOKES (automation → connection)
httpMethod?: string; // GET, POST, etc. if known

// For USES (connection → credential)
// (no additional properties needed — relationship is structural)

// For AUTHENTICATES_AS (credential → identity)
// (no additional properties needed — relationship is structural)
}

// =============================================================
// CHAIN HINTS (Optional — connector-provided execution chain metadata)
// =============================================================

// Connectors MAY include chainMembership in automation node properties
// to provide hints for platform-side chain assembly. This is optional —
// the platform can also discover chains via BFS from entry points.

interface ChainMembershipHint {
role: 'entry_point' | 'code_component' | 'outbound_target' | 'auth_credential' | 'destination_identity';
anchorEntityId: string; // Entry point entity source_id
chainSemanticHash?: string; // e.g., "trigger-incident-dest-graph" (human-readable chain summary)
}

// Usage in NormalizedNode.properties:
// properties: {
// ...automationProperties,
// chainMembership: {
// role: "entry_point",
// anchorEntityId: "br-sys-id-abc",
// chainSemanticHash: "trigger-incident-dest-graph"
// }
// }

// =============================================================
// ENUMS
// =============================================================

type SourceSystem =
| 'entra_id'
| 'servicenow'
| 'github'
| 'aws'
| 'azure'
| 'gcp'
| 'snowflake'
| 'pagerduty'
| 'okta'
| 'custom';

type NormalizedAction =
| 'create'
| 'read'
| 'update'
| 'delete'
| 'execute' // Run workflows, trigger actions
| 'admin' // Manage the system itself
| 'delegate'; // Grant authority to others

type OwnershipState =
| 'owned' // Has active, accountable human
| 'orphaned' // Owner departed/disabled/deleted
| 'unattributed' // Never had clear owner
| 'disputed'; // Multiple conflicting signals

type BusinessDomain =
| 'hr'
| 'finance'
| 'customer'
| 'it_ops'
| 'security'
| 'engineering'
| 'legal'
| 'executive'
| 'unknown';

type SensitivityLevel =
| 'public'
| 'internal'
| 'confidential'
| 'restricted';

type NodeStatus =
| 'active'
| 'disabled'
| 'deleted'
| 'expired';

// =============================================================
// TEMPORAL MARKERS
// =============================================================

interface TemporalMarker {
nodeId: string;
markerType: 'created' | 'modified' | 'deleted' | 'relationship_changed';
timestamp: DateTime;
details?: string; // Human-readable description of what changed
}
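
To make the transform contract concrete, here is a minimal sketch of mapping one raw entity to a NormalizedNode. It is illustrative only: the types are trimmed-down local copies of the contract, the function name toIdentityNode is hypothetical, and the raw field names (displayName, accountEnabled) and the default values chosen for executionMode and ownershipState are assumptions rather than rules from this document.

```typescript
// Trimmed-down local copies of the contract types, so the sketch stands alone.
type SourceSystem = "entra_id" | "servicenow";
type NodeStatus = "active" | "disabled" | "deleted" | "expired";

interface RawEntity {
  sourceId: string;
  sourceType: string;
  properties: Record<string, unknown>;
}

interface NormalizedNode {
  nodeId: string;
  nodeType: "identity";
  sourceSystem: SourceSystem;
  sourceId: string;
  displayName: string;
  status: NodeStatus;
  properties: {
    identitySubtype: "service_principal";
    executionMode: "autonomous" | "operator_assisted" | "human_triggered" | "unknown";
    ownershipState: "owned" | "orphaned" | "unattributed" | "disputed";
  };
}

// Assumption: the raw payload carries `displayName` and `accountEnabled`;
// a real connector would read whatever its source API actually returns.
function toIdentityNode(sourceSystem: SourceSystem, raw: RawEntity): NormalizedNode {
  const rawName = raw.properties["displayName"];
  const displayName = typeof rawName === "string" ? rawName : raw.sourceId;
  const enabled = raw.properties["accountEnabled"] !== false;
  return {
    nodeId: `${sourceSystem}:${raw.sourceId}`, // Stable ID format from the contract
    nodeType: "identity",
    sourceSystem,
    sourceId: raw.sourceId,
    displayName,
    status: enabled ? "active" : "disabled",
    properties: {
      identitySubtype: "service_principal",
      executionMode: "unknown",       // Assumed default: not derivable from this payload
      ownershipState: "unattributed", // Assumed default until ownership evidence exists
    },
  };
}

const exampleNode = toIdentityNode("entra_id", {
  sourceId: "sp-123",
  sourceType: "servicePrincipal",
  properties: { displayName: "Payroll Sync", accountEnabled: false },
});
```

Because nodeId is derived only from sourceSystem and sourceId, re-running the transform on the same source record yields the same ID, which is what lets the diff engine match nodes across syncs.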

OAA Canonical Permission Mapping

The overview references OAA's 10-type canonical permission taxonomy for interoperability. Internally, SecurityV0 uses 7 normalized actions (simpler, sufficient for blast radius and drift analysis). The OAA exporter maps from internal actions to OAA types during export:

| SecurityV0 NormalizedAction | OAA Canonical Type(s) | Notes |
|---|---|---|
| create | DataCreate | |
| read | DataRead, MetadataRead | Connector sets scope to disambiguate data vs config |
| update | DataWrite, MetadataWrite | Connector sets scope to disambiguate data vs config |
| delete | DataDelete | |
| execute | NonData | Trigger workflows, invoke functions |
| admin | AppAdmin, SystemAdmin | Scope determines app-level vs system-level |
| delegate | GlobalAdmin | Granting access to others is a global-admin concern |

The mapping is intentionally lossy inbound (10→7) — SecurityV0 does not need OAA's full granularity for internal analysis. The OAA exporter reverses the mapping using scope and permissionName to recover the original OAA type where possible.
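
As a rough illustration of the export direction, the mapping table above can be held as a lookup from internal action to candidate OAA types. This sketch covers only the lookup; how scope and permissionName resolve an ambiguous row is not specified here, so the hypothetical helper unambiguousOaaType simply returns null for those rows instead of guessing.

```typescript
type NormalizedAction =
  | "create" | "read" | "update" | "delete" | "execute" | "admin" | "delegate";

// Candidate OAA types per internal action, copied from the mapping table.
const OAA_CANDIDATES: Record<NormalizedAction, readonly string[]> = {
  create: ["DataCreate"],
  read: ["DataRead", "MetadataRead"],
  update: ["DataWrite", "MetadataWrite"],
  delete: ["DataDelete"],
  execute: ["NonData"],
  admin: ["AppAdmin", "SystemAdmin"],
  delegate: ["GlobalAdmin"],
};

// Hypothetical helper: returns the single OAA type when the row has exactly
// one candidate, or null when scope/permissionName would have to be consulted.
function unambiguousOaaType(action: NormalizedAction): string | null {
  const [first, ...rest] = OAA_CANDIDATES[action];
  return first !== undefined && rest.length === 0 ? first : null;
}

const createType = unambiguousOaaType("create"); // single candidate
const readType = unambiguousOaaType("read");     // two candidates, so null
```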

Permission Normalization Guide

Connectors must map source-specific permissions to NormalizedAction. This table provides guidance:

| Source System | Source Permission/Role | Normalized Action | Scope |
|---|---|---|---|
| Entra ID | Application.Read.All | read | applications |
| Entra ID | Application.ReadWrite.All | admin | applications |
| Entra ID | User.Read.All | read | users |
| Entra ID | Directory.ReadWrite.All | admin | directory |
| ServiceNow | itil role | update | incident,problem,change |
| ServiceNow | admin role | admin | all |
| ServiceNow | catalog_admin | admin | service_catalog |
| ServiceNow | hr_read role | read | hr_tables |
| ServiceNow | custom role with create on table X | create | table_x |
| GitHub | contents: read | read | repository_contents |
| GitHub | contents: write | update | repository_contents |
| GitHub | actions: write | execute | workflows |
| GitHub | admin on repo | admin | repository |
| GitHub | members: write on org | delegate | org_membership |
| AWS | s3:GetObject | read | s3_bucket/prefix |
| AWS | s3:PutObject | create | s3_bucket/prefix |
| AWS | iam:CreateRole | delegate | iam_roles |
| AWS | sts:AssumeRole | execute | iam_role_arn |
| Snowflake | SELECT on schema | read | database.schema |
| Snowflake | INSERT on table | create | database.schema.table |
| Snowflake | ACCOUNTADMIN role | admin | account |

Normalization rules:

  1. If a permission allows modifying system configuration → admin
  2. If a permission allows granting access to others → delegate
  3. If a permission allows triggering automation/workflows → execute
  4. If a permission allows writing data → create (it adds new records) or update (it modifies existing records)
  5. If a permission only allows viewing → read
  6. If a permission allows removing data → delete
  7. When in doubt, use the most permissive classification (prefer admin over update)
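
One way to read the rules above is as an ordered check in which the first match wins. The sketch below takes that reading as an assumption; the Capabilities flags are hypothetical inputs that a connector would have to derive from each source permission, and the relative order of update, create, and delete is a choice made for this example, not something the rules state.

```typescript
type NormalizedAction =
  | "create" | "read" | "update" | "delete" | "execute" | "admin" | "delegate";

// Hypothetical capability flags derived from a source permission.
interface Capabilities {
  modifiesSystemConfig?: boolean;
  grantsAccessToOthers?: boolean;
  triggersAutomation?: boolean;
  writesExistingData?: boolean;
  writesNewData?: boolean;
  removesData?: boolean;
}

// First matching rule wins, so broader capabilities take precedence.
function normalizeAction(c: Capabilities): NormalizedAction {
  if (c.modifiesSystemConfig) return "admin";   // Rule 1
  if (c.grantsAccessToOthers) return "delegate"; // Rule 2
  if (c.triggersAutomation) return "execute";    // Rule 3
  if (c.writesExistingData) return "update";     // Rule 4 (existing records)
  if (c.writesNewData) return "create";          // Rule 4 (new records)
  if (c.removesData) return "delete";            // Rule 6
  return "read";                                 // Rule 5: view-only
}

const adminAction = normalizeAction({ modifiesSystemConfig: true, writesExistingData: true });
const readAction = normalizeAction({});
```

Checking admin first means a permission that both edits data and changes configuration is classified as admin, which matches the guidance to prefer the more permissive classification.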

Business Domain Classification

Connectors should classify resources by business domain where possible. When the source system provides classification (e.g., ServiceNow modules), use it. Otherwise, use name-based heuristics:

| Resource Name Pattern | Business Domain |
|---|---|
| hr_*, employee_*, payroll_*, benefits_* | hr |
| fin_*, invoice_*, payment_*, budget_*, gl_* | finance |
| customer_*, account_*, opportunity_*, case_* | customer |
| incident_*, problem_*, change_*, cmdb_* | it_ops |
| sec_*, audit_*, compliance_*, policy_* | security |
| repo_*, pipeline_*, build_*, deploy_* | engineering |
| Everything else | unknown |
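
The name-based fallback can be sketched as a prefix lookup. The prefixes below are copied from the table; the function name classifyByName, the first-match-wins order, and the case-insensitive comparison are assumptions made for this example.

```typescript
type BusinessDomain =
  | "hr" | "finance" | "customer" | "it_ops" | "security"
  | "engineering" | "legal" | "executive" | "unknown";

// Prefixes copied from the table; rows are checked top to bottom.
const DOMAIN_PREFIXES: ReadonlyArray<readonly [BusinessDomain, readonly string[]]> = [
  ["hr", ["hr_", "employee_", "payroll_", "benefits_"]],
  ["finance", ["fin_", "invoice_", "payment_", "budget_", "gl_"]],
  ["customer", ["customer_", "account_", "opportunity_", "case_"]],
  ["it_ops", ["incident_", "problem_", "change_", "cmdb_"]],
  ["security", ["sec_", "audit_", "compliance_", "policy_"]],
  ["engineering", ["repo_", "pipeline_", "build_", "deploy_"]],
];

function classifyByName(resourceName: string): BusinessDomain {
  // Assumption: matching ignores case.
  const lowered = resourceName.toLowerCase();
  for (const [domain, prefixes] of DOMAIN_PREFIXES) {
    if (prefixes.some((prefix) => lowered.startsWith(prefix))) {
      return domain;
    }
  }
  return "unknown"; // "Everything else" row
}

const payrollDomain = classifyByName("payroll_runs");
const cmdbDomain = classifyByName("CMDB_ci_server");
const otherDomain = classifyByName("widgets");
```

Note that the legal and executive domains have no name pattern in the table, so this heuristic never returns them; they can only come from a classification supplied by the source system.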

Diff Engine

The Diff Engine compares the current NormalizedGraph against the previously known state and emits typed change events.

// =============================================================
// DIFF ENGINE
// =============================================================

interface DiffEngine {
/**
* Compute changes between current extraction and previous known state.
* Emits events to the event store and applies changes to the graph store.
*/
computeAndApply(
tenantId: string,
syncId: string,
currentGraph: NormalizedGraph,
storageAdapter: StorageAdapter
): Promise<DiffResult>;
}

interface DiffResult {
syncId: string;
tenantId: string;
computedAt: DateTime;

// Counts
nodesCreated: number;
nodesUpdated: number;
nodesDeleted: number; // Tombstoned (present in previous, absent in current)
edgesCreated: number;
edgesUpdated: number;
edgesRemoved: number;

// Events emitted
events: GraphEvent[];

// Entities that should trigger re-evaluation
entitiesToEvaluate: string[]; // Node IDs that changed in ways relevant to triggers
}

interface GraphEvent {
eventId: string;
tenantId: string;
timestamp: DateTime;
entityType: NormalizedNodeType;
entityId: string;
eventType: 'created' | 'updated' | 'deleted' | 'relationship_added' | 'relationship_removed';
relationshipType?: NormalizedEdgeType;
targetEntityId?: string;
beforeState: Record<string, unknown> | null;
afterState: Record<string, unknown> | null;
changedProperties?: string[];
sourceConnector: string;
syncId: string;
}

Diff rules:

  1. Node present in current but not in previous → created event
  2. Node present in both but properties differ → updated event (with before/after)
  3. Node present in previous but not in current → deleted event (tombstone in graph)
  4. Edge present in current but not in previous → relationship_added event
  5. Edge present in previous but not in current → relationship_removed event
  6. Edge present in both but properties differ → relationship_removed + relationship_added (treat as replacement)
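
Rules 1 to 3 can be sketched as a pure function over two node lists. This is a simplified illustration, not the Diff Engine itself: SketchNode and NodeChange are hypothetical local types, and property equality is approximated by comparing JSON text, which assumes the values are JSON-serializable.

```typescript
interface SketchNode {
  nodeId: string;
  properties: Record<string, unknown>;
}

type NodeChange =
  | { kind: "created"; nodeId: string }
  | { kind: "updated"; nodeId: string; changedProperties: string[] }
  | { kind: "deleted"; nodeId: string };

// Assumption: values are JSON-serializable, so JSON text equality suffices.
function changedKeys(
  before: Record<string, unknown>,
  after: Record<string, unknown>,
): string[] {
  const allKeys = Object.keys(before).concat(Object.keys(after));
  const unique = allKeys.filter((key, i) => allKeys.indexOf(key) === i);
  return unique
    .filter((key) => JSON.stringify(before[key]) !== JSON.stringify(after[key]))
    .sort();
}

function diffNodes(previous: SketchNode[], current: SketchNode[]): NodeChange[] {
  const previousById = new Map<string, SketchNode>();
  previous.forEach((node) => previousById.set(node.nodeId, node));
  const currentIds = new Set<string>();
  const changes: NodeChange[] = [];

  for (const node of current) {
    currentIds.add(node.nodeId);
    const before = previousById.get(node.nodeId);
    if (before === undefined) {
      changes.push({ kind: "created", nodeId: node.nodeId }); // Rule 1
      continue;
    }
    const changed = changedKeys(before.properties, node.properties);
    if (changed.length > 0) {
      changes.push({ kind: "updated", nodeId: node.nodeId, changedProperties: changed }); // Rule 2
    }
  }
  for (const node of previous) {
    if (!currentIds.has(node.nodeId)) {
      changes.push({ kind: "deleted", nodeId: node.nodeId }); // Rule 3
    }
  }
  return changes;
}

const nodeChanges = diffNodes(
  [
    { nodeId: "entra_id:a", properties: { ownershipState: "owned" } },
    { nodeId: "entra_id:b", properties: {} },
  ],
  [
    { nodeId: "entra_id:a", properties: { ownershipState: "orphaned" } },
    { nodeId: "entra_id:c", properties: {} },
  ],
);
```

In this example, node a is reported as updated (ownershipState changed), c as created, and b as deleted. The same shape would apply to edges, with rule 6 emitting a removal followed by an addition instead of an update.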

Trigger-relevant changes (entities added to entitiesToEvaluate):

  • Any identity with ownership change
  • Any identity with role addition/removal
  • Any identity with activity change (dormant detection)
  • Any automation with execution mode or security relevance change
  • Any human_identity with status change (disabled/deleted)
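
A possible filter for the list above is sketched below. It is an interpretation rather than the platform's logic: the ChangeEvent shape is a reduced, hypothetical version of GraphEvent, the watched property names are assumed to match the node property interfaces, and treating OWNED_BY edge changes as ownership changes is an assumption.

```typescript
interface ChangeEvent {
  entityType: string;
  eventType: "created" | "updated" | "deleted" | "relationship_added" | "relationship_removed";
  relationshipType?: string;
  changedProperties?: string[];
}

// Assumption: these names mirror the property interfaces defined earlier.
const WATCHED_PROPERTIES: Partial<Record<string, readonly string[]>> = {
  identity: ["ownershipState", "lastActivityAt"],
  automation: ["executionMode", "securityRelevance"],
  human_identity: ["status"],
};
const WATCHED_IDENTITY_EDGES: readonly string[] = ["HAS_ROLE", "OWNED_BY"];

function isTriggerRelevant(e: ChangeEvent): boolean {
  if (e.eventType === "relationship_added" || e.eventType === "relationship_removed") {
    // Role or ownership edges changing on an identity
    return (
      e.entityType === "identity" &&
      e.relationshipType !== undefined &&
      WATCHED_IDENTITY_EDGES.indexOf(e.relationshipType) !== -1
    );
  }
  if (e.eventType === "deleted") {
    return e.entityType === "human_identity"; // A departed owner
  }
  if (e.eventType !== "updated") {
    return false; // Simplification: creations are not covered by this sketch
  }
  const watched = WATCHED_PROPERTIES[e.entityType] ?? [];
  return (e.changedProperties ?? []).some((p) => watched.indexOf(p) !== -1);
}

const roleAdded = isTriggerRelevant({
  entityType: "identity",
  eventType: "relationship_added",
  relationshipType: "HAS_ROLE",
});
const cosmeticRename = isTriggerRelevant({
  entityType: "automation",
  eventType: "updated",
  changedProperties: ["displayName"],
});
```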

Storage Adapter

The storage adapter is the abstraction layer between the diff engine and the actual database.

// =============================================================
// STORAGE ADAPTER (database-agnostic interface)
// =============================================================

interface StorageAdapter {
// --- Graph operations ---
getNode(tenantId: string, nodeId: string): Promise<NormalizedNode | null>;
getNodesByType(tenantId: string, nodeType: NormalizedNodeType): Promise<NormalizedNode[]>;
getEdgesForNode(tenantId: string, nodeId: string, direction: 'incoming' | 'outgoing' | 'both'): Promise<NormalizedEdge[]>;

upsertNode(tenantId: string, node: NormalizedNode): Promise<void>;
deleteNode(tenantId: string, nodeId: string): Promise<void>;
upsertEdge(tenantId: string, edge: NormalizedEdge): Promise<void>;
deleteEdge(tenantId: string, edgeId: string): Promise<void>;

// --- Path queries ---
queryPaths(tenantId: string, query: PathQuery): Promise<ExecutionPath[]>;

// --- Event operations ---
appendEvents(tenantId: string, events: GraphEvent[]): Promise<void>;
queryEvents(tenantId: string, query: EventQuery): Promise<GraphEvent[]>;

// --- Bulk operations (for full sync) ---
bulkUpsertNodes(tenantId: string, nodes: NormalizedNode[]): Promise<void>;
bulkUpsertEdges(tenantId: string, edges: NormalizedEdge[]): Promise<void>;

// --- State queries ---
getLastSyncState(tenantId: string, connectorId: string): Promise<SyncState | null>;
getGraphForConnector(tenantId: string, connectorId: string): Promise<NormalizedGraph>;
}

interface PathQuery {
startNodeId?: string;
startNodeType?: NormalizedNodeType;
startNodeFilters?: Record<string, unknown>;
traverseEdgeTypes: NormalizedEdgeType[];
endNodeType?: NormalizedNodeType;
endNodeFilters?: Record<string, unknown>;
maxDepth: number; // Safety limit
}

interface ExecutionPath {
nodes: NormalizedNode[];
edges: NormalizedEdge[];
depth: number;
startNode: NormalizedNode;
endNode: NormalizedNode;
}

interface EventQuery {
entityId?: string;
entityType?: NormalizedNodeType;
eventType?: string;
relationshipType?: NormalizedEdgeType;
since?: DateTime;
until?: DateTime;
limit?: number;
orderBy?: 'asc' | 'desc';
}
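
To show what a PathQuery asks of a storage adapter, here is a small in-memory breadth-first traversal over a list of edges, honoring traverseEdgeTypes and maxDepth. It is a sketch only: SketchEdge and findPaths are hypothetical names, paths are returned as lists of node IDs rather than full ExecutionPath objects, and a real adapter would presumably push the traversal down to the database.

```typescript
interface SketchEdge {
  edgeType: string;
  sourceNodeId: string;
  targetNodeId: string;
}

// Breadth-first search over outgoing edges, restricted to the allowed edge
// types and capped at maxDepth hops. Each discovered path is a list of node IDs.
function findPaths(
  edges: SketchEdge[],
  startNodeId: string,
  traverseEdgeTypes: string[],
  maxDepth: number,
): string[][] {
  const results: string[][] = [];
  let frontier: string[][] = [[startNodeId]];
  for (let depth = 0; depth < maxDepth && frontier.length > 0; depth++) {
    const nextFrontier: string[][] = [];
    for (const path of frontier) {
      const tail = path[path.length - 1];
      for (const edge of edges) {
        const allowed = traverseEdgeTypes.indexOf(edge.edgeType) !== -1;
        const revisits = path.indexOf(edge.targetNodeId) !== -1; // Cycle guard
        if (edge.sourceNodeId === tail && allowed && !revisits) {
          nextFrontier.push(path.concat(edge.targetNodeId));
        }
      }
    }
    results.push(...nextFrontier);
    frontier = nextFrontier;
  }
  return results;
}

// The canonical chain from the schema section, plus one unrelated edge.
const chainEdges: SketchEdge[] = [
  { edgeType: "CALLS", sourceNodeId: "br", targetNodeId: "si" },
  { edgeType: "INVOKES", sourceNodeId: "si", targetNodeId: "rest" },
  { edgeType: "USES", sourceNodeId: "rest", targetNodeId: "cred" },
  { edgeType: "AUTHENTICATES_AS", sourceNodeId: "cred", targetNodeId: "sp" },
  { edgeType: "OWNED_BY", sourceNodeId: "br", targetNodeId: "alice" },
];
const chainTypes = ["CALLS", "INVOKES", "USES", "AUTHENTICATES_AS"];

const fullPaths = findPaths(chainEdges, "br", chainTypes, 4);
const longestPath = fullPaths[fullPaths.length - 1] ?? [];
const cappedPaths = findPaths(chainEdges, "br", chainTypes, 2);
```

With maxDepth set to 4 the traversal reaches the destination identity through all four hops; with maxDepth set to 2 it stops at the connection, which is the behavior the safety limit is meant to provide. The OWNED_BY edge is ignored because it is not in traverseEdgeTypes.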

Connector Lifecycle

┌─────────────────────────────────────────────────────────────┐
│ CONNECTOR SYNC LIFECYCLE │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. SCHEDULE / TRIGGER │
│ ├── Periodic (cron-based, configurable per connector) │
│ ├── Manual (API call: POST /connectors/{id}/sync) │
│ └── Event-driven (future: webhook from source) │
│ │
│ 2. INITIALIZE │
│ ├── Generate sync_id │
│ ├── Load connector config from store │
│ ├── Retrieve credentials from vault │
│ ├── Record sync start (connector_syncs table) │
│ └── Acquire tenant-scoped execution context │
│ │
│ 3. HEALTH CHECK │
│ ├── Verify source API connectivity │
│ ├── Verify credential validity │
│ └── Abort if unhealthy (record failure) │
│ │
│ 4. EXTRACT │
│ ├── Call source APIs (paginated, rate-limited) │
│ ├── Handle API errors (retry with backoff) │
│ ├── Collect raw entities and relationships │
│ └── Produce RawExtraction │
│ │
│ 5. TRANSFORM │
│ ├── Map raw entities to NormalizedNodes │
│ ├── Map raw relationships to NormalizedEdges │
│ ├── Normalize permissions to standard actions │
│ ├── Classify resources by business domain │
│ ├── Compute ownership state │
│ └── Produce NormalizedGraph │
│ │
│ 6. DIFF │
│ ├── Load previous known state from StorageAdapter │
│ ├── Compare nodes: created / updated / deleted │
│ ├── Compare edges: added / removed / modified │
│ ├── Emit GraphEvents (immutable, append-only) │
│ └── Identify entities needing trigger re-evaluation │
│ │
│ 7. LOAD │
│ ├── Append events to event store │
│ ├── Upsert/delete nodes in graph store │
│ ├── Upsert/delete edges in graph store │
│ └── Update sync_version on all affected entities │
│ │
│ 8. POST-SYNC │
│ ├── Update connector_syncs record (completed/failed) │
│ ├── Emit "sync completed" event │
│ ├── Trigger evaluator runs for changed entities │
│ └── Store deltaToken for next incremental sync │
│ │
└─────────────────────────────────────────────────────────────┘
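
The node comparison in step 6 (DIFF) can be sketched as follows. This is a minimal illustration under stated assumptions, not the Diff Engine itself: SketchNode is a simplified stand-in for NormalizedNode with a flat property bag, and the function names are hypothetical. Edge comparison would follow the same shape.

```typescript
// Simplified stand-in for NormalizedNode (assumption: flat, primitive-valued properties).
interface SketchNode {
  nodeId: string;
  properties: { [key: string]: string | number | boolean };
}

interface NodeDiff {
  created: string[];
  updated: string[];
  deleted: string[];
}

function hasOwn(obj: object, key: string): boolean {
  return Object.prototype.hasOwnProperty.call(obj, key);
}

function indexById(nodes: SketchNode[]): { [id: string]: SketchNode } {
  const index: { [id: string]: SketchNode } = {};
  for (const n of nodes) index[n.nodeId] = n;
  return index;
}

// Key-order-insensitive comparison of two flat property bags.
function sameProperties(a: SketchNode, b: SketchNode): boolean {
  const aKeys = Object.keys(a.properties).sort();
  const bKeys = Object.keys(b.properties).sort();
  if (aKeys.length !== bKeys.length) return false;
  for (let i = 0; i < aKeys.length; i++) {
    if (aKeys[i] !== bKeys[i]) return false;
    if (a.properties[aKeys[i]] !== b.properties[bKeys[i]]) return false;
  }
  return true;
}

function diffNodes(previous: SketchNode[], current: SketchNode[]): NodeDiff {
  const prev = indexById(previous);
  const curr = indexById(current);
  const diff: NodeDiff = { created: [], updated: [], deleted: [] };

  // Present now but not before: created. Present in both but changed: updated.
  for (const id of Object.keys(curr)) {
    if (!hasOwn(prev, id)) diff.created.push(id);
    else if (!sameProperties(prev[id], curr[id])) diff.updated.push(id);
  }
  // Present before but not now: deleted.
  for (const id of Object.keys(prev)) {
    if (!hasOwn(curr, id)) diff.deleted.push(id);
  }
  return diff;
}
```

Each entry in the resulting diff would then become one GraphEvent in the append-only event store before the graph store is updated in step 7.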

Audit Log Sync Lifecycle

When syncMode is 'audit_log', the connector queries the source system's audit logs instead of extracting full entity state. This is the preferred mode for temporal tracking.

┌─────────────────────────────────────────────────────────────┐
│ AUDIT LOG SYNC LIFECYCLE │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. CHECK BASELINE STATE │
│ ├── Has initial baseline been taken? │
│ │ ├── No → Trigger FULL sync first to establish │
│ │ │ baseline, then switch to audit_log mode │
│ │ └── Yes → Continue to audit log sync │
│ └── Is retention window approaching? │
│ └── Yes → Schedule protective baseline │
│ │
│ 2. LOAD SYNC CURSOR │
│ ├── Get last successful sync timestamp from cursor │
│ ├── Calculate fetch window: cursor → now │
│ └── Validate cursor not older than retention window │
│ │
│ 3. EXTRACT AUDIT LOGS │
│ ├── Azure Entra ID: │
│ │ └── GET /auditLogs/directoryAudits │
│ │ ?$filter=activityDateTime ge {since} │
│ │ &$orderby=activityDateTime asc │
│ │ │
│ ├── ServiceNow: │
│ │ └── GET /api/now/table/sys_audit │
│ │ ?sysparm_query=sys_updated_on>{since} │
│ │ ^ORDERBYsys_updated_on │
│ │ │
│ └── Handle pagination, rate limiting │
│ │
│ 4. TRANSFORM AUDIT RECORDS TO EVENTS │
│ ├── Map source operations to EventType │
│ ├── Extract actor information │
│ ├── Extract change details (old/new values) │
│ └── Generate audit_source provenance fields │
│ │
│ 5. CORRELATE TO ENTITIES │
│ ├── Match target resources to existing entities │
│ ├── Create new entities if referenced but missing │
│ ├── Update entity relationships based on events │
│ └── Mark identities needing path recomputation │
│ │
│ 6. PERSIST │
│ ├── Append events to events collection (canonical) │
│ ├── Update affected entities │
│ ├── Recompute execution paths for changed identities │
│ └── Update sync cursor with new position │
│ │
│ 7. POST-SYNC │
│ ├── Check if baseline is due (weekly schedule) │
│ ├── Trigger evaluator for changed entities │
│ ├── Generate evidence packs for new findings │
│ └── Record sync metrics (audit_records_fetched, etc.) │
│ │
└─────────────────────────────────────────────────────────────┘
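
Steps 1 and 2 of this lifecycle (baseline check, fetch window, retention validation) can be sketched as a single pure function. This is a minimal sketch under assumptions: planAuditFetch, FetchWindow and CursorCheck are hypothetical names, the cursor is reduced to a single timestamp, and retention is expressed in whole days.

```typescript
interface FetchWindow {
  since: Date;
  until: Date;
}

// Either a usable fetch window, or a signal that a full baseline must run first.
type CursorCheck =
  | { kind: 'ok'; window: FetchWindow }
  | { kind: 'baseline_required'; reason: string };

const MS_PER_DAY = 24 * 60 * 60 * 1000;

function planAuditFetch(
  lastSuccessfulSync: Date | null, // null: no baseline has been taken yet
  now: Date,
  retentionDays: number
): CursorCheck {
  if (lastSuccessfulSync === null) {
    return { kind: 'baseline_required', reason: 'no_baseline' };
  }
  const cursorAgeDays = (now.getTime() - lastSuccessfulSync.getTime()) / MS_PER_DAY;
  if (cursorAgeDays > retentionDays) {
    // Audit records between the cursor and the retention boundary are already gone,
    // so replaying the log would silently miss changes.
    return { kind: 'baseline_required', reason: 'cursor_older_than_retention' };
  }
  // Fetch window: cursor → now.
  return { kind: 'ok', window: { since: lastSuccessfulSync, until: now } };
}
```

When the result is baseline_required, the caller would fall back to a full sync and resume audit_log mode afterwards, as described in step 1.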

Audit Operation Mapping

Connectors map source-specific audit operations to the SecurityV0 EventType. The tables below show the standard mappings for each source:

Azure Entra ID (/auditLogs/directoryAudits)

| Activity Display Name | EventType | Entity Type Affected |
|---|---|---|
| Add service principal | created | identity |
| Update service principal | updated | identity |
| Delete service principal | deleted | identity |
| Add member to role | role_assigned | identity / human_identity |
| Remove member from role | role_revoked | identity / human_identity |
| Add owner to service principal | owner_assigned | identity |
| Remove owner from service principal | owner_removed | identity |
| Add app role assignment to service principal | permission_granted | identity |
| Remove app role assignment from service principal | permission_revoked | identity |
| Update user (accountEnabled → false) | status_changed | human_identity |
| Delete user | status_changed (decayed) | human_identity |
| Add service principal credentials | credential_created | credential |
| Remove service principal credentials | credential_deleted | credential |
| Consent to application | permission_granted | identity |

ServiceNow (sys_audit table)

| Table + Action | EventType | Entity Type Affected |
|---|---|---|
| sys_user insert | created | human_identity / identity |
| sys_user.active → false | status_changed | human_identity / identity |
| sys_user_has_role insert | role_assigned | identity / human_identity |
| sys_user_has_role delete | role_revoked | identity / human_identity |
| sys_user_group insert (for user) | relationship_added | human_identity |
| sys_user_group.active → false | status_changed | Owner (team) |
| oauth_entity insert | created | credential |
| oauth_entity.active → false | status_changed | credential |
| oauth_entity_scope insert | permission_granted | credential |
| oauth_entity_scope delete | permission_revoked | credential |
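
In a connector, mappings like these would typically live in a lookup table. The sketch below covers a subset of the Entra ID rows. ENTRA_ACTIVITY_TO_EVENT and mapEntraActivity are hypothetical names, and returning null for an unmapped activity is an assumption; the document does not say how unmapped operations should be handled.

```typescript
// Subset of the Entra ID mapping table; event type strings copied from the table.
const ENTRA_ACTIVITY_TO_EVENT: { [activity: string]: string } = {
  'Add service principal': 'created',
  'Update service principal': 'updated',
  'Delete service principal': 'deleted',
  'Add member to role': 'role_assigned',
  'Remove member from role': 'role_revoked',
  'Add owner to service principal': 'owner_assigned',
  'Remove owner from service principal': 'owner_removed',
  'Consent to application': 'permission_granted'
};

// Returns the SecurityV0 event type, or null when the activity has no mapping.
// Assumption: the caller decides whether an unmapped activity is skipped or logged.
function mapEntraActivity(activityDisplayName: string): string | null {
  if (Object.prototype.hasOwnProperty.call(ENTRA_ACTIVITY_TO_EVENT, activityDisplayName)) {
    return ENTRA_ACTIVITY_TO_EVENT[activityDisplayName];
  }
  return null;
}
```

Rows that depend on a field value rather than the operation name alone, such as Update user (accountEnabled → false), need a second check on the record's changed properties and do not fit a plain name lookup.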

Retention Management

Source systems have limited audit log retention:

  • Azure Entra ID: ~90 days assumed in this design (actual retention depends on license tier and on whether logs are exported to long-term storage)
  • ServiceNow: Configurable, typically 1+ year

The connector framework handles retention by:

  1. Tracking retention window: SyncCursor.retentionExpiresAt computed from source retention policy
  2. Daily retention check: Background job checks all cursors for approaching expiry
  3. Protective baseline: When cursor age approaches retention limit (e.g., 83 days for Azure), trigger full sync + baseline before audit logs are purged
  4. Alerting: Notify tenant admin if sync hasn't run and audit data may be lost

// Retention check pseudocode
async function dailyRetentionCheck(tenantId: string) {
  const cursors = await db.sync_cursors.find({ tenant_id: tenantId });

  for (const cursor of cursors) {
    const retentionDays = getRetentionDays(cursor.source_system); // 90 for Azure
    const daysSinceLastSync = daysBetween(cursor.last_successful_sync, now());
    const warningThreshold = retentionDays - 7; // 7 days warning

    if (daysSinceLastSync >= warningThreshold) {
      // Trigger immediate sync and baseline
      await triggerSync(tenantId, cursor.source_system, {
        syncMode: 'full',
        takeBaseline: true,
        reason: 'approaching_retention_limit'
      });

      // Alert tenant admin
      await notifyRetentionWarning(tenantId, cursor.source_system);
    }
  }
}

Error Handling

// =============================================================
// ERROR HANDLING STRATEGY
// =============================================================

/**
 * Errors are classified into three categories:
 * 1. RETRIABLE: Transient failures (rate limits, timeouts, 5xx). Retry with backoff.
 * 2. PARTIAL: Some entities failed but others succeeded. Continue and log errors.
 * 3. FATAL: Cannot proceed (auth failure, config error). Abort sync and alert.
 */

interface SyncErrorPolicy {
  // Retry and backoff (applies to RETRIABLE errors)
  maxRetries: number;              // Default: 3
  retryBackoffMs: number;          // Default: 1000 (base delay, grows exponentially per retry)
  maxRetryBackoffMs: number;       // Default: 60000 (upper bound on any single delay)

  // Partial failure
  maxPartialErrors: number;        // Default: 50 (abort if more than this many entities fail)
  continueOnPartialError: boolean; // Default: true

  // Timeout
  syncTimeoutSeconds: number;      // Default: 3600 (1 hour)
  apiCallTimeoutSeconds: number;   // Default: 30
}

// Error types
type ConnectorErrorType =
  | 'AUTH_FAILURE'       // Credentials invalid/expired → FATAL
  | 'PERMISSION_DENIED'  // Insufficient permissions → FATAL
  | 'RATE_LIMITED'       // API rate limit hit → RETRIABLE
  | 'TIMEOUT'            // API call timed out → RETRIABLE
  | 'SERVER_ERROR'       // Source system 5xx → RETRIABLE
  | 'ENTITY_NOT_FOUND'   // Specific entity missing → PARTIAL (skip entity)
  | 'PARSE_ERROR'        // Cannot parse API response → PARTIAL (skip entity)
  | 'CONFIG_ERROR'       // Invalid configuration → FATAL
  | 'NETWORK_ERROR';     // Connectivity issue → RETRIABLE
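
The classification in the comments above, together with the backoff defaults, can be expressed as a lookup plus a delay function. This is a minimal sketch: the category assignments are taken from the comments on ConnectorErrorType, while doubling the delay on each attempt is an assumption (the policy says "exponential" without naming a base) and jitter is omitted. SketchErrorType, classifyError and backoffDelayMs are hypothetical names.

```typescript
type ErrorCategory = 'RETRIABLE' | 'PARTIAL' | 'FATAL';

// Mirrors ConnectorErrorType; redeclared so the sketch is self-contained.
type SketchErrorType =
  | 'AUTH_FAILURE' | 'PERMISSION_DENIED' | 'RATE_LIMITED' | 'TIMEOUT' | 'SERVER_ERROR'
  | 'ENTITY_NOT_FOUND' | 'PARSE_ERROR' | 'CONFIG_ERROR' | 'NETWORK_ERROR';

const CATEGORY_BY_TYPE: Record<SketchErrorType, ErrorCategory> = {
  AUTH_FAILURE: 'FATAL',
  PERMISSION_DENIED: 'FATAL',
  CONFIG_ERROR: 'FATAL',
  RATE_LIMITED: 'RETRIABLE',
  TIMEOUT: 'RETRIABLE',
  SERVER_ERROR: 'RETRIABLE',
  NETWORK_ERROR: 'RETRIABLE',
  ENTITY_NOT_FOUND: 'PARTIAL',
  PARSE_ERROR: 'PARTIAL'
};

function classifyError(errorType: SketchErrorType): ErrorCategory {
  return CATEGORY_BY_TYPE[errorType];
}

// Delay before retry number `attempt` (0-based).
// Assumption: the delay doubles per attempt and is capped at maxRetryBackoffMs.
function backoffDelayMs(
  attempt: number,
  retryBackoffMs: number,
  maxRetryBackoffMs: number
): number {
  return Math.min(retryBackoffMs * Math.pow(2, attempt), maxRetryBackoffMs);
}
```

With the defaults (1000 ms base, 60000 ms cap, 3 retries), the three retry delays under this assumption would be 1000, 2000 and 4000 ms, so the cap only matters if maxRetries is raised.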

Priority Connectors

| Priority | Connector | Source System | Entity Types | Status |
|---|---|---|---|---|
| 1 | entra_id | Microsoft Entra ID | service_principals, users, app_roles, credentials | Design ready |
| 2 | servicenow | ServiceNow | integration_users, roles, tables, acls, audit_events | Design ready |
| 3 | github | GitHub | apps, oauth_apps, pats, actions, org_members, secrets | Proven in v0.1 |
| 4 | aws_iam | AWS | iam_roles, policies, trust_relationships | Future |
| 5 | azure_managed_id | Azure | managed_identities, enterprise_apps | Future |
| 6 | snowflake | Snowflake | service_accounts, roles, grants | Future |
| 7 | pagerduty | PagerDuty | service_integrations, api_keys | Future |

Connector Configuration Examples

Entra ID Connector Config

{
  "connectorId": "entra_id_v1",
  "tenantId": "tenant-xyz",
  "credentials": {
    "type": "oauth2_client_credentials",
    "tenantId": "entra-tenant-id",
    "clientId": "app-registration-client-id"
    // clientSecret retrieved from vault at runtime
  },
  "sourceConfig": {
    "graphApiVersion": "v1.0",
    "includeSignInLogs": true,
    "signInLogDays": 30,
    "filterServicePrincipalTypes": ["Application"],
    "excludeFirstPartyApps": true
  },
  "syncMode": "full",
  "timeout": 3600,
  "rateLimitConfig": {
    "maxRequestsPerSecond": 10,
    "retryBackoffMs": 1000,
    "maxRetries": 3
  }
}

ServiceNow Connector Config

{
  "connectorId": "servicenow_v1",
  "tenantId": "tenant-xyz",
  "credentials": {
    "type": "api_key",
    "instanceUrl": "https://instance.service-now.com"
    // username + password or OAuth token retrieved from vault
  },
  "sourceConfig": {
    "includeAuditHistory": true,
    "auditHistoryDays": 365,
    "integrationUserPatterns": ["svc_*", "int_*", "api_*"],
    "tableClassification": {
      "hr_case": { "domain": "hr", "sensitivity": "confidential", "pii": true },
      "incident": { "domain": "it_ops", "sensitivity": "internal" },
      "customer_account": { "domain": "customer", "sensitivity": "confidential" }
    }
  },
  "syncMode": "full",
  "timeout": 3600,
  "rateLimitConfig": {
    "maxRequestsPerSecond": 5,
    "retryBackoffMs": 2000,
    "maxRetries": 3
  }
}
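
The integrationUserPatterns values in the ServiceNow config look like glob patterns. Assuming that reading (the document does not define the pattern syntax), and assuming * is the only wildcard and matching is case-sensitive, a connector could test a user name against them as sketched below. globToRegExp and isIntegrationUser are hypothetical helpers.

```typescript
// Convert a simple glob (only `*` is treated as a wildcard) into an anchored RegExp.
function globToRegExp(pattern: string): RegExp {
  // Escape every regex metacharacter except `*`, then expand `*` to `.*`.
  const escaped = pattern.replace(/[.+?^${}()|[\]\\]/g, '\\$&');
  return new RegExp('^' + escaped.replace(/\*/g, '.*') + '$');
}

// True when the user name matches at least one configured pattern.
function isIntegrationUser(userName: string, patterns: string[]): boolean {
  for (const pattern of patterns) {
    if (globToRegExp(pattern).test(userName)) return true;
  }
  return false;
}
```

With the patterns from the config above, svc_backup would match svc_* while jane.doe would match nothing. Because the expressions are anchored, a name such as my_svc_account is not treated as an integration user.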

Connector Testing Strategy

Each connector should be testable in isolation with mock data:

// =============================================================
// TESTING SUPPORT
// =============================================================

interface MockSourceApi {
  /**
   * Given a connector config, return canned API responses
   * that exercise all entity types and relationship types.
   */
  getResponses(config: ConnectorConfig): Map<string, ApiResponse>;
}

interface ConnectorTestHarness {
  /**
   * Run the connector against mock APIs and verify:
   * 1. All expected entity types are extracted
   * 2. Normalization produces valid NormalizedGraph
   * 3. Temporal markers are correctly set
   * 4. Error handling works (inject failures)
   */
  runConnectorTest(
    connector: Connector,
    mockApi: MockSourceApi,
    expectations: TestExpectations
  ): Promise<TestResult>;
}

interface TestExpectations {
  expectedNodeCount: number;
  expectedEdgeCount: number;
  expectedNodeTypes: NormalizedNodeType[];
  expectedEdgeTypes: NormalizedEdgeType[];
  expectedOrphanedIdentities: number; // For trigger testing
  expectedDriftEvents: number;        // For temporal testing
}

Mock data scenarios (each connector should have these test cases):

  1. Happy path: All entities extracted and normalized correctly
  2. Orphaned owner: Identity with disabled/deleted owner → ownershipState: 'orphaned'
  3. Scope drift: Role additions over time without re-approval
  4. Credential expiry: Credential approaching or past expiration
  5. Partial failure: Some API calls fail, connector continues with available data
  6. Rate limiting: Connector backs off and retries correctly
  7. Empty response: Source system returns no entities (should not delete existing graph)
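
A harness could check a connector's output against TestExpectations along the following lines. This is a minimal sketch covering only the count and type fields. TinyGraph and ExpectationSubset are simplified stand-ins for NormalizedGraph and TestExpectations, and returning a list of failure messages (rather than a TestResult) is an assumption made to keep the example self-contained.

```typescript
// Simplified stand-ins; the real NormalizedGraph and TestExpectations carry more fields.
interface TinyGraph {
  nodes: { type: string }[];
  edges: { type: string }[];
}

interface ExpectationSubset {
  expectedNodeCount: number;
  expectedEdgeCount: number;
  expectedNodeTypes: string[];
  expectedEdgeTypes: string[];
}

// Expected types that never appear in the actual items.
function missingTypes(expected: string[], actual: { type: string }[]): string[] {
  const seen: { [t: string]: boolean } = {};
  for (const item of actual) seen[item.type] = true;
  return expected.filter(function (t) { return seen[t] !== true; });
}

// Returns one message per violated expectation; an empty array means the graph passed.
function checkExpectations(graph: TinyGraph, exp: ExpectationSubset): string[] {
  const failures: string[] = [];
  if (graph.nodes.length !== exp.expectedNodeCount) {
    failures.push(`node count: expected ${exp.expectedNodeCount}, got ${graph.nodes.length}`);
  }
  if (graph.edges.length !== exp.expectedEdgeCount) {
    failures.push(`edge count: expected ${exp.expectedEdgeCount}, got ${graph.edges.length}`);
  }
  for (const t of missingTypes(exp.expectedNodeTypes, graph.nodes)) {
    failures.push(`missing node type: ${t}`);
  }
  for (const t of missingTypes(exp.expectedEdgeTypes, graph.edges)) {
    failures.push(`missing edge type: ${t}`);
  }
  return failures;
}
```

Collecting every failure instead of stopping at the first makes a failed scenario easier to diagnose, since a single mapping bug often breaks both a count and a type expectation.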

Import-by-Type Pattern (Track 2)

See ADR-004 for the full decision record.

The Entra-ServiceNow connector uses import-by-type instead of pre-linked entity chains. This is also the recommended pattern for future connectors.

Data flow

Source APIs → entity dicts by type → EdgeResolver → DiscoveredEntities → Transformer → NormalizedGraph

DiscoveredEntities

Flat container holding discovered entities grouped by type:

from __future__ import annotations  # lets ResolvedEdge be referenced before it is defined

from dataclasses import dataclass


@dataclass
class DiscoveredEntities:
    business_rules: list[dict]
    script_includes: list[dict]
    scheduled_jobs: list[dict]
    flows: list[dict]
    rest_messages: list[dict]
    oauth_entities: list[dict]
    azure_sps: list[dict]
    azure_users: list[dict]
    sn_users: list[dict]
    execution_data: dict[str, dict]
    auth_edges: list[ResolvedEdge]    # Cross-system auth matches
    caller_edges: list[ResolvedEdge]  # Automation → REST message calls

EdgeResolver

Explicit resolution of cross-entity relationships:

  • resolve_auth_edges() — Matches OAuth entities to Azure SPs by client_id (case-insensitive)
  • resolve_caller_edges() — Matches automations to REST messages by script-text search
  • resolve_indirect_caller_edges() — Matches BR/Job → Script Include by calls_script_include field

Each ResolvedEdge carries provenance properties for evidence packs:

from dataclasses import dataclass


@dataclass
class ResolvedEdge:
    source_id: str
    target_id: str
    edge_type: str    # e.g. AUTHENTICATES_TO, CALLS
    properties: dict  # Evidence references, matching fields
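
The case-insensitive client_id match performed by resolve_auth_edges() can be sketched as follows. The sketch is written in TypeScript to match the interface definitions elsewhere in this document, although the dataclasses above suggest the reference connector itself is Python. The record field names (sys_id, client_id, appId, id), the edge direction (OAuth entity to service principal), and the provenance keys are assumptions for illustration.

```typescript
interface SketchResolvedEdge {
  sourceId: string;
  targetId: string;
  edgeType: string;
  properties: { [key: string]: string };
}

// Assumed minimal record shapes for the two sides of the match.
interface OAuthEntityRecord { sys_id: string; client_id: string; }
interface AzureSpRecord { id: string; appId: string; }

function resolveAuthEdges(
  oauthEntities: OAuthEntityRecord[],
  azureSps: AzureSpRecord[]
): SketchResolvedEdge[] {
  // Index service principals by lower-cased client ID for case-insensitive lookup.
  const spByClientId: { [clientId: string]: AzureSpRecord } = {};
  for (const sp of azureSps) spByClientId[sp.appId.toLowerCase()] = sp;

  const edges: SketchResolvedEdge[] = [];
  for (const entity of oauthEntities) {
    const key = entity.client_id.toLowerCase();
    if (!Object.prototype.hasOwnProperty.call(spByClientId, key)) continue; // no match
    edges.push({
      sourceId: entity.sys_id,
      targetId: spByClientId[key].id,
      edgeType: 'AUTHENTICATES_TO',
      // Provenance: record which field and value produced the match.
      properties: { matched_on: 'client_id', matched_value: key }
    });
  }
  return edges;
}
```

Indexing one side first keeps the match linear in the number of entities, and storing the matched field in properties gives evidence packs a direct reference to why the edge exists.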

Connector responsibility boundary

Connectors produce:

  • Entity nodes with classification properties (egress_category, origin, ownership_status, risk_group, identity_binding_status)
  • Relationship edges with provenance (RUNS_AS, AUTHENTICATES_TO, OWNED_BY, TRIGGERS_ON, etc.)
  • Execution evidence nodes (sign-in data, flow execution counts)

Connectors do NOT produce findings. All detection and evaluation happens in the platform evaluator. See ADR-005.


Dependencies

  • Depends on: 03-database.md (StorageAdapter implementations reference the schema)
  • Depended on by: 05-reference-impl-entra-servicenow.md (concrete connector implementations)

Open Questions

  1. Connector SDK language: TypeScript (consistent with potential Node.js API layer) or Python (richer API client libraries for Microsoft Graph, ServiceNow)? Could support both via language-agnostic interface (connector outputs JSON, platform ingests).
  2. Incremental sync: Not all source APIs support delta queries. For those that don't, should we do full extraction and rely on the diff engine, or implement application-level change tracking?
  3. Connector isolation model: Separate container per connector per tenant (strongest isolation, higher cost) or shared worker pool with credential injection (simpler ops, weaker isolation)?
  4. Schema evolution: How to handle connector schema changes (e.g., new node properties, new edge types) without breaking existing data?
  5. Connector marketplace: In the future, should third parties be able to publish connectors? If so, what security/sandboxing is needed?
  6. Multi-instance source scope: When the same connector type is deployed across multiple instances (e.g., two AWS accounts, two ServiceNow instances within one tenant), the current nodeId format (${sourceSystem}:${sourceId}) may collide. A source_scope field (scope_type + scope_id) on NormalizedNode and a corresponding update to the database unique index (tenant_id, source_system, source_scope, source_id) would be needed. Not required for current single-instance MVP. Revisit when multi-account AWS or multi-instance connectors enter scope.
  7. IAM policy conditionality: Cloud IAM (especially AWS) uses conditions, permission boundaries, session policies, and explicit deny semantics that materially change effective permissions. The current PermissionProperties has scope and normalizedAction but no structure for conditions. When the AWS IAM connector enters development, evaluate adding a policyCondition structure (conditionLanguage, rawCondition, normalizedKeys) to PermissionProperties. Design constraint: the platform is deterministic — store conditions as evidence and let the connector compute effective permissions during transform, not the platform at query time.
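
The collision described in open question 6 can be made concrete with a small example. This is a hedged sketch, not a proposed design: legacyNodeId follows the ${sourceSystem}:${sourceId} format quoted above, while scopedKey, the SourceScope shape, and the use of JSON encoding for the composite key are assumptions for illustration.

```typescript
// Current format from the document: sourceSystem and sourceId only.
function legacyNodeId(sourceSystem: string, sourceId: string): string {
  return `${sourceSystem}:${sourceId}`;
}

// Hypothetical scope descriptor (scope_type + scope_id in the open question).
interface SourceScope {
  scopeType: string;
  scopeId: string;
}

// Composite key mirroring the proposed index (tenant_id, source_system, source_scope, source_id).
// JSON encoding avoids ambiguity when an ID itself contains ':' (AWS ARNs do).
function scopedKey(
  tenantId: string,
  sourceSystem: string,
  scope: SourceScope,
  sourceId: string
): string {
  return JSON.stringify([tenantId, sourceSystem, scope.scopeType, scope.scopeId, sourceId]);
}

// Two AWS accounts in one tenant that both contain a role with the same name.
const roleInAccountA = { scope: { scopeType: 'aws_account', scopeId: '111111111111' }, sourceId: 'role/deployer' };
const roleInAccountB = { scope: { scopeType: 'aws_account', scopeId: '222222222222' }, sourceId: 'role/deployer' };

const legacyCollides =
  legacyNodeId('aws_iam', roleInAccountA.sourceId) === legacyNodeId('aws_iam', roleInAccountB.sourceId);

const scopedCollides =
  scopedKey('tenant-xyz', 'aws_iam', roleInAccountA.scope, roleInAccountA.sourceId) ===
  scopedKey('tenant-xyz', 'aws_iam', roleInAccountB.scope, roleInAccountB.sourceId);
```

Here legacyCollides is true and scopedCollides is false. Whether a real source ID collides depends on whether the source already embeds its scope in the ID, which is worth checking per connector before the index is changed.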