ingestion
Import path: github.com/BennettSchwartz/membrane/pkg/ingestion
The ingestion package classifies, validates, interprets, and persists data as MemoryRecord values. Most applications should call m.CaptureMemory through the top-level membrane.Membrane facade. Import this package for request types or when constructing Service directly.
Service
type Service struct { ... }
func NewService(store storage.Store, classifier *Classifier, policy *PolicyEngine) *Service
func NewServiceWithInterpreter(store storage.Store, classifier *Classifier, policy *PolicyEngine, interpreter Interpreter) *Service
membrane.New wires these dependencies automatically. NewServiceWithInterpreter enables LLM-backed capture interpretation and candidate resolution.
CaptureMemory
func (s *Service) CaptureMemory(ctx context.Context, req CaptureMemoryRequest) (*CaptureMemoryResponse, error)
Captures a graph-aware memory candidate. It creates a primary record, attaches interpretation metadata, resolves mentions and references, and can create linked entity records.
CaptureMemoryRequest
SourcestringActor or system that produced the content.
SourceKindstringCapture shape: event, tool_output, observation, working_state, or agent_turn.
ContentanyrequiredJSON-compatible raw content.
ContextanyOptional JSON-compatible context.
ReasonToRememberstringReason the content should be retained.
ProposedTypeschema.MemoryTypeOptional requested memory type.
SummarystringHuman-readable summary.
Tags[]stringLabels for categorization and filtering.
ScopestringVisibility scope.
Sensitivityschema.SensitivitySensitivity classification. Defaults to low when empty.
Timestamptime.TimeCapture timestamp. Defaults to time.Now().UTC() when zero.
CaptureMemoryResponse
PrimaryRecord*schema.MemoryRecordThe primary record stored for the capture.
CreatedRecords[]*schema.MemoryRecordAdditional records created during capture, commonly entity records.
Edges[]schema.GraphEdgeGraph edges materialized during capture.
capture, err := m.CaptureMemory(ctx, ingestion.CaptureMemoryRequest{
Source: "auth-agent",
SourceKind: "tool_output",
Content: map[string]any{
"tool_name": "go test",
"args": map[string]any{"packages": []string{"./pkg/auth"}},
"result": map[string]any{"exit_code": 0},
},
ReasonToRemember: "Successful auth package verification",
Summary: "Auth package tests passed",
Tags: []string{"auth", "tests"},
Scope: "project-auth",
Sensitivity: schema.SensitivityLow,
})
Interpretation
type Interpreter interface {
Interpret(ctx context.Context, req InterpretRequest) (*schema.Interpretation, error)
}
type CandidateResolver interface {
Resolve(ctx context.Context, req ResolveRequest) (*schema.Interpretation, error)
}
When configured, the interpreter proposes summaries, memory types, mentions, relation candidates, and reference candidates. If it also implements CandidateResolver, capture performs a second pass with bounded existing-record candidates before writing graph edges.
Lower-Level Helpers
The package still exposes specialized ingestion helpers used by capture and tests. These are service-level APIs, not daemon RPCs.
IngestEvent
func (s *Service) IngestEvent(ctx context.Context, req IngestEventRequest) (*schema.MemoryRecord, error)
Creates an episodic record from a discrete event.
rec, err := svc.IngestEvent(ctx, ingestion.IngestEventRequest{
Source: "agent-core",
EventKind: "user_input",
Ref: "msg-abc123",
Summary: "User asked about deployment strategy",
Tags: []string{"deployment", "strategy"},
})
IngestToolOutput
func (s *Service) IngestToolOutput(ctx context.Context, req IngestToolOutputRequest) (*schema.MemoryRecord, error)
Creates an episodic record with a populated tool graph node.
rec, err := svc.IngestToolOutput(ctx, ingestion.IngestToolOutputRequest{
Source: "agent-core",
ToolName: "run_bash",
Args: map[string]any{"command": "ls -la"},
Result: "total 8",
})
IngestObservation
func (s *Service) IngestObservation(ctx context.Context, req IngestObservationRequest) (*schema.MemoryRecord, error)
Creates a semantic record from a subject-predicate-object observation.
rec, err := svc.IngestObservation(ctx, ingestion.IngestObservationRequest{
Source: "agent-core",
Subject: "user:alice",
Predicate: "prefers_language",
Object: "Go",
Tags: []string{"preference"},
})
IngestOutcome
func (s *Service) IngestOutcome(ctx context.Context, req IngestOutcomeRequest) (*schema.MemoryRecord, error)
Updates an existing episodic record with outcome data.
updated, err := svc.IngestOutcome(ctx, ingestion.IngestOutcomeRequest{
Source: "agent-core",
TargetRecordID: rec.ID,
OutcomeStatus: schema.OutcomeStatusSuccess,
})
IngestWorkingState
func (s *Service) IngestWorkingState(ctx context.Context, req IngestWorkingStateRequest) (*schema.MemoryRecord, error)
Creates a working record from a snapshot of current task state.
rec, err := svc.IngestWorkingState(ctx, ingestion.IngestWorkingStateRequest{
Source: "agent-core",
ThreadID: "session-xyz",
State: schema.TaskStateExecuting,
NextActions: []string{"run tests", "review output"},
ContextSummary: "Refactoring the auth module",
})
Policy Defaults
The PolicyEngine assigns default confidence and decay profile values by source shape and memory type.
| Source kind | Default confidence |
|---|---|
| Event | 0.8 |
| Tool output | 0.9 |
| Observation | 0.7 |
| Outcome | 0.85 |
| Working state | 1.0 |
| Memory type | Default half-life |
|---|---|
episodic | 1 hour |
working | 1 day |
semantic | 30 days |
competence | 30 days |
plan_graph | 30 days |
entity | 30 days |