Skip to main content

Processing Pipeline Architecture

Date: 2026-02-27 Status: Current implementation Supersedes: 08-processing-pipeline-observability.md (merged into this document)


1. Purpose

This document defines how data flows through the platform from the moment a connector submits discovered entities to the moment findings and evidence packs are queryable via API. It also defines the scan safety guarantees that protect against data loss from incomplete scans, and the operational observability contract — metrics, alerts, SLOs, and dashboards required for production operation.

It covers two pipeline states:

StateDescriptionWhen
Current3 jobs, authority path materialization, circuit breaker safetyWhat runs in production today
Long-term target6 stages, fully decomposedFuture, after multi-connector reconciliation

Each state is defined with: trigger, inputs/outputs, collection mutations, state transitions, and failure handling.


2. Current Pipeline (Production Today)

2.1 Overview

History: Authority path materialization and path-level finding evaluation were added in February 2026 as part of the W1.1 milestone. Circuit breaker safety was added on 2026-02-27 following a production incident where a broken connector scan removed all authority paths. See Scan Safety plan for the incident summary and design rationale.

2.2 Job 1: sync_ingestion

Trigger: Dequeued from in-memory FIFO after ingestion endpoint returns 202.

Source: src/workers/handlers/sync-ingestion.ts

What it does (in order):

 1. Create ConnectorSyncDoc              → connector_syncs (status: running)
2. transformGraph() → entities + evidence (in memory)
3. computeDiff() → events, changed/created/deleted IDs (in memory)
├── ScanScope: incremental mode skips deletion detection
├── ScanScope: targeted mode scopes deletion to declared entity types
└── Circuit breaker: evaluateDeletionBreaker() checks global + per-type thresholds
4. upsertEntities() → entities collection
5. insertEvents() → events collection
6. insertEntityVersions() → entity_versions collection (changed entities)
6b. Soft-delete absent entities → entities collection
├── GATED by circuit breaker: if triggered, ALL entity deletions blocked
└── Sets removed_by_sync_id for rollback determinism
7. upsertExecutionEvidence() → execution_evidence collection
8. materializeExecutionPaths() → entities collection (embedded paths)
9. assembleExecutionChains() → execution_chains collection
10. materializeAuthorityPaths() → authority_paths collection
└── Independent AP removal safety net (AP_REMOVAL_THRESHOLD = 0.50)
10b. Remove APs for deleted workloads → authority_paths collection
└── GATED by circuit breaker: if entity breaker triggered, AP removal also blocked
11. Update sync → completed → connector_syncs (status: completed, metrics)
└── Records circuit_breaker_details in sync metrics if any breaker fired

Collection mutations:

StepCollectionOperationNotes
1connector_syncsinsert/updateStatus: running
4entitiesbulkWrite (upsert)All entity types
5eventsinsertManyChange events
6entity_versionsinsert + updateNew version, expire previous
6bentitiesupdatestatus=deleted for absent entities (circuit breaker gated)
7execution_evidencebulkWrite (upsert)Activity records
8entitiesupdateWrite execution_paths[] embedded array
9execution_chainsbulkWrite (upsert)1 chain per workload
10authority_pathsbulkWrite (upsert)New and updated paths
10authority_pathsupdateManystatus=removed for absent paths (circuit breaker gated)
11connector_syncsupdateStatus: completed + metrics

Sync metrics fields:

metrics: {
entities_created, entities_updated, entities_affected,
events_created, paths_recomputed,
chains_created, chains_updated,
authority_paths_created, authority_paths_updated, authority_paths_removed,
// Present when any circuit breaker fires:
circuit_breaker_triggered, deletions_blocked, authority_paths_removal_blocked,
circuit_breaker_details: {
entity_deletion_ratio, entity_deletion_threshold,
ap_removal_ratio, ap_removal_threshold
}
}

If any step fails: Update sync to status=failed, throw. No rollback (upserts are idempotent on retry).

2.3 Job 2: evaluate_findings

Trigger: Dequeued after Job 1 completes (FIFO ordering).

Gate: Checks sync.status === "completed". If not, skips evaluation.

Source: src/workers/handlers/evaluate-findings.ts, src/evaluator/index.ts

Job 2 is read-only over derived state. Chain (re-)materialization is owned by Job 1 step 9 (sync ingestion) and by the deploy-gate assemble_chains job — see ADR-026. The evaluator does not write execution_chains.

What it does:

 1. Fetch all evaluable entities (identity, workload, credential, connection)
2. Fetch all active authority_paths for this tenant
3. Run entity-level + path-level evaluator rules:
├── Entity-level: 14 rule files → 15 finding types across all evaluable entities
└── Path-level: path-evaluator.ts runs authority-path checks
(dormant_authority, reachable_sensitive_domain, unknown_identity_binding,
unproven_execution, scope_drift, reachability_drift, ownership_drift,
llm_egress, external_egress, ownership_ambiguous, ownership_unknown)
4. For each rule that fires:
a. Compute finding ID: "eval:" + hash(finding_type, entity_id|path_id)
b. Detect content change (severity, explanation, evidence_refs)
c. If new or changed: upsert FindingDoc
d. Path-level findings: set path_id, intervals[] tracking
5. Auto-resolve stale findings:
a. Active findings whose rule no longer fires → status: remediated
b. Path-level: set resolution_reason, close interval
6. Update authority_paths.current_state:
a. active_finding_count = count of active findings for each path
b. max_finding_severity = highest severity among active findings
7. Capture posture snapshot (non-fatal)
8. Enqueue build_evidence_pack for each created/updated finding

Collection mutations:

StepCollectionOperation
1-2entities, authority_pathsread
4findingsbulkWrite (upsert)
5findingsupdate (status → remediated)
6authority_pathsupdate (current_state)
7posture_snapshotsupsert

2.4 Jobs 3..N: build_evidence_pack

Trigger: One job per changed finding, enqueued by Job 2.

What it does:

1. Fetch finding + entity + related entities
2. Fetch execution evidence, entity versions, events
3. Build 9-section evidence pack content
4. Render markdown
5. Compute integrity hash (SHA256 with tenant_id)
6. Insert EvidencePackDoc (with previous_pack_id chain)
7. Update finding with evidence_pack_id

Collection mutations:

StepCollectionOperation
1-2multipleread only
6evidence_packsinsert
7findingsupdate

2.5 Sync State Machine

pending → running → completed
→ failed
StatusMeaning
pendingSync record created, handler not yet started
runningHandler executing (import, resolve, paths, authority paths, chains)
completedAll steps succeeded, metrics recorded
failedAny step failed, last_error recorded

The evaluate_findings gate only fires if status=completed. If the sync failed, no evaluation happens and findings from the previous sync remain as-is.

2.6 Deploy-Triggered Re-Materialization

Some derived collections — execution_chains is the canonical example — depend on assembly logic that is platform code, not connector input. When that code changes (a new ENTRY_POINT_SUBTYPES entry, a new BFS edge, a new chain role), existing source-system data does not change. A new sync is not enqueued automatically, and the previously-materialized rows do not reflect the new logic until a sync happens to fire. The result is a stale derived collection that the v0.6 UI surfaces depend on.

The pipeline closes this gap with a deploy-gate trigger. When a deploy includes a change to chain-builder.ts (or to the schema files that define the entry-point set, the chain-role table, or the BFS edge list), the deploy enqueues one assemble_chains job per active tenant. The job runs assembleExecutionChains against the existing entity graph and upserts the resulting chains. It does not re-ingest source-system data, and it does not run the evaluator.

The assemble_chains job is idempotent. Re-running it against an unchanged graph produces the same composition_hash for every chain and is a net no-op beyond last_seen_at. The same job kind is also enqueued by an operator script for cold-recovery scenarios — incidents where the deploy gate missed an enqueue, or where an operator wants to re-materialize a single tenant during investigation.

The trigger source is the deploy, not the connector. This matters because the root cause of the failure mode is a code change, not a source-system change. Routing the trigger to its actual cause keeps Job 1 (continuous sync) and Job 2 (read-only evaluation) at their documented responsibilities. The evaluator does not write execution_chains; the deploy gate covers what sync alone cannot.

Garbage collection of chain anchors for workloads that no longer match ENTRY_POINT_SUBTYPES is a separate concern. The assemble_chains job upserts; it does not delete. Deletion semantics for orphaned chain documents are tracked as future work.

Rationale and rejected alternatives — including the "let the evaluator materialize" pattern — are documented in ADR-026: Chain Re-Materialization Triggers.


3. Scan Safety & Data Loss Prevention

This section documents the defense-in-depth architecture that prevents data loss from incomplete or broken connector scans. All features described here are implemented and integration-tested.

Trigger: Production incident on 2026-02-26 where a broken connector scan removed all 5 authority paths for the default tenant. See Scan Safety plan for the full incident post-mortem.

3.1 Design Principles

  1. Never trust a single scan to be complete. Connectors can fail partially — API limits, permission revocations, timeouts. The platform must treat incoming data as potentially incomplete.
  2. Absence ≠ deletion. An entity missing from one scan should not be immediately removed without safety checks.
  3. Protect high-value data with circuit breakers. If a sync would remove a significant portion of existing data, halt destructive operations and record diagnostics.
  4. Connectors must declare scope. A connector scanning only Function Apps should not trigger deletion of unrelated ServiceNow workloads.
  5. All destructive operations must be observable and reversible. Operators must be able to see what each scan changed, detect anomalies, and roll back bad syncs.

3.2 ScanScope Declaration

Connectors declare scan coverage in the scanScope field of NormalizedGraph:

ModeDeletion DetectionUse Case
full (default)Enabled — absent entities flagged for deletion, subject to circuit breakerStandard periodic sync
incrementalDisabled — additive only, no deletionsPartial update, new entities only
targetedScoped to scannedEntityTypes[] onlyType-specific refresh (e.g., only workloads)

Additional optional fields: sourceSystems[] (scope deletion to specific source systems), errors[] (connector-reported partial failures).

Source: src/ingestion/types.ts (ScanScope interface)

3.3 Circuit Breaker: Entity Deletion (Layer 1)

Prevents mass entity deletion from a suspect scan. Evaluated in computeDiff() before any destructive operations.

Global threshold: DEFAULT_DELETION_THRESHOLD = 0.50 (50%)

Per-type thresholds:

Entity TypeThreshold
identity30%
workload40%
role40%
permission40%
resource60%
owner50% (global default)

When triggered:

  • Zero entities are soft-deleted (all deletions blocked for the entire sync)
  • Sync completes normally — additive operations (upsert, events, paths) proceed
  • circuit_breaker_triggered: true recorded in sync metrics with detailed diagnostics

Source: src/ingestion/diff-engine.ts (evaluateDeletionBreaker, DEFAULT_DELETION_THRESHOLD, DELETION_THRESHOLDS_BY_TYPE)

3.4 Circuit Breaker: Authority Path Removal (Layer 2)

Independent safety net inside the authority path materializer. This breaker is evaluated AFTER entity processing — even if entity deletion was not triggered (e.g., incremental mode with missing edges), the AP materializer independently blocks mass path removal.

Threshold: AP_REMOVAL_THRESHOLD = 0.50 (50% of existing active paths across all workloads in the sync)

When triggered:

  • Zero authority paths are removed
  • apRemovalBlocked and apRemovalRatio recorded in sync result
  • authority_paths_removal_blocked recorded in sync metrics

Source: src/ingestion/authority-path-materializer.ts (AP_REMOVAL_THRESHOLD)

3.5 Cascading Pipeline Gate

When the entity deletion circuit breaker fires, it blocks ALL downstream destructive operations:

  • Step 6b: Entity soft-deletion (direct — checked via circuitBreakerTriggered)
  • Step 10b: Authority path removal for deleted workloads (cascading — also checked via circuitBreakerTriggered)

This prevents the causal chain: deleted entities → missing execution_paths → AP materializer removes authority paths. The circuitBreakerTriggered flag gates both Step 6b and Step 10b in the sync handler.

The AP materializer's own breaker (§3.4) is independent — it fires based on its own ratio calculation, not the entity breaker. This is defense-in-depth: two independent safety nets at different layers of the pipeline.

Source: src/workers/handlers/sync-ingestion.ts (circuitBreakerTriggered checks at Step 6b and Step 10b)

3.6 Rollback Determinism

All soft-delete operations record removed_by_sync_id:

  • Entities: EntityDoc.removed_by_sync_id (set in Step 6b)
  • Authority paths: AuthorityPathDoc.removed_by_sync_id (set in markAuthorityPathsRemoved)

This enables:

  • Targeted rollback: Restore all entities/paths removed by a specific sync
  • Audit trail: Identify which sync caused each deletion
  • Incident response: Determine the blast radius of a bad scan

3.7 Circuit Breaker Observability

Sync metrics include full diagnostic data when any breaker fires:

{
"circuit_breaker_triggered": true,
"deletions_blocked": 82,
"authority_paths_removal_blocked": 6,
"circuit_breaker_details": {
"entity_deletion_ratio": 0.854,
"entity_deletion_threshold": 0.50,
"ap_removal_ratio": 0.75,
"ap_removal_threshold": 0.50
}
}

3.8 Integration Test Coverage

5 integration tests verify the complete safety chain:

TestWhat it verifies
Healthy → broken scanEntities and authority paths preserved when breaker fires
Under thresholdSmall changes (below threshold) are deleted normally
Three-scan recoveryHealthy → broken → healthy: full data recovery
Incremental modeDeletion detection skipped entirely
Breaker metricsSync metrics contain detailed circuit breaker diagnostics

Source: test/integration/ingestion/circuit-breaker.test.ts


4. Collection Mutation Matrix

Which job writes to which collection, and how:

Collectionsync_ingestionevaluate_findingsbuild_evidence_pack
connector_syncsinsert + update (status, metrics)
entitiesbulkWrite (upsert), update (soft-delete, execution_paths)readread
eventsinsertManyread
entity_versionsinsert + update (expire)read
execution_evidencebulkWrite (upsert)readread
execution_chainsbulkWrite (upsert)
authority_pathsbulkWrite (upsert), updateMany (remove) (circuit breaker gated)update (current_state)
findingsbulkWrite (upsert), update (auto-resolve)update (evidence_pack_id)
evidence_packsinsert

5. Failure Handling

5.1 sync_ingestion Failures

FailureBehaviorRecovery
Schema validation (step 2)Sync marked failed, error loggedFix connector output, re-submit
DB write error (steps 4-11)Sync marked failed, last_error setSubmit a new syncId with the same graph payload — entity-level upserts are idempotent by entity ID, not by syncId
Authority path traversal error (step 10)Sync marked failedDebug traversal, submit new syncId with same payload
Circuit breaker triggeredSync completes normally, destructive ops blockedAuto-recovers on next healthy scan — no manual intervention required
Duplicate syncId200 returned, no jobs enqueuedIntentional — idempotent

Key property: All entity writes are upserts with deterministic IDs (based on tenantId + sourceSystem + sourceId). Submitting the same NormalizedGraph with a new syncId produces the same entity state. This makes the pipeline safe to replay without rollback.

Idempotency note: The duplicate syncId check (200 no-op) prevents accidental double-submission. For recovery after a failed sync, submit the same graph payload with a new syncId. The entity-level upserts will converge to the same state regardless of syncId.

5.2 evaluate_findings Failures

FailureBehaviorRecovery
Sync not completedEvaluation skipped, warning loggedFix sync_ingestion, re-trigger
Individual rule throwsRule error logged, entity marked as failed, auto-resolve skipped for that entity, other rules continueFix rule, re-trigger evaluation
Path-level evaluation throwsSame as entity-level — skip that path, continueFix rule, re-trigger
DB upsert errorThrow, evaluation stopsRe-trigger evaluation (findings are idempotent)

5.3 build_evidence_pack Failures

FailureBehaviorRecovery
Finding not foundWarning logged, job skippedFinding was deleted — no action needed
Entity not foundWarning logged, job skippedEntity was deleted — stale finding
Pack build errorError logged, job failsRe-trigger — pack insert is idempotent (new UUID)

6. Queue Mechanics

Runtime: WorkerRuntime (src/workers/runtime.ts)

PropertyValue
Queue typeIn-memory FIFO array
Processing modelSingle-threaded async loop
Concurrency1 job at a time (sequential)
Job orderingGuaranteed FIFO — sync_ingestion always runs before evaluate_findings
PersistenceNone — queue lost on process restart
RetryNone built-in — re-submit triggers idempotent replay

Job lifecycle:

enqueue() → queued → processing → completed
→ failed (logged, counter incremented)

Job ordering guarantee for a sync:

1. sync_ingestion     ← always first (enqueued at submission time)
2. evaluate_findings ← always second (enqueued at submission time)
3. build_evidence_pack × N ← enqueued by evaluate_findings

Jobs for different syncs can interleave, but the FIFO ordering guarantees that for any given sync, ingestion completes before evaluation starts. The evaluate_findings gate (check sync.status=completed) provides an additional safety check.


7. Correctness Guarantees

7.1 Determinism

GuaranteeHow
Entity IDssha256(tenantId + ":" + sourceSystem + ":" + sourceId).slice(0, 24)
Authority path IDshash(tenant_id, workload_id, identity_id, resource_id)
Path lineage IDshash(tenant_id, workload_id, resource_id)
Finding IDs (entity)"eval:" + hash(finding_type, entity_id)
Finding IDs (path)"eval:" + hash(tenant_id, path_id, finding_type)
Composition hasheshash(identity_id, sorted(roles), sorted(actions))

Re-running the same input produces the same IDs → upserts are idempotent.

7.2 Temporal Integrity

PropertyEnforcement
effective_from never changesSet on creation, never updated
resolved_at set exactly once per intervalSet when finding leaves active state
intervals[] is append-onlyNew intervals appended, existing never modified
Entity versions are temporalvalid_at set on creation, expired_at set when superseded
Evidence packs are immutableInsert-only, never updated, chained via previous_pack_id

7.3 Post-Sync Validation

After the full pipeline completes (sync_ingestion + evaluate_findings + evidence packs), the following invariants must hold. Validation runs at the end of evaluate_findings, since that is when path-level findings are created:

1. Every active path-level finding references an active authority path
∀ f in findings where f.path_id ≠ null AND f.status = "active":
∃ p in authority_paths where p._id = f.path_id AND p.status = "active"

2. Authority path finding counts match actual findings
∀ p in authority_paths where p.status = "active":
p.current_state.active_finding_count = count(findings where path_id = p._id AND status = "active")

3. No sync stage was skipped
sync.status ∈ {"completed", "completed_with_errors", "failed"}
(never stuck in "running" for > 10 minutes)

4. Sync metrics are internally consistent
metrics.authority_paths_created + metrics.authority_paths_updated
+ metrics.authority_paths_removed ≥ 0 (accounting correctness)

If any validation fails: set sync status to completed_with_errors and emit P1 alert (see Section 8 below).


8. Long-Term Target

When multi-connector reconciliation (Phase 4D) becomes necessary, the monolithic sync_ingestion handler splits into separate stage workers:

Each stage writes a checkpoint to connector_syncs. The sync state machine extends:

Each stage:

  • Writes a checkpoint to connector_syncs
  • Can only start if previous stage is completed
  • Workers are re-entrant and idempotent
  • Retryable failures remain in stage with bounded retries (max 5, exponential backoff)
  • Non-retryable failures move sync to failed_<stage> and create alert

Replay: Starts from the failed stage checkpoint, not from stage 1. Requires input payload to be immutable and addressable.

This decomposition is not needed while the platform operates with a single connector and in-process queue. It becomes necessary when:

  • Multiple connectors submit concurrently and need cross-connector reconciliation
  • Stage-level retries are needed (e.g., external API calls in RESOLVE)
  • Stages need independent scaling (e.g., EVALUATE is CPU-heavy)

See Doc 15 (Section 3) for the full stage-level specification.


9. Observability Architecture

9.1 Execution Model

Use scheduled micro-batch ETL per (tenant_id, connector_id) sync.

  • Trigger sources: scheduled sync, manual sync, replay sync
  • Unit of work: one sync_id
  • Concurrency guard: one active sync per (tenant_id, connector_id)
  • Processing shape: sequential jobs (W1.1) or ordered stages (long-term target)

This keeps behavior deterministic and auditable while avoiding streaming complexity before it is needed.

9.2 Structured Logging

All pipeline logs must include:

  • timestamp
  • level
  • tenant_id
  • connector_id
  • sync_id
  • stage (or job_type for W1.1)
  • attempt
  • worker_id
  • trace_id
  • error_code (when applicable)

9.3 Metrics

Required metric families:

  • sv0_sync_started_total
  • sv0_sync_completed_total
  • sv0_sync_failed_total
  • sv0_job_duration_ms{job=sync_ingestion|evaluate_findings|build_evidence_pack}
  • sv0_queue_depth{queue=...}
  • sv0_sync_age_minutes (time since last completed sync per connector)
  • sv0_authority_paths_created_total
  • sv0_authority_paths_removed_total
  • sv0_findings_opened_total
  • sv0_findings_resolved_total

Long-term target (after stage decomposition):

  • sv0_stage_duration_ms{stage=import|resolve|reconcile|project|evaluate|publish}
  • sv0_stage_retry_total{stage=...}

9.4 Tracing

Trace spans for the current pipeline map to jobs:

  • sync.ingestion (parent span with attributes sync_id, tenant_id, connector_id, trigger_type)
  • sync.evaluate
  • sync.evidence_pack

Long-term target spans map 1:1 with stages:

  • sync.import, sync.resolve, sync.reconcile, sync.project, sync.evaluate, sync.publish
  • Parent span: sync.run

10. SLI/SLO and Alert Policy

SLO Targets

  • Sync success rate (24h): >= 99.0%
  • P95 end-to-end sync latency: <= 15m
  • P95 evaluate→evidence lag: <= 5m
  • Data freshness: last successful sync age <= 2x schedule interval

Alert Matrix

AlertConditionSeverityOwner
Sync hard failuresync status = failedP1Platform on-call
Job stallno status/heartbeat update for >10m while runningP1Platform on-call
Ingestion silenceno completed sync for >2x expected intervalP1Connector owner + platform
Queue backlog growthqueue depth over threshold for 15mP2Platform on-call
Delta anomalypaths/findings delta > 3x 14-day baselineP2Security engineering
Correctness violationpost-sync validation failure (completed_with_errors)P1Platform + security engineering

Long-term target additions (after stage decomposition):

  • Stage stall: no checkpoint update for >10m in running stage
  • Retry exhaustion: stage retries > 5

11. Dashboards and Runbooks

Required Dashboards

  1. Pipeline health (success/failure/job latency)
  2. Freshness and backlog (sync age, queue depth, stalled syncs)
  3. Security output integrity (path and finding deltas, correctness failures)

Required Runbooks

  1. Sync failure triage and replay procedure
  2. Schema validation failure handling
  3. Data freshness outage handling
  4. Delta anomaly triage (connector bug vs real environment change)

Cleanup Gates

Cleanup tasks (deprecated collections, legacy routes) are blocked until:

  1. Observability contract from this section is live
  2. Two consecutive weeks of stable SLO compliance
  3. Fault-injection drills completed (transient failure retry, permanent failure escalation)

12. Implementation Status

Completed

All W1.1 pipeline features are implemented and in production as of February 2026:

TaskDescriptionFile(s)Status
1AuthorityPathDoc type definitionsrc/domain/authority-paths/types.tsDone
2materializeAuthorityPaths() functionsrc/ingestion/authority-path-materializer.tsDone
3Authority path storage methodssrc/storage/storage-adapter.ts, src/storage/mongo/adapter.tsDone
4MongoDB indexes for authority_pathssrc/storage/mongo/adapter.tsDone
5Call materializer from sync_ingestion handler (step 10)src/workers/handlers/sync-ingestion.tsDone
6Extend FindingDoc with path-level fieldssrc/domain/findings/types.tsDone
7Path-level evaluation pass in evaluate_findingssrc/workers/handlers/evaluate-findings.ts, src/evaluator/index.tsDone
8Path-level auto-resolve logicsrc/evaluator/index.tsDone
9Update current_state on authority paths after evaluationsrc/workers/handlers/evaluate-findings.tsDone
10Extend sync metrics with authority path countssrc/workers/handlers/sync-ingestion.tsDone
11Circuit breaker safety (entity + AP breakers, cascading gate)src/ingestion/diff-engine.ts, src/ingestion/authority-path-materializer.tsDone
12Scan scope declarationsrc/ingestion/types.ts, src/ingestion/diff-engine.tsDone
13Posture snapshot capturesrc/workers/handlers/evaluate-findings.tsDone
14Integration tests (circuit breaker, path materialization, findings)test/integration/Done

Not changing (current scope)

ItemWhy
Worker runtime (in-memory FIFO)Sufficient for single-connector, in-process use
Monolithic sync_ingestion handlerNo benefit to splitting without external job queue
Legacy embedded execution_paths[] on entitiesKept for backward compat until Phase 7 cleanup
Sync state machine (running/completed/failed)No stage-level states needed yet