
ingestion

Import path: github.com/BennettSchwartz/membrane/pkg/ingestion

The ingestion package classifies, validates, interprets, and persists data as MemoryRecord values. Most applications should call m.CaptureMemory through the top-level membrane.Membrane facade. Import this package for request types or when constructing Service directly.

Service

type Service struct { ... }

func NewService(store storage.Store, classifier *Classifier, policy *PolicyEngine) *Service
func NewServiceWithInterpreter(store storage.Store, classifier *Classifier, policy *PolicyEngine, interpreter Interpreter) *Service

membrane.New wires these dependencies automatically. NewServiceWithInterpreter enables LLM-backed capture interpretation and candidate resolution.


CaptureMemory

func (s *Service) CaptureMemory(ctx context.Context, req CaptureMemoryRequest) (*CaptureMemoryResponse, error)

Captures a graph-aware memory candidate. It creates a primary record, attaches interpretation metadata, resolves mentions and references, and can create linked entity records.

CaptureMemoryRequest

Source string

Actor or system that produced the content.

SourceKind string

Capture shape: event, tool_output, observation, working_state, or agent_turn.

Content any (required)

JSON-compatible raw content.

Context any

Optional JSON-compatible context.

ReasonToRemember string

Reason the content should be retained.

ProposedType schema.MemoryType

Optional requested memory type.

Summary string

Human-readable summary.

Tags []string

Labels for categorization and filtering.

Scope string

Visibility scope.

Sensitivity schema.Sensitivity

Sensitivity classification. Defaults to low when empty.

Timestamp time.Time

Capture timestamp. Defaults to time.Now().UTC() when zero.

CaptureMemoryResponse

PrimaryRecord *schema.MemoryRecord

The primary record stored for the capture.

CreatedRecords []*schema.MemoryRecord

Additional records created during capture, commonly entity records.

Edges []schema.GraphEdge

Graph edges materialized during capture.

capture, err := m.CaptureMemory(ctx, ingestion.CaptureMemoryRequest{
	Source:     "auth-agent",
	SourceKind: "tool_output",
	Content: map[string]any{
		"tool_name": "go test",
		"args":      map[string]any{"packages": []string{"./pkg/auth"}},
		"result":    map[string]any{"exit_code": 0},
	},
	ReasonToRemember: "Successful auth package verification",
	Summary:          "Auth package tests passed",
	Tags:             []string{"auth", "tests"},
	Scope:            "project-auth",
	Sensitivity:      schema.SensitivityLow,
})
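One way to consume a CaptureMemoryResponse is to guard against a missing primary record and then count the additional records and edges. The sketch below uses hypothetical stand-in types so it compiles on its own. Only the three response fields come from the reference above; the ID field being a string and the describe helper are assumptions for illustration.

```go
package main

import "fmt"

// Hypothetical stand-ins for the schema and response types; the real
// structs carry many more fields than shown here.
type MemoryRecord struct{ ID string }

type GraphEdge struct{}

type CaptureMemoryResponse struct {
	PrimaryRecord  *MemoryRecord
	CreatedRecords []*MemoryRecord
	Edges          []GraphEdge
}

// describe summarizes a capture result, guarding against nil pointers
// before dereferencing the primary record.
func describe(resp *CaptureMemoryResponse) string {
	if resp == nil || resp.PrimaryRecord == nil {
		return "no primary record"
	}
	return fmt.Sprintf("primary=%s created=%d edges=%d",
		resp.PrimaryRecord.ID, len(resp.CreatedRecords), len(resp.Edges))
}

func main() {
	resp := &CaptureMemoryResponse{
		PrimaryRecord:  &MemoryRecord{ID: "rec-1"},
		CreatedRecords: []*MemoryRecord{{ID: "ent-1"}},
		Edges:          []GraphEdge{{}},
	}
	fmt.Println(describe(resp))
}
```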

Interpretation

type Interpreter interface {
	Interpret(ctx context.Context, req InterpretRequest) (*schema.Interpretation, error)
}

type CandidateResolver interface {
	Resolve(ctx context.Context, req ResolveRequest) (*schema.Interpretation, error)
}

When configured, the interpreter proposes summaries, memory types, mentions, relation candidates, and reference candidates. If it also implements CandidateResolver, capture performs a second pass with bounded existing-record candidates before writing graph edges.


Lower-Level Helpers

The package still exposes specialized ingestion helpers used by capture and tests. These are service-level APIs, not daemon RPCs.

IngestEvent

func (s *Service) IngestEvent(ctx context.Context, req IngestEventRequest) (*schema.MemoryRecord, error)

Creates an episodic record from a discrete event.

rec, err := svc.IngestEvent(ctx, ingestion.IngestEventRequest{
	Source:    "agent-core",
	EventKind: "user_input",
	Ref:       "msg-abc123",
	Summary:   "User asked about deployment strategy",
	Tags:      []string{"deployment", "strategy"},
})

IngestToolOutput

func (s *Service) IngestToolOutput(ctx context.Context, req IngestToolOutputRequest) (*schema.MemoryRecord, error)

Creates an episodic record with a populated tool graph node.

rec, err := svc.IngestToolOutput(ctx, ingestion.IngestToolOutputRequest{
	Source:   "agent-core",
	ToolName: "run_bash",
	Args:     map[string]any{"command": "ls -la"},
	Result:   "total 8",
})

IngestObservation

func (s *Service) IngestObservation(ctx context.Context, req IngestObservationRequest) (*schema.MemoryRecord, error)

Creates a semantic record from a subject-predicate-object observation.

rec, err := svc.IngestObservation(ctx, ingestion.IngestObservationRequest{
	Source:    "agent-core",
	Subject:   "user:alice",
	Predicate: "prefers_language",
	Object:    "Go",
	Tags:      []string{"preference"},
})

IngestOutcome

func (s *Service) IngestOutcome(ctx context.Context, req IngestOutcomeRequest) (*schema.MemoryRecord, error)

Updates an existing episodic record with outcome data.

// rec is an episodic record returned by an earlier call such as IngestToolOutput.
updated, err := svc.IngestOutcome(ctx, ingestion.IngestOutcomeRequest{
	Source:         "agent-core",
	TargetRecordID: rec.ID,
	OutcomeStatus:  schema.OutcomeStatusSuccess,
})

IngestWorkingState

func (s *Service) IngestWorkingState(ctx context.Context, req IngestWorkingStateRequest) (*schema.MemoryRecord, error)

Creates a working record from a snapshot of current task state.

rec, err := svc.IngestWorkingState(ctx, ingestion.IngestWorkingStateRequest{
	Source:         "agent-core",
	ThreadID:       "session-xyz",
	State:          schema.TaskStateExecuting,
	NextActions:    []string{"run tests", "review output"},
	ContextSummary: "Refactoring the auth module",
})

Policy Defaults

The PolicyEngine assigns a default confidence based on the source kind and a default decay half-life based on the memory type.

Source kind      Default confidence
Event            0.8
Tool output      0.9
Observation      0.7
Outcome          0.85
Working state    1.0

Memory type      Default half-life
episodic         1 hour
working          1 day
semantic         30 days
competence       30 days
plan_graph       30 days
entity           30 days