AI Agents with Memory Part 2: Episodic Memory – Storing and Retrieving Conversation History at Scale with PostgreSQL, pgvector, and Node.js


Part 1 of this series explained why single-session agents fail at enterprise scale and introduced the three memory types that solve it. This part builds the first and most foundational of those types: episodic memory. By the end of this post, you will have a working Node.js implementation that stores every conversation event, action, and outcome to PostgreSQL with pgvector, and retrieves the most relevant memories at the start of each new session.

What Episodic Memory Stores

Episodic memory is the record of specific events experienced by the agent over time. For a production agent, those events fall into three categories:

Conversation turns – Each user message and agent response, with timestamp, session ID, and user/tenant identifiers. This is the raw history of what was said and when.

Agent actions – Tool invocations, their inputs, outputs, and whether they succeeded or failed. An agent that called a database query tool, got a timeout, retried with a modified query, and succeeded on the second attempt should have all of that recorded, not just the final result.

Observations and outcomes – Higher-level events the agent records about its own reasoning: “User confirmed this approach is correct,” “This query pattern caused a timeout – use index hint next time,” “User’s preferred output format is JSON with camelCase keys.” These are the seeds of semantic and procedural memory but live in the episodic store at the point of creation.
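To make the three categories concrete, here is a sketch of what each event could look like as a plain object before it is written to the store. Field names mirror the schema defined below; the values themselves are invented for illustration:

```javascript
// Illustrative episodic events - one per category. Field names mirror
// the episodic_memories schema; the values are made up for this example.
const conversationTurn = {
  eventType: 'conversation_turn',
  role: 'user',
  content: 'Can you re-run the report with last quarter\'s numbers?',
  importance: 0.5,
  metadata: {},
};

const agentAction = {
  eventType: 'agent_action',
  role: null,
  content: 'Tool: run_query\nInput: {"quarter":"Q3"}\nOutput: {"rows":142}\nSuccess: true',
  importance: 0.6,
  metadata: { toolName: 'run_query', success: true },
};

const observation = {
  eventType: 'observation',
  role: null,
  content: 'User prefers JSON output with camelCase keys.',
  importance: 0.8,
  metadata: {},
};
```

Note the pattern in the default importance values: observations start higher than actions, which start higher than ordinary turns, reflecting how likely each is to matter in a future session.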

Why PostgreSQL with pgvector

Episodic memory retrieval has two distinct patterns that need to work together. You need to retrieve by time (recent sessions are almost always relevant) and by semantic similarity (memories similar to the current query should surface even if they are old). PostgreSQL with pgvector handles both in a single system.

The alternative of using a dedicated vector database for semantic retrieval and a relational database for time-based retrieval creates synchronisation complexity and two query paths to maintain. PostgreSQL gives you SQL for structured time-based queries, pgvector for similarity search, JSONB for flexible metadata, and transactional writes for consistency, all in one system your operations team already knows how to run.

Database Schema

The schema needs to support both retrieval patterns efficiently while enforcing tenant isolation at the row level.

-- Enable pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;

-- Sessions table: one row per agent session
CREATE TABLE agent_sessions (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  tenant_id TEXT NOT NULL,
  user_id TEXT NOT NULL,
  agent_id TEXT NOT NULL,
  started_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  ended_at TIMESTAMPTZ,
  metadata JSONB DEFAULT '{}'
);

CREATE INDEX idx_sessions_tenant_user 
  ON agent_sessions (tenant_id, user_id, started_at DESC);

-- Memory events table: one row per episodic event
CREATE TABLE episodic_memories (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  tenant_id TEXT NOT NULL,
  user_id TEXT NOT NULL,
  session_id UUID NOT NULL REFERENCES agent_sessions(id) ON DELETE CASCADE,
  event_type TEXT NOT NULL, -- 'conversation_turn' | 'agent_action' | 'observation'
  role TEXT,                -- 'user' | 'assistant' | null for actions/observations
  content TEXT NOT NULL,    -- the raw text of the event
  embedding vector(1536),   -- OpenAI text-embedding-3-small dimension
  importance FLOAT DEFAULT 0.5, -- 0.0 to 1.0, higher = more likely to retrieve
  metadata JSONB DEFAULT '{}',  -- tool name, action result, tags, etc.
  created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- Composite index for tenant-scoped time queries
CREATE INDEX idx_memories_tenant_user_time 
  ON episodic_memories (tenant_id, user_id, created_at DESC);

-- HNSW vector index for fast similarity search
-- HNSW generally gives better recall at a given latency than IVFFlat,
-- at the cost of slower index builds and higher memory use
CREATE INDEX idx_memories_embedding 
  ON episodic_memories USING hnsw (embedding vector_cosine_ops)
  WITH (m = 16, ef_construction = 64);

-- Partial index for high-importance memories
CREATE INDEX idx_memories_important 
  ON episodic_memories (tenant_id, user_id, importance DESC)
  WHERE importance > 0.7;

The schema relationships, as an entity-relationship diagram:

erDiagram
    agent_sessions {
        UUID id PK
        TEXT tenant_id
        TEXT user_id
        TEXT agent_id
        TIMESTAMPTZ started_at
        TIMESTAMPTZ ended_at
        JSONB metadata
    }

    episodic_memories {
        UUID id PK
        TEXT tenant_id
        TEXT user_id
        UUID session_id FK
        TEXT event_type
        TEXT role
        TEXT content
        vector embedding
        FLOAT importance
        JSONB metadata
        TIMESTAMPTZ created_at
    }

    agent_sessions ||--o{ episodic_memories : "contains"

Node.js Episodic Memory Client

The client handles session lifecycle, memory writes, and the two retrieval patterns. It uses the pg driver with a connection pool and calls the OpenAI embeddings API to generate vectors for similarity search.

// episodic-memory.js
import pg from 'pg';
import OpenAI from 'openai';

const { Pool } = pg;

const pool = new Pool({
  connectionString: process.env.DATABASE_URL,
  max: 20,
  idleTimeoutMillis: 30000,
  connectionTimeoutMillis: 5000,
});

const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

// Generate embedding vector for a text string
async function embed(text) {
  const response = await openai.embeddings.create({
    model: 'text-embedding-3-small',
    input: text.slice(0, 8000), // conservative char-based truncation; the model's input limit is 8,191 tokens
  });
  return response.data[0].embedding;
}

export class EpisodicMemoryClient {
  constructor({ tenantId, userId, agentId }) {
    this.tenantId = tenantId;
    this.userId = userId;
    this.agentId = agentId;
    this.sessionId = null;
  }

  // Start a new session - call at the beginning of each agent run
  async startSession(metadata = {}) {
    const result = await pool.query(
      `INSERT INTO agent_sessions (tenant_id, user_id, agent_id, metadata)
       VALUES ($1, $2, $3, $4)
       RETURNING id`,
      [this.tenantId, this.userId, this.agentId, JSON.stringify(metadata)]
    );
    this.sessionId = result.rows[0].id;
    return this.sessionId;
  }

  // Close the current session
  async endSession() {
    if (!this.sessionId) return;
    await pool.query(
      `UPDATE agent_sessions SET ended_at = now() WHERE id = $1`,
      [this.sessionId]
    );
    this.sessionId = null;
  }

  // Write a memory event to the store.
  // Note: embed() is awaited inline here, which adds latency on the hot path;
  // the async queue pattern later in this post moves it off the response path.
  async writeMemory({
    eventType,       // 'conversation_turn' | 'agent_action' | 'observation'
    content,
    role = null,
    importance = 0.5,
    metadata = {},
  }) {
    if (!this.sessionId) {
      throw new Error('No active session. Call startSession() first.');
    }

    // Generate embedding - in production, queue this if latency is critical
    const embedding = await embed(content);
    const embeddingLiteral = `[${embedding.join(',')}]`;

    await pool.query(
      `INSERT INTO episodic_memories
         (tenant_id, user_id, session_id, event_type, role, content, embedding, importance, metadata)
       VALUES ($1, $2, $3, $4, $5, $6, $7::vector, $8, $9)`,
      [
        this.tenantId,
        this.userId,
        this.sessionId,
        eventType,
        role,
        content,
        embeddingLiteral,
        importance,
        JSON.stringify(metadata),
      ]
    );
  }

  // Convenience method for conversation turns
  async writeConversationTurn(role, content, importance = 0.5) {
    return this.writeMemory({
      eventType: 'conversation_turn',
      role,
      content,
      importance,
    });
  }

  // Convenience method for tool actions
  async writeAgentAction({ toolName, input, output, success, importance = 0.6 }) {
    const content = `Tool: ${toolName}\nInput: ${JSON.stringify(input)}\nOutput: ${JSON.stringify(output)}\nSuccess: ${success}`;
    return this.writeMemory({
      eventType: 'agent_action',
      content,
      importance,
      metadata: { toolName, success },
    });
  }

  // Convenience method for agent observations
  async writeObservation(content, importance = 0.8) {
    return this.writeMemory({
      eventType: 'observation',
      content,
      importance,
    });
  }

  // Retrieve recent memories (time-based)
  async getRecentMemories({ limit = 20, eventTypes = null, sessionCount = 5 }) {
    const typeFilter = eventTypes
      ? `AND event_type = ANY($4::text[])`
      : '';

    const result = await pool.query(
      `SELECT em.id, em.event_type, em.role, em.content, em.importance,
              em.metadata, em.created_at, s.started_at as session_started
       FROM episodic_memories em
       JOIN agent_sessions s ON em.session_id = s.id
       WHERE em.tenant_id = $1
         AND em.user_id = $2
         AND s.id IN (
           SELECT id FROM agent_sessions
           WHERE tenant_id = $1 AND user_id = $2
           ORDER BY started_at DESC
           LIMIT $3
         )
         ${typeFilter}
       ORDER BY em.created_at DESC
       LIMIT $${eventTypes ? 5 : 4}`,
      eventTypes
        ? [this.tenantId, this.userId, sessionCount, eventTypes, limit]
        : [this.tenantId, this.userId, sessionCount, limit]
    );

    return result.rows;
  }

  // Retrieve semantically similar memories (vector search).
  // The similarity threshold is embedding-model dependent - tune it empirically;
  // text-embedding-3-small similarities often sit well below 0.75.
  async getSimilarMemories({ query, limit = 10, minImportance = 0.3, threshold = 0.35 }) {
    const queryEmbedding = await embed(query);
    const embeddingLiteral = `[${queryEmbedding.join(',')}]`;

    const result = await pool.query(
      `SELECT id, event_type, role, content, importance, metadata, created_at,
              1 - (embedding <=> $3::vector) AS similarity
       FROM episodic_memories
       WHERE tenant_id = $1
         AND user_id = $2
         AND importance >= $5
         AND 1 - (embedding <=> $3::vector) >= $6
       ORDER BY embedding <=> $3::vector
       LIMIT $4`,
      [this.tenantId, this.userId, embeddingLiteral, limit, minImportance, threshold]
    );

    return result.rows;
  }

  // Hybrid retrieval: combine recency and similarity, deduplicated
  async retrieveForContext({ query, recentLimit = 10, similarLimit = 10 }) {
    const [recent, similar] = await Promise.all([
      this.getRecentMemories({ limit: recentLimit }),
      this.getSimilarMemories({ query, limit: similarLimit }),
    ]);

    // Merge and deduplicate by id (the same row can appear in both result sets)
    const seen = new Map();
    for (const m of [...recent, ...similar]) {
      if (!seen.has(m.id)) seen.set(m.id, m);
    }

    // Sort by created_at descending - newest first, so that downstream
    // truncation (e.g. the slice() caps in the agent) drops the oldest entries
    return Array.from(seen.values())
      .sort((a, b) => new Date(b.created_at) - new Date(a.created_at));
  }
}
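The merge-and-dedupe step in retrieveForContext is the kind of logic worth pulling out into a pure function so it can be unit-tested without a database. A minimal standalone sketch (mergeMemories is a hypothetical helper name, not part of the client above):

```javascript
// Merge two memory result sets, dedupe by id, and return newest-first
// so downstream truncation drops the oldest entries, not the newest.
function mergeMemories(recent, similar) {
  const seen = new Map();
  for (const m of [...recent, ...similar]) {
    if (!seen.has(m.id)) seen.set(m.id, m);
  }
  return Array.from(seen.values())
    .sort((a, b) => new Date(b.created_at) - new Date(a.created_at));
}
```

Keeping this pure also makes it trivial to experiment with other merge strategies, such as weighting similarity score against recency, without touching the database code.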

Integrating Episodic Memory into an Agent Loop

The memory client integrates around the LLM call. At session start, retrieve relevant memories and inject them into the system prompt. After each turn, write the new events.

// agent.js
import Anthropic from '@anthropic-ai/sdk';
import { EpisodicMemoryClient } from './episodic-memory.js';

const anthropic = new Anthropic({ apiKey: process.env.ANTHROPIC_API_KEY });

export class MemoryAwareAgent {
  constructor({ tenantId, userId }) {
    this.memory = new EpisodicMemoryClient({
      tenantId,
      userId,
      agentId: 'production-agent-v1',
    });
    this.conversationHistory = [];
  }

  async startSession() {
    await this.memory.startSession({ startedBy: 'user_request' });
  }

  async endSession() {
    await this.memory.endSession();
    this.conversationHistory = [];
  }

  // Format retrieved memories into a system prompt injection
  formatMemoriesForContext(memories) {
    if (!memories.length) return '';

    const sections = {
      observation: [],
      conversation_turn: [],
      agent_action: [],
    };

    for (const m of memories) {
      sections[m.event_type]?.push(m);
    }

    let context = '\n\n--- Relevant memories from past sessions ---\n';

    if (sections.observation.length) {
      context += '\nKey observations about this user:\n';
      context += sections.observation
        .map(m => `- ${m.content}`)
        .join('\n');
    }

    if (sections.conversation_turn.length) {
      context += '\n\nRecent conversation history:\n';
      context += sections.conversation_turn
        .slice(0, 10) // cap to avoid context bloat
        .map(m => `[${m.role}]: ${m.content}`)
        .join('\n');
    }

    if (sections.agent_action.length) {
      context += '\n\nRecent tool actions:\n';
      context += sections.agent_action
        .slice(0, 5)
        .map(m => `- ${m.content}`)
        .join('\n');
    }

    context += '\n--- End of memories ---\n';
    return context;
  }

  async chat(userMessage) {
    // Retrieve relevant memories for this query
    const memories = await this.memory.retrieveForContext({
      query: userMessage,
      recentLimit: 10,
      similarLimit: 8,
    });

    const memoryContext = this.formatMemoriesForContext(memories);

    const systemPrompt = `You are a helpful production AI agent with access to long-term memory.
You remember past conversations and can build on prior context without the user needing to re-explain.
${memoryContext}`;

    // Add user message to in-session history
    this.conversationHistory.push({ role: 'user', content: userMessage });

    // Call the LLM
    const response = await anthropic.messages.create({
      model: 'claude-sonnet-4-20250514',
      max_tokens: 4096,
      system: systemPrompt,
      messages: this.conversationHistory,
    });

    const assistantMessage = response.content[0].text;
    this.conversationHistory.push({ role: 'assistant', content: assistantMessage });

    // Write memories async - do not await to avoid adding latency
    Promise.all([
      this.memory.writeConversationTurn('user', userMessage, 0.5),
      this.memory.writeConversationTurn('assistant', assistantMessage, 0.5),
    ]).catch(err => console.error('Memory write failed:', err));

    return assistantMessage;
  }

  // Call this when the agent makes a significant observation
  async recordObservation(content, importance = 0.8) {
    await this.memory.writeObservation(content, importance);
  }
}

The full request flow, as a sequence diagram:

sequenceDiagram
    participant U as User
    participant A as MemoryAwareAgent
    participant M as EpisodicMemoryClient
    participant DB as PostgreSQL + pgvector
    participant LLM as Claude Sonnet 4

    U->>A: chat("How is the auth refactor going?")
    A->>M: retrieveForContext(query)
    M->>DB: getRecentMemories (SQL time query)
    M->>DB: getSimilarMemories (vector cosine search)
    DB-->>M: merged memory rows
    M-->>A: deduped, sorted memories
    A->>A: formatMemoriesForContext()
    A->>LLM: messages + system (with memory context)
    LLM-->>A: assistant response
    A-->>U: response
    A->>M: writeConversationTurn(user) [async]
    A->>M: writeConversationTurn(assistant) [async]
    M->>DB: INSERT with embedding vector

Importance Scoring

Not all memories are equally worth retrieving. Importance scoring lets you bias retrieval toward high-value events. The default of 0.5 works for ordinary conversation turns. Here is a practical guide:

Importance   Event type                         Example
0.9 – 1.0    Critical observation               “User confirmed production deploy is blocked until security review passes”
0.7 – 0.9    Key preference or constraint       “User prefers TypeScript over JavaScript for all new services”
0.6 – 0.7    Successful tool result or action   “Database query pattern X returned results in under 50ms”
0.4 – 0.6    Ordinary conversation turns        Normal Q&A exchanges
0.1 – 0.4    Low-signal events                  Greetings, confirmations, filler turns

You can also have the LLM itself score importance. After generating a response, send the turn to a lightweight model with a prompt asking it to rate the importance of this exchange from 0 to 1 and return a JSON score. This adds a small async cost but produces better retrieval over time than static heuristics.
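Before reaching for an LLM scorer, a static heuristic gets you a long way. Here is one possible sketch; the per-type baselines, keyword list, and thresholds are assumptions you would tune for your own domain:

```javascript
// Heuristic importance scorer: start from a per-event-type baseline,
// then nudge the score for signals like stated preferences, failed
// actions, or very short (likely filler) content.
function scoreImportance({ eventType, content, metadata = {} }) {
  const baselines = { observation: 0.8, agent_action: 0.6, conversation_turn: 0.5 };
  let score = baselines[eventType] ?? 0.5;

  const text = content.toLowerCase();
  if (/\b(always|never|prefer|must|blocked)/.test(text)) score += 0.2; // likely a preference or constraint
  if (metadata.success === false) score += 0.1;                        // failures are worth remembering
  if (text.length < 20) score -= 0.2;                                  // greetings and filler turns

  return Math.min(1, Math.max(0.1, score)); // clamp to the 0.1-1.0 range
}
```

A function like this can run synchronously on every write, with the LLM scorer layered on afterwards only for events the heuristic rates as borderline.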

Async Write Queue for Production

The implementation above fires memory writes as a fire-and-forget promise after each turn. This works, but it lacks durability: if the process crashes between sending the response and completing the write, the memory is lost. For production, route writes through a queue.

// memory-queue.js - Redis Streams writer for durable async memory writes
import { createClient } from 'redis';

const redis = createClient({ url: process.env.REDIS_URL });
await redis.connect();

export async function enqueueMemoryWrite(memoryEvent) {
  await redis.xAdd('memory:write:queue', '*', {
    payload: JSON.stringify(memoryEvent),
  });
}

// memory-worker.js - runs as a separate process
import { createClient } from 'redis';
import { EpisodicMemoryClient } from './episodic-memory.js';

const redis = createClient({ url: process.env.REDIS_URL });
await redis.connect();

// The consumer group must exist before xReadGroup will succeed; create it
// idempotently (a BUSYGROUP error means it already exists)
try {
  await redis.xGroupCreate('memory:write:queue', 'memory-workers', '0', { MKSTREAM: true });
} catch (err) {
  if (!String(err?.message).includes('BUSYGROUP')) throw err;
}

async function processMemoryWrites() {
  while (true) {
    const entries = await redis.xReadGroup(
      'memory-workers',
      `worker-${process.pid}`,
      [{ key: 'memory:write:queue', id: '>' }],
      { COUNT: 10, BLOCK: 5000 }
    );

    if (!entries) continue;

    for (const { messages } of entries) {
      for (const { id, message } of messages) {
        try {
          const event = JSON.parse(message.payload);
          const client = new EpisodicMemoryClient({
            tenantId: event.tenantId,
            userId: event.userId,
            agentId: event.agentId,
          });
          // Reattach to existing session rather than starting a new one
          client.sessionId = event.sessionId;
          await client.writeMemory(event);
          await redis.xAck('memory:write:queue', 'memory-workers', id);
        } catch (err) {
          console.error('Memory write failed:', err, 'message id:', id);
          // Leave unacknowledged for retry via XAUTOCLAIM
        }
      }
    }
  }
}

processMemoryWrites();

Pruning and Retention Policies

Episodic memory grows indefinitely without a retention policy. You need a strategy that keeps the high-value memories while pruning low-signal events that are no longer useful.

A practical default: retain all memories with importance above 0.7 indefinitely, retain all memories from the last 30 days regardless of importance, and delete memories older than 90 days with importance below 0.5. Run this as a nightly job.

// memory-pruner.js - run nightly via cron or Azure Functions timer
import pool from './db.js'; // shared pg Pool, same configuration as in episodic-memory.js

export async function pruneEpisodicMemories() {
  const result = await pool.query(
    `DELETE FROM episodic_memories
     WHERE created_at < now() - INTERVAL '90 days'
       AND importance < 0.5
     RETURNING id`
  );

  console.log(`Pruned ${result.rowCount} low-importance episodic memories`);
  return result.rowCount;
}
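The three retention rules described above can also be encoded as a pure predicate, which makes the policy easy to unit-test separately from the DELETE statement (shouldRetain is a hypothetical helper, not part of the pruner):

```javascript
// Returns true if a memory survives the nightly prune:
// keep importance > 0.7 indefinitely, keep everything from the last
// 30 days, and delete only what is older than 90 days with importance < 0.5.
function shouldRetain(memory, now = new Date()) {
  const ageDays = (now - new Date(memory.created_at)) / 86_400_000;
  if (memory.importance > 0.7) return true;
  if (ageDays <= 30) return true;
  return !(ageDays > 90 && memory.importance < 0.5);
}
```

Keeping the predicate in one place means the SQL in the pruner and any application-level filtering cannot silently drift apart: change the policy here, and regenerate the DELETE condition to match.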

What Is Next

Part 3 builds the semantic memory layer: the system that stores distilled facts and knowledge extracted from episodes, indexed for similarity search using Qdrant. Where episodic memory records what happened, semantic memory stores what the agent has learned, and retrieval is purely by conceptual similarity rather than by time.
