Guarded Context: Role- and Purpose-Aware Access Control for Multi‑Agent Systems
Abstract
Multi‑agent systems improve productivity by sharing context (documents, memory, retrieved snippets, tools). However, uncontrolled context sharing enables over‑reach (agents seeing data they shouldn’t), exfiltration (data leaks through outputs), and role confusion (agents invoking tools beyond their remit). This proposal is about Guarded Context, a design and reference implementation that enforces role‑/attribute‑based access control with purpose binding, data labeling, least privilege, and response redaction. Guarded Context introduces a Context Gateway (PEP) and Context Broker (PDP + store) to mediate all context flows, adds stick‑with‑data labels and purpose TTLs, and integrates policy‑as‑code to keep controls auditable. The approach reduces blast radius while preserving agent collaboration.
1. Problem Statement
Agentic systems often pool:
User memory (preferences, history), Private corpora (documents, tickets, code), Ephemeral run context (RAG chunks, tool results).
Without consistent access controls, any agent in the graph can access any context object, leading to:
Over‑exposure of sensitive data, Privilege escalation (e.g., a “summarizer” accessing HR records), Inadvertent leakage via generated outputs.
2. Goals & Non‑Goals
Goals :
Enforce least privilege across agents and tools. Bind access to role, tenant, purpose, time, and data classification. Provide field‑/row‑level filtering, redaction, and sticky labels. Centralize audit and policy‑as‑code.
Non‑Goals :
Replace enterprise IdP/KMS—integrate with them. Prove cryptographic protocols—use standard, battle‑tested crypto via KMS/HSM.
3. Threat Model
Adversaries
Compromised or misconfigured agents/tools,
Over‑privileged orchestrators,
Curious insiders,
Supply‑chain plugin risks.
Attack Vectors
Unauthorized reads of context,
Output channel leakage,
Purpose creep (reusing context for different tasks),
Cross‑tenant contamination,
Prompt injection to bypass role boundaries.
4. Requirements
Strong identity for agents/tools (mTLS, SPIFFE/SPIRE, service tokens). RBAC + ABAC: role, tenant, data classification, purpose, time, region. Data labeling: classification, owner, scope, retention, allowed roles/scopes. Purpose binding: requests declare purpose; policy enforces purpose‑compatibility and TTL. Row/field-level controls + redaction at egress. Auditable decisions: who accessed what, why, and under which policy. Encryption at rest/in transit; keys isolated per tenant/classification.
5. Architecture
Key components
Agent Mesh: cooperating agents (retriever, planner, executor, summarizer, etc.).
Context Gateway (PEP): the only path to read context; verifies tokens, enforces policy, redacts outputs, emits audit.
Context Broker (PDP + Store): evaluates policies (OPA/Rego or in‑code), manages labels/metadata, fetches encrypted content.
KMS/HSM: envelope encryption; per‑tenant/class keys.
Policy Store: versioned policies (GitOps); tests + approvals.
Audit/SIEM: immutable logs; anomaly detection.
6. Policy Model
Subject: agent_id, roles, tenant, scopes, assurance_level. Resource: context_id, owner, tenant, classification (public/internal/confidential/restricted), allowed_roles, allowed_scopes, allowed_purposes, allowed_fields, retention, region. Action: read, search, summarize. Environment: time, ip/region, run_id, session_id.
Evaluation (ABAC with RBAC overlays):
Allow if all:
subject.tenant == resource.tenant subject.roles ∩ resource.allowed_roles ≠ ∅ or subject.scopes cover resource.allowed_scopes request.purpose ∈ resource.allowed_purposes now <= resource.retention_until env.region ∈ resource.allowed_regions (optional)
On allow: return filters (allowed_fields) and redaction profile (PII, secrets). Otherwise: deny (with reason).
Sticky Labels Responses carry non‑mutable labels (classification, owner, retention) and Purpose‑Binding metadata (purpose, expires_at) so downstream agents must re‑check before further propagation.
7. Sample Policies
7.1 YAML (human‑readable ABAC)
policy.yaml
defaults: deny_by_default: true
classifications: public: {} internal: {} confidential: requires: min_assurance: "mTLS" audit: true restricted: requires: min_assurance: "mTLS+HardwareEnclave" dual_control: true audit: true
rules:
id: same-tenant when: "subject.tenant == resource.tenant" effect: "continue"
id: role-or-scope when: "intersection(subject.roles, resource.allowed_roles) or covers(subject.scopes, resource.allowed_scopes)" effect: "continue"
id: purpose-binding when: "request.purpose in resource.allowed_purposes" effect: "continue"
id: retention when: "now <= resource.retention_until" effect: "continue"
id: region when: "not resource.allowed_regions or env.region in resource.allowed_regions" effect: "continue"
id: allow when: "true" effect: "allow" obligations: filters: "resource.allowed_fields" redaction_profile: "pii+secrets" sticky_labels: ["classification", "owner", "tenant", "purpose", "retention_until"]
7.2 OPA/Rego (policy-as-code)
package context.authz
default allow = false
allow { input.subject.tenant == input.resource.tenant some r input.subject.roles[r] input.resource.allowed_roles[r] input.request.purpose == input.resource.allowed_purposes[_] time.now_ns() <= input.resource.retention_until_ns }
filters := input.resource.allowed_fields redaction_profile := "pii+secrets"
8. Reference Implementation (Python)
For crypto, use AES‑GCM via your KMS/HSM or SDK (e.g., AWS KMS, Azure Key Vault, GCP KMS). Avoid “toy” crypto in production.
8.1 Data Model (labels and metadata)
models.py from dataclasses import dataclass, field from datetime import datetime from typing import List, Dict, Optional
@dataclass class Subject: agent_id: str roles: List[str] tenant: str scopes: List[str] assurance: str # e.g., "mTLS" | "mTLS+Enclave"
@dataclass class RequestCtx: purpose: str fields: Optional[List[str]] = None region: Optional[str] = None now: datetime = field(default_factory=datetime.utcnow)
@dataclass class ResourceMeta: context_id: str tenant: str owner: str classification: str # public|internal|confidential|restricted allowed_roles: List[str] allowed_scopes: List[str] allowed_purposes: List[str] allowed_fields: List[str] retention_until: datetime allowed_regions: List[str] = field(default_factory=list)
8.2 Policy Evaluation (in‑code ABAC; swap with OPA in prod) policy.py from typing import Dict from datetime import datetime from models import Subject, RequestCtx, ResourceMeta
class Decision: def init(self, allow: bool, reason: str = "", filters=None, redaction_profile=None, sticky_labels=None): self.allow = allow self.reason = reason self.filters = filters or [] self.redaction_profile = redaction_profile or "none" self.sticky_labels = sticky_labels or []
def intersect(a, b): return bool(set(a) & set(b)) def covers(scopes, required): return set(required).issubset(set(scopes))
def evaluate(subject: Subject, req: RequestCtx, res: ResourceMeta) -> Decision: if subject.tenant != res.tenant: return Decision(False, "cross-tenant-blocked") if not (intersect(subject.roles, res.allowed_roles) or covers(subject.scopes, res.allowed_scopes)): return Decision(False, "role-or-scope-mismatch") if req.purpose not in res.allowed_purposes: return Decision(False, "purpose-not-allowed") if req.now > res.retention_until: return Decision(False, "beyond-retention") if res.allowed_regions and (req.region not in res.allowed_regions): return Decision(False, "region-not-allowed")
obligations = {
"filters": res.allowed_fields,
"redaction_profile": "pii+secrets" if res.classification in ("confidential","restricted") else "none",
"sticky_labels": ["classification","owner","tenant","purpose","retention_until"]
}
return Decision(True, "ok", **obligations)
8.3 Redaction Utilities (PII/secrets — demo)
# redact.py
import re from typing import Dict, Any, List
PII_PATTERNS = { "email": re.compile(r"[A-Za-z0-9.%+-]+@[A-Za-z0-9.-]+.[A-Za-z]{2,}"), "phone": re.compile(r"+?\d{1,3}[-.\s]?(?\d{1,4})?[-.\s]?\d{2,4}[-.\s]?\d{2,4}"), "ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"), "api_key": re.compile(r"(?:AKIA|sk-|xoxb-)[A-Za-z0-9-/]{12,}"), }
def apply_field_filter(doc: Dict[str, Any], allowed_fields: List[str]) -> Dict[str, Any]: return {k: v for k, v in doc.items() if k in allowed_fields}
def redact_text(text: str) -> str: redacted = text for _, pat in PII_PATTERNS.items(): redacted = pat.sub("[REDACTED]", redacted) return redacted
def redact_document(doc: Dict[str, Any]) -> Dict[str, Any]: filtered = {} for k, v in doc.items(): if isinstance(v, str): filtered[k] = redact_text(v) else: filtered[k] = v return filtered
8.4 Context Store Facade (encrypted content; KMS stub)
store.py from typing import Dict, Any from models import ResourceMeta
In production, use your KMS SDK to decrypt AES-GCM ciphertext with a per-tenant/class key. class KMS: def unwrap_key(self, tenant: str, classification: str) -> bytes: # placeholder: obtain DEK via KMS (bound to tenant/classification) return b"0123456789abcdef0123456789abcdef"
class ContextStore: def init(self): # demo in-memory; replace with object store / vector DB self._meta: Dict[str, ResourceMeta] = {} self._data: Dict[str, Dict[str, Any]] = {}
def put(self, meta: ResourceMeta, plaintext: Dict[str, Any]):
self._meta[meta.context_id] = meta
# TODO: encrypt with AES-GCM using KMS-dek; store ciphertext+nonce+tag
self._data[meta.context_id] = plaintext
def fetch(self, context_id: str):
return self._meta[context_id], self._data[context_id]
8.5 Context Gateway (PEP) — FastAPI Example
gateway.py from fastapi import FastAPI, Header, HTTPException, Request from fastapi.responses import JSONResponse from datetime import datetime, timedelta from typing import Optional, Dict, Any from models import Subject, RequestCtx from policy import evaluate from redact import apply_field_filter, redact_document from store import ContextStore
app = FastAPI(title="Context Gateway (PEP)") store = ContextStore()
--- Demo bootstrap data (replace with your loader) ---
from models import ResourceMeta from datetime import datetime meta = ResourceMeta( context_id="doc-123", tenant="acme", owner="[email protected]", classification="confidential", allowed_roles=["hr_reader", "hr_analyst"], allowed_scopes=["context.read.hr"], allowed_purposes=["hr_audit","employee_support"], allowed_fields=["title","body","summary"], # field-level control retention_until=datetime.utcnow() + timedelta(days=90), allowed_regions=["US"] ) store.put(meta, { "title": "Employee Case", "body": "PII: [email protected], phone +1-512-555-0123", "summary": "Sensitive HR case. Ticket #12345", "internal_notes": "SSN 123-45-6789; SECRET api key xoxb-123..." })
def parse_jwt(authorization: Optional[str]) -> Subject: # In production, verify JWT signature (JWKS), audience, exp, etc. # Here we accept a demo header "Bearer demo-hr" if not authorization or not authorization.startswith("Bearer "): raise HTTPException(status_code=401, detail="missing/bad token") token = authorization.split(" ", 1)[1] # toy mapping if token == "demo-hr": return Subject( agent_id="agent-hr-bot", roles=["hr_reader"], tenant="acme", scopes=["context.read.hr"], assurance="mTLS" ) elif token == "demo-summarizer": return Subject( agent_id="agent-sum", roles=["summarizer"], tenant="acme", scopes=["context.read.generic"], assurance="mTLS" ) else: raise HTTPException(status_code=403, detail="unknown token")
@app.get("/context/{context_id}") async def get_context(context_id: str, purpose: str, region: Optional[str] = "US", fields: Optional[str] = None, authorization: Optional[str] = Header(None)): subject = parse_jwt(authorization) req = RequestCtx(purpose=purpose, fields=(fields.split(",") if fields else None), region=region)
try:
meta, doc = store.fetch(context_id)
except KeyError:
raise HTTPException(status_code=404, detail="not found")
decision = evaluate(subject, req, meta)
if not decision.allow:
# audit deny (omitted: send to SIEM)
raise HTTPException(status_code=403, detail=decision.reason)
# field-level filter from policy/obligation
allowed_fields = decision.filters
filtered = apply_field_filter(doc, allowed_fields)
# redaction on top of filtering
if decision.redaction_profile != "none":
filtered = redact_document(filtered)
# enforce request-specified fields subset
if req.fields:
filtered = {k: v for k, v in filtered.items() if k in req.fields}
# attach sticky labels
response = {
"context_id": context_id,
"data": filtered,
"labels": {
"classification": meta.classification,
"owner": meta.owner,
"tenant": meta.tenant,
"purpose": req.purpose,
"retention_until": meta.retention_until.isoformat()
}
}
audit allow (omitted)
return JSONResponse(response)
Try it quickly (dev mode):
pip install fastapi uvicorn pydantic
uvicorn gateway:app --reload --port 8080 Allowed: curl -H "Authorization: Bearer demo-hr" "http://localhost:8080/context/doc-123?purpose=hr_audit" Denied (wrong role/scope): curl -H "Authorization: Bearer demo-summarizer" "http://localhost:8080/context/doc-123?purpose=employee_support" purpose=employee_support"Show more lines
8.6 Agent SDK Wrapper (ensures purpose & labels)
agent_sdk.py from typing import Dict, Any import requests
class ContextClient: def init(self, base_url: str, token: str, purpose: str, region: str = "US"): self.base_url = base_url self.token = token self.purpose = purpose self.region = region
def get(self, context_id: str, fields=None) -> Dict[str, Any]:
params = {"purpose": self.purpose, "region": self.region}
if fields: params["fields"] = ",".join(fields)
resp = requests.get(
f"{self.base_url}/context/{context_id}",
params=params,
headers={"Authorization": f"Bearer {self.token}"}
)
resp.raise_for_status()
return resp.json()
9. Output‑Channel Controls
Even if context access is limited, leakage can occur through outputs. Add:
Egress redaction on all agent responses (PII/secrets). Response size/shape limits (avoid “dump all”). No‑copy labels (prevent downgrading of classification without re‑check). Traceability: include run_id, request_id, policy_version in headers/metadata.
10. Deployment & Integration Notes
Identity: Use mTLS and/or SPIFFE/SPIRE for workload identity; tie JWT sub to SPIFFE ID; short‑lived tokens. KMS/HSM: Per‑tenant/class keys; rotate regularly; use AES‑GCM; bind DEKs to enclave (if available). OPA: Host policy as code; GitOps pipeline with tests; enforce version pinning in Gateway. Data Stores: Support row/field‑level security (e.g., via views) in addition to app‑level filtering. Agent Frameworks: Wrap retrievers/tools with the ContextClient so every call includes purpose and is mediated by the Gateway.
11. Testing & Validation
Unit: policy decisions for role/scope/purpose/retention/region. Property‑based: randomly permuted labels to detect inconsistent outcomes. Security: attempt prompt‑injection to elicit disallowed fields—ensure Gateway filtering/redaction prevents leakage. Performance: measure P50/P95 latency overhead; cache policy decisions per (subject, resource) with TTL.
12. Risks & Mitigations
Prompt injection: The Gateway enforces server‑side policies; never rely on the model to self‑police. Covert channels: Apply output shaping, rate limits, response normalization. Policy drift: Versioned policies + CI tests; expose policy version in responses. Over‑filtering: Promote partial results with reasons to facilitate debug.
13. Conclusion
Guarded Context operationalizes least privilege and purpose‑based access for agentic systems. By centralizing decisions in a Context Gateway + Policy Engine, binding access to labels and purpose, and enforcing egress redaction, teams can safely share context across agents without sacrificing velocity.

