Database Integration: MCP for Azure PostgreSQL and pgvector

Throughout this series, we have explored Model Context Protocol fundamentals, Azure MCP Server capabilities, custom server development, and multi-agent orchestration. Now we focus on one of the most powerful MCP integrations: connecting AI agents to Azure Database for PostgreSQL with pgvector for intelligent, context-aware data access.

Azure Database for PostgreSQL with MCP integration transforms how AI applications interact with enterprise data. Instead of writing custom database access layers or managing complex API endpoints, you can enable natural language queries that automatically translate to optimized SQL operations, including advanced vector similarity searches.

Azure Database for PostgreSQL MCP Server

Microsoft announced the public preview of the Azure Database for PostgreSQL MCP Server in April 2025. This open-source implementation provides a standardized bridge between AI models and PostgreSQL databases, enabling agents to discover schemas, query data, and perform vector operations through natural language interactions.

Key Capabilities

The PostgreSQL MCP Server exposes several critical tools that AI agents can invoke:

  • Database Discovery: List all databases in your PostgreSQL Flexible Server instance
  • Schema Inspection: Enumerate tables with their complete schema information including columns, data types, and constraints
  • Query Execution: Run read-only SELECT queries and return results in a structured, model-friendly format
  • Data Manipulation: Insert and update records through natural language commands
  • Vector Search: Perform similarity searches using the pgvector extension with cosine, L2, and inner product distance metrics (an example tool-call request follows this list)
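
Each capability corresponds to an MCP tool that clients invoke through the protocol's tools/call JSON-RPC method. As a sketch, a request against the vector search tool (tool and argument names match the Python server built later in this post; the query vector is truncated for brevity) looks like this:

{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "tools/call",
  "params": {
    "name": "vector_similarity_search",
    "arguments": {
      "database": "appdb",
      "table": "documents",
      "vector_column": "embedding",
      "query_vector": [0.012, -0.034, 0.056],
      "limit": 5,
      "distance_metric": "cosine"
    }
  }
}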

Architecture

The Mermaid diagram below shows how MCP clients, the server's tool registry, authentication options, and the database components fit together:

graph TB
    subgraph "MCP Client Layer"
        A[AI Agent/Application]
        B[GitHub Copilot Agent Mode]
        C[Claude Desktop]
        D[VS Code MCP Client]
    end
    
    subgraph "MCP Server"
        E[Azure PostgreSQL MCP Server]
        F[Tool Registry]
        G[Authentication Manager]
    end
    
    subgraph "Authentication"
        H[Microsoft Entra ID]
        I[Managed Identity]
        J[Password Auth]
    end
    
    subgraph "Azure Database for PostgreSQL"
        K[PostgreSQL Flexible Server]
        L[pgvector Extension]
        M[azure_ai Extension]
        N[Database Tables]
    end
    
    subgraph "AI Services"
        O[Azure OpenAI]
        P[Embedding Models]
    end
    
    A --> E
    B --> E
    C --> E
    D --> E
    E --> F
    E --> G
    G --> H
    G --> I
    G --> J
    E --> K
    K --> L
    K --> M
    K --> N
    M --> O
    O --> P
    P -.Embeddings.-> L
    
    style E fill:#4A90E2
    style K fill:#336791
    style L fill:#7ED321
    style H fill:#F5A623

Setting Up Azure PostgreSQL MCP Server

Let’s walk through a complete setup including authentication, schema creation, and vector search capabilities.

Prerequisites

  • Azure Database for PostgreSQL Flexible Server (version 13 or higher)
  • Python 3.10 or later with pip installed (the MCP Python SDK requires 3.10+)
  • Azure CLI for Entra ID authentication setup (example commands below)
  • MCP-compatible client (Claude Desktop, VS Code, or custom implementation)
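
If Entra ID is not yet configured on your server, the Azure CLI can designate an Entra administrator and allowlist the required extensions; a sketch with placeholder resource names:

az postgres flexible-server ad-admin create \
  --resource-group my-rg \
  --server-name my-pg-server \
  --display-name "admin@contoso.com" \
  --object-id <entra-object-id>

az postgres flexible-server parameter set \
  --resource-group my-rg \
  --server-name my-pg-server \
  --name azure.extensions \
  --value vector,azure_ai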

Installation

Create a virtual environment and install the required packages:

python -m venv azure-postgresql-mcp-venv
source azure-postgresql-mcp-venv/bin/activate  # On Windows: azure-postgresql-mcp-venv\Scripts\activate

pip install "mcp[cli]" "psycopg[binary]" azure-mgmt-postgresqlflexibleservers azure-identity
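
Next, register the server with your MCP client so it can be launched on demand. A minimal sketch for Claude Desktop's claude_desktop_config.json, assuming the Python server shown below is saved as azure_postgresql_mcp.py:

{
  "mcpServers": {
    "azure-postgresql": {
      "command": "python",
      "args": ["/path/to/azure_postgresql_mcp.py"],
      "env": {
        "PGHOST": "your-server.postgres.database.azure.com",
        "PGUSER": "your-user",
        "USE_ENTRA_AUTH": "true"
      }
    }
  }
}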

Python MCP Server Implementation

Here is a complete implementation of a PostgreSQL MCP server with vector search capabilities:

import os
import json
import asyncio
from typing import Any, Dict, List
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
import psycopg
from psycopg.rows import dict_row
from azure.identity import DefaultAzureCredential
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class AzurePostgreSQLMCPServer:
    def __init__(self):
        self.server = Server("azure-postgresql-mcp")
        self.setup_handlers()
        
    def setup_handlers(self):
        @self.server.list_tools()
        async def list_tools() -> List[Tool]:
            return [
                Tool(
                    name="list_databases",
                    description="List all databases in the PostgreSQL server",
                    inputSchema={
                        "type": "object",
                        "properties": {},
                    }
                ),
                Tool(
                    name="list_tables",
                    description="List all tables in a database with schema information",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "database": {
                                "type": "string",
                                "description": "Database name"
                            }
                        },
                        "required": ["database"]
                    }
                ),
                Tool(
                    name="execute_query",
                    description="Execute a SELECT query on the database",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "database": {
                                "type": "string",
                                "description": "Database name"
                            },
                            "query": {
                                "type": "string",
                                "description": "SQL SELECT query to execute"
                            },
                            "limit": {
                                "type": "integer",
                                "description": "Maximum number of rows to return",
                                "default": 100
                            }
                        },
                        "required": ["database", "query"]
                    }
                ),
                Tool(
                    name="vector_similarity_search",
                    description="Perform vector similarity search using pgvector",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "database": {
                                "type": "string",
                                "description": "Database name"
                            },
                            "table": {
                                "type": "string",
                                "description": "Table name containing vector column"
                            },
                            "vector_column": {
                                "type": "string",
                                "description": "Name of the vector column"
                            },
                            "query_vector": {
                                "type": "array",
                                "items": {"type": "number"},
                                "description": "Query vector for similarity search"
                            },
                            "limit": {
                                "type": "integer",
                                "description": "Number of results to return",
                                "default": 10
                            },
                            "distance_metric": {
                                "type": "string",
                                "enum": ["cosine", "l2", "inner_product"],
                                "description": "Distance metric to use",
                                "default": "cosine"
                            }
                        },
                        "required": ["database", "table", "vector_column", "query_vector"]
                    }
                ),
                Tool(
                    name="insert_data",
                    description="Insert data into a table",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "database": {
                                "type": "string",
                                "description": "Database name"
                            },
                            "table": {
                                "type": "string",
                                "description": "Table name"
                            },
                            "data": {
                                "type": "object",
                                "description": "Column-value pairs to insert"
                            }
                        },
                        "required": ["database", "table", "data"]
                    }
                )
            ]
        
        @self.server.call_tool()
        async def call_tool(name: str, arguments: Any) -> List[TextContent]:
            try:
                if name == "list_databases":
                    result = await self.list_databases()
                elif name == "list_tables":
                    result = await self.list_tables(arguments["database"])
                elif name == "execute_query":
                    result = await self.execute_query(
                        arguments["database"],
                        arguments["query"],
                        arguments.get("limit", 100)
                    )
                elif name == "vector_similarity_search":
                    result = await self.vector_similarity_search(
                        arguments["database"],
                        arguments["table"],
                        arguments["vector_column"],
                        arguments["query_vector"],
                        arguments.get("limit", 10),
                        arguments.get("distance_metric", "cosine")
                    )
                elif name == "insert_data":
                    result = await self.insert_data(
                        arguments["database"],
                        arguments["table"],
                        arguments["data"]
                    )
                else:
                    result = {"error": f"Unknown tool: {name}"}
                
                return [TextContent(
                    type="text",
                    text=json.dumps(result, indent=2, default=str)
                )]
            except Exception as e:
                logger.error(f"Error executing tool {name}: {str(e)}")
                return [TextContent(
                    type="text",
                    text=json.dumps({"error": str(e)}, indent=2)
                )]
    
    async def get_connection(self, database: str = "postgres") -> psycopg.AsyncConnection:
        """Create database connection with Entra ID or password authentication"""
        host = os.environ.get("PGHOST")
        user = os.environ.get("PGUSER")
        password = os.environ.get("PGPASSWORD")
        use_entra = os.environ.get("USE_ENTRA_AUTH", "false").lower() == "true"
        
        if use_entra:
            # PGUSER must be the Entra ID principal (user, group, or managed identity) provisioned on the server
            credential = DefaultAzureCredential()
            token = credential.get_token("https://ossrdbms-aad.database.windows.net/.default")
            
            conn = await psycopg.AsyncConnection.connect(
                host=host,
                dbname=database,
                user=user,
                password=token.token,
                sslmode="require",
                row_factory=dict_row
            )
        else:
            conn = await psycopg.AsyncConnection.connect(
                host=host,
                dbname=database,
                user=user,
                password=password,
                sslmode="require",
                row_factory=dict_row
            )
        
        return conn
    
    async def list_databases(self) -> Dict[str, Any]:
        """List all databases"""
        conn = await self.get_connection()
        try:
            async with conn.cursor() as cur:
                await cur.execute("""
                    SELECT datname as database_name, 
                           pg_size_pretty(pg_database_size(datname)) as size
                    FROM pg_database
                    WHERE datistemplate = false
                    ORDER BY datname;
                """)
                databases = await cur.fetchall()
                return {"databases": databases}
        finally:
            await conn.close()
    
    async def list_tables(self, database: str) -> Dict[str, Any]:
        """List all tables with schema information"""
        conn = await self.get_connection(database)
        try:
            async with conn.cursor() as cur:
                await cur.execute("""
                    SELECT 
                        t.table_schema,
                        t.table_name,
                        c.column_name,
                        c.data_type,
                        c.is_nullable,
                        c.column_default
                    FROM information_schema.tables t
                    JOIN information_schema.columns c 
                        ON t.table_name = c.table_name 
                        AND t.table_schema = c.table_schema
                    WHERE t.table_schema NOT IN ('pg_catalog', 'information_schema')
                    ORDER BY t.table_schema, t.table_name, c.ordinal_position;
                """)
                columns = await cur.fetchall()
                
                # Group by table
                tables = {}
                for col in columns:
                    key = f"{col['table_schema']}.{col['table_name']}"
                    if key not in tables:
                        tables[key] = {
                            "schema": col["table_schema"],
                            "name": col["table_name"],
                            "columns": []
                        }
                    tables[key]["columns"].append({
                        "name": col["column_name"],
                        "type": col["data_type"],
                        "nullable": col["is_nullable"],
                        "default": col["column_default"]
                    })
                
                return {"tables": list(tables.values())}
        finally:
            await conn.close()
    
    async def execute_query(self, database: str, query: str, limit: int = 100) -> Dict[str, Any]:
        """Execute a SELECT query"""
        # A prefix check is a convenience guard, not a security boundary; pair it
        # with a read-only database role and a server-side statement_timeout.
        if not query.strip().upper().startswith("SELECT"):
            return {"error": "Only SELECT queries are allowed"}
        
        # Naive substring check; appends a LIMIT only when none appears in the text
        if "LIMIT" not in query.upper():
            query = f"{query.rstrip(';')} LIMIT {limit}"
        
        conn = await self.get_connection(database)
        try:
            async with conn.cursor() as cur:
                await cur.execute(query)
                results = await cur.fetchall()
                return {
                    "query": query,
                    "row_count": len(results),
                    "results": results
                }
        finally:
            await conn.close()
    
    async def vector_similarity_search(
        self,
        database: str,
        table: str,
        vector_column: str,
        query_vector: List[float],
        limit: int = 10,
        distance_metric: str = "cosine"
    ) -> Dict[str, Any]:
        """Perform vector similarity search"""
        operator_map = {
            "cosine": "<=>",
            "l2": "<->",
            "inner_product": "<#>"
        }
        
        operator = operator_map.get(distance_metric, "<=>")
        vector_str = "[" + ",".join(map(str, query_vector)) + "]"
        
        query = f"""
            SELECT *, {vector_column} {operator} '{vector_str}'::vector as distance
            FROM {table}
            ORDER BY distance
            LIMIT {limit};
        """
        
        conn = await self.get_connection(database)
        try:
            async with conn.cursor() as cur:
                await cur.execute(query)
                results = await cur.fetchall()
                return {
                    "query": query,
                    "distance_metric": distance_metric,
                    "results": results
                }
        finally:
            await conn.close()
    
    async def insert_data(self, database: str, table: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Insert data into table"""
        columns = list(data.keys())
        values = list(data.values())
        
        placeholders = ",".join(["%s"] * len(values))
        columns_str = ",".join(columns)
        
        query = f"INSERT INTO {table} ({columns_str}) VALUES ({placeholders}) RETURNING *;"
        
        conn = await self.get_connection(database)
        try:
            async with conn.cursor() as cur:
                await cur.execute(query, values)
                result = await cur.fetchone()
                await conn.commit()
                return {
                    "success": True,
                    "inserted_row": result
                }
        finally:
            await conn.close()
    
    async def run(self):
        """Start the MCP server"""
        async with stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                self.server.create_initialization_options()
            )

if __name__ == "__main__":
    server = AzurePostgreSQLMCPServer()
    asyncio.run(server.run())
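
Before wiring the server into an agent, you can smoke-test it with the MCP Python SDK's stdio client. A minimal sketch, assuming the server above is saved as azure_postgresql_mcp.py:

import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def main():
    # Launch the server as a subprocess and speak MCP over stdio
    params = StdioServerParameters(command="python", args=["azure_postgresql_mcp.py"])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()
            print([tool.name for tool in tools.tools])
            # Call a tool exactly as an AI agent would
            result = await session.call_tool("list_tables", {"database": "postgres"})
            print(result.content[0].text)

asyncio.run(main())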

Node.js/TypeScript Implementation

For TypeScript developers, here is an equivalent implementation:

import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import pg from "pg";
import { DefaultAzureCredential } from "@azure/identity";

const { Pool } = pg;

interface PostgreSQLConfig {
  host: string;
  user: string;
  password?: string;
  database: string;
  ssl: boolean;
  useEntraAuth: boolean;
}

class AzurePostgreSQLMCPServer {
  private server: Server;
  private pools: Map<string, PgPool> = new Map();
  private config: PostgreSQLConfig;

  constructor() {
    this.server = new Server(
      {
        name: "azure-postgresql-mcp",
        version: "1.0.0",
      },
      {
        capabilities: {
          tools: {},
        },
      }
    );

    this.config = {
      host: process.env.PGHOST!,
      user: process.env.PGUSER!,
      password: process.env.PGPASSWORD,
      database: "postgres",
      ssl: true,
      useEntraAuth: process.env.USE_ENTRA_AUTH === "true",
    };

    this.setupHandlers();
  }

  private async getPool(database: string): Promise<PgPool> {
    if (this.pools.has(database)) {
      return this.pools.get(database)!;
    }

    let password = this.config.password;

    if (this.config.useEntraAuth) {
      // Entra ID access tokens expire (typically within an hour or two), so a
      // long-lived pool should refresh the token rather than cache it forever.
      const credential = new DefaultAzureCredential();
      const token = await credential.getToken(
        "https://ossrdbms-aad.database.windows.net/.default"
      );
      password = token.token;
    }

    const pool = new Pool({
      host: this.config.host,
      user: this.config.user,
      password: password,
      database: database,
      ssl: { rejectUnauthorized: true },
    });

    this.pools.set(database, pool);
    return pool;
  }

  private setupHandlers() {
    this.server.setRequestHandler(ListToolsRequestSchema, async () => {
      return {
        tools: [
          {
            name: "list_databases",
            description: "List all databases in the PostgreSQL server",
            inputSchema: {
              type: "object",
              properties: {},
            },
          },
          {
            name: "list_tables",
            description: "List all tables with schema information",
            inputSchema: {
              type: "object",
              properties: {
                database: {
                  type: "string",
                  description: "Database name",
                },
              },
              required: ["database"],
            },
          },
          {
            name: "execute_query",
            description: "Execute a SELECT query",
            inputSchema: {
              type: "object",
              properties: {
                database: { type: "string" },
                query: { type: "string" },
                limit: { type: "number", default: 100 },
              },
              required: ["database", "query"],
            },
          },
          {
            name: "vector_search",
            description: "Perform pgvector similarity search",
            inputSchema: {
              type: "object",
              properties: {
                database: { type: "string" },
                table: { type: "string" },
                vector_column: { type: "string" },
                query_vector: {
                  type: "array",
                  items: { type: "number" },
                },
                limit: { type: "number", default: 10 },
                metric: {
                  type: "string",
                  enum: ["cosine", "l2", "inner_product"],
                  default: "cosine",
                },
              },
              required: ["database", "table", "vector_column", "query_vector"],
            },
          },
        ],
      };
    });

    this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
      const { name } = request.params;
      const args = (request.params.arguments ?? {}) as Record<string, any>;

      try {
        let result;

        switch (name) {
          case "list_databases":
            result = await this.listDatabases();
            break;
          case "list_tables":
            result = await this.listTables(args.database);
            break;
          case "execute_query":
            result = await this.executeQuery(
              args.database,
              args.query,
              args.limit || 100
            );
            break;
          case "vector_search":
            result = await this.vectorSearch(
              args.database,
              args.table,
              args.vector_column,
              args.query_vector,
              args.limit || 10,
              args.metric || "cosine"
            );
            break;
          default:
            throw new Error(`Unknown tool: ${name}`);
        }

        return {
          content: [
            {
              type: "text",
              text: JSON.stringify(result, null, 2),
            },
          ],
        };
      } catch (error) {
        return {
          content: [
            {
              type: "text",
              text: JSON.stringify(
                { error: error instanceof Error ? error.message : "Unknown error" },
                null,
                2
              ),
            },
          ],
          isError: true,
        };
      }
    });
  }

  private async listDatabases() {
    const pool = await this.getPool("postgres");
    const result = await pool.query(`
      SELECT datname as database_name, 
             pg_size_pretty(pg_database_size(datname)) as size
      FROM pg_database
      WHERE datistemplate = false
      ORDER BY datname;
    `);
    return { databases: result.rows };
  }

  private async listTables(database: string) {
    const pool = await this.getPool(database);
    const result = await pool.query(`
      SELECT 
        t.table_schema,
        t.table_name,
        c.column_name,
        c.data_type,
        c.is_nullable
      FROM information_schema.tables t
      JOIN information_schema.columns c 
        ON t.table_name = c.table_name 
        AND t.table_schema = c.table_schema
      WHERE t.table_schema NOT IN ('pg_catalog', 'information_schema')
      ORDER BY t.table_schema, t.table_name, c.ordinal_position;
    `);

    const tables: any = {};
    for (const row of result.rows) {
      const key = `${row.table_schema}.${row.table_name}`;
      if (!tables[key]) {
        tables[key] = {
          schema: row.table_schema,
          name: row.table_name,
          columns: [],
        };
      }
      tables[key].columns.push({
        name: row.column_name,
        type: row.data_type,
        nullable: row.is_nullable,
      });
    }

    return { tables: Object.values(tables) };
  }

  private async executeQuery(database: string, query: string, limit: number) {
    if (!query.trim().toUpperCase().startsWith("SELECT")) {
      throw new Error("Only SELECT queries are allowed");
    }

    if (!query.toUpperCase().includes("LIMIT")) {
      query = `${query.replace(/;$/, "")} LIMIT ${limit}`;
    }

    const pool = await this.getPool(database);
    const result = await pool.query(query);

    return {
      query,
      row_count: result.rows.length,
      results: result.rows,
    };
  }

  private async vectorSearch(
    database: string,
    table: string,
    vectorColumn: string,
    queryVector: number[],
    limit: number,
    metric: string
  ) {
    const operatorMap: Record<string, string> = {
      cosine: "<=>",
      l2: "<->",
      inner_product: "<#>",
    };

    const operator = operatorMap[metric] || "<=>";
    const vectorStr = `[${queryVector.join(",")}]`;

    // Identifiers cannot be parameterized; validate table and column names
    // against the schema first. The vector and limit are bound parameters.
    const query = `
      SELECT *, ${vectorColumn} ${operator} $1::vector as distance
      FROM ${table}
      ORDER BY distance
      LIMIT $2;
    `;

    const pool = await this.getPool(database);
    const result = await pool.query(query, [vectorStr, limit]);

    return {
      query,
      distance_metric: metric,
      results: result.rows,
    };
  }

  async run() {
    const transport = new StdioServerTransport();
    await this.server.connect(transport);
    console.error("Azure PostgreSQL MCP Server running on stdio");
  }
}

const server = new AzurePostgreSQLMCPServer();
server.run().catch(console.error);
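
The TypeScript version depends on the MCP SDK, the pg driver, and the Azure identity library; the package names match the imports above:

npm install @modelcontextprotocol/sdk pg @azure/identity
npm install --save-dev typescript @types/pg @types/node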

Working with pgvector for AI Applications

The pgvector extension transforms PostgreSQL into a powerful vector database for AI applications. Let’s explore practical implementations for semantic search and recommendation systems.

Enabling pgvector

-- Enable extensions
CREATE EXTENSION IF NOT EXISTS vector;
CREATE EXTENSION IF NOT EXISTS azure_ai;

-- Create table with vector column
CREATE TABLE documents (
    id SERIAL PRIMARY KEY,
    title TEXT NOT NULL,
    content TEXT NOT NULL,
    embedding VECTOR(1536),  -- OpenAI text-embedding-3-small dimensions
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Create HNSW index for fast similarity search
CREATE INDEX ON documents USING hnsw (embedding vector_cosine_ops);
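
The azure_ai extension enabled above can also generate embeddings directly in SQL, keeping ingestion entirely inside the database. A sketch using its Azure OpenAI integration; the endpoint, key, and deployment name are placeholders for your own resources:

-- Point the azure_ai extension at your Azure OpenAI resource
SELECT azure_ai.set_setting('azure_openai.endpoint', 'https://your-openai.openai.azure.com');
SELECT azure_ai.set_setting('azure_openai.subscription_key', '<your-key>');

-- Embed and store a document in one statement
INSERT INTO documents (title, content, embedding)
VALUES (
    'pgvector overview',
    'pgvector adds vector similarity search to PostgreSQL.',
    azure_openai.create_embeddings(
        'text-embedding-3-small',
        'pgvector adds vector similarity search to PostgreSQL.'
    )::vector
);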

C# Implementation with Azure OpenAI Integration

using Npgsql;
using Azure.AI.OpenAI;
using Azure.Identity;
using System.Text.Json;

public class PostgreSQLVectorStore
{
    private readonly string _connectionString;
    private readonly AzureOpenAIClient _openAIClient;
    private readonly string _embeddingDeployment;

    public PostgreSQLVectorStore(
        string connectionString,
        string openAIEndpoint,
        string embeddingDeployment)
    {
        _connectionString = connectionString;
        _embeddingDeployment = embeddingDeployment;
        
        _openAIClient = new AzureOpenAIClient(
            new Uri(openAIEndpoint),
            new DefaultAzureCredential()
        );
    }

    public async Task<int> AddDocumentAsync(string title, string content)
    {
        // Generate embedding (Azure.AI.OpenAI 2.x style: per-deployment EmbeddingClient)
        var embeddingClient = _openAIClient.GetEmbeddingClient(_embeddingDeployment);
        var embeddingResponse = await embeddingClient.GenerateEmbeddingAsync(content);
        var embedding = embeddingResponse.Value.ToFloats().ToArray();

        // Insert into database
        await using var conn = new NpgsqlConnection(_connectionString);
        await conn.OpenAsync();

        var sql = @"
            INSERT INTO documents (title, content, embedding)
            VALUES (@title, @content, @embedding)
            RETURNING id;
        ";

        await using var cmd = new NpgsqlCommand(sql, conn);
        cmd.Parameters.AddWithValue("title", title);
        cmd.Parameters.AddWithValue("content", content);
        cmd.Parameters.AddWithValue("embedding", embedding);

        return (int)(await cmd.ExecuteScalarAsync())!;
    }

    public async Task<List<SearchResult>> SearchAsync(string query, int limit = 10)
    {
        // Generate query embedding
        var embeddingClient = _openAIClient.GetEmbeddingClient(_embeddingDeployment);
        var embeddingResponse = await embeddingClient.GenerateEmbeddingAsync(query);
        var queryEmbedding = embeddingResponse.Value.ToFloats().ToArray();

        // Perform similarity search
        await using var conn = new NpgsqlConnection(_connectionString);
        await conn.OpenAsync();

        var sql = @"
            SELECT id, title, content, 
                   embedding <=> @queryEmbedding as distance
            FROM documents
            ORDER BY distance
            LIMIT @limit;
        ";

        await using var cmd = new NpgsqlCommand(sql, conn);
        cmd.Parameters.AddWithValue("queryEmbedding", queryEmbedding);
        cmd.Parameters.AddWithValue("limit", limit);

        var results = new List<SearchResult>();
        await using var reader = await cmd.ExecuteReaderAsync();

        while (await reader.ReadAsync())
        {
            results.Add(new SearchResult
            {
                Id = reader.GetInt32(0),
                Title = reader.GetString(1),
                Content = reader.GetString(2),
                Distance = reader.GetDouble(3) // pgvector distances are double precision
            });
        }

        return results;
    }

    public async Task<List<SearchResult>> HybridSearchAsync(
        string query,
        int limit = 10)
    {
        // Combine full-text search with vector similarity
        var embeddingClient = _openAIClient.GetEmbeddingClient(_embeddingDeployment);
        var embeddingResponse = await embeddingClient.GenerateEmbeddingAsync(query);
        var queryEmbedding = embeddingResponse.Value.ToFloats().ToArray();

        await using var conn = new NpgsqlConnection(_connectionString);
        await conn.OpenAsync();

        var sql = @"
            WITH vector_search AS (
                SELECT id, title, content,
                       embedding <=> @queryEmbedding as vector_distance
                FROM documents
                ORDER BY vector_distance
                LIMIT @limit * 2
            ),
            text_search AS (
                SELECT id, title, content,
                       ts_rank(to_tsvector('english', content), 
                               plainto_tsquery('english', @query)) as text_rank
                FROM documents
                WHERE to_tsvector('english', content) @@ plainto_tsquery('english', @query)
                ORDER BY text_rank DESC
                LIMIT @limit * 2
            )
            SELECT DISTINCT ON (v.id) v.id, v.title, v.content,
                   v.vector_distance,
                   COALESCE(t.text_rank, 0) as text_rank,
                   (v.vector_distance + (1.0 - COALESCE(t.text_rank, 0))) / 2 as combined_score
            FROM vector_search v
            LEFT JOIN text_search t ON v.id = t.id
            ORDER BY combined_score
            LIMIT @limit;
        ";

        await using var cmd = new NpgsqlCommand(sql, conn);
        cmd.Parameters.AddWithValue("queryEmbedding", queryEmbedding);
        cmd.Parameters.AddWithValue("query", query);
        cmd.Parameters.AddWithValue("limit", limit);

        var results = new List<SearchResult>();
        await using var reader = await cmd.ExecuteReaderAsync();

        while (await reader.ReadAsync())
        {
            results.Add(new SearchResult
            {
                Id = reader.GetInt32(0),
                Title = reader.GetString(1),
                Content = reader.GetString(2),
                Distance = reader.GetDouble(3),
                TextRank = reader.GetDouble(4),
                CombinedScore = reader.GetDouble(5)
            });
        }

        return results;
    }
}

public class SearchResult
{
    public int Id { get; set; }
    public string Title { get; set; } = string.Empty;
    public string Content { get; set; } = string.Empty;
    public double Distance { get; set; }
    public double TextRank { get; set; }
    public double CombinedScore { get; set; }
}
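
A hypothetical usage of the store, with placeholder connection and resource names:

// Hypothetical wiring; substitute your own connection string and deployments
var store = new PostgreSQLVectorStore(
    connectionString: Environment.GetEnvironmentVariable("PG_CONNECTION_STRING")!,
    openAIEndpoint: "https://your-openai.openai.azure.com",
    embeddingDeployment: "text-embedding-3-small");

await store.AddDocumentAsync(
    "MCP overview",
    "Model Context Protocol standardizes how agents call tools.");

foreach (var hit in await store.SearchAsync("how do agents call tools?", limit: 5))
{
    Console.WriteLine($"{hit.Title} (distance: {hit.Distance:F4})");
}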

Integrating with Azure AI Foundry

For production deployments, you can connect the PostgreSQL MCP Server to the Azure AI Foundry Agent Service, which hosts the agent runtime on managed infrastructure.

Deployment Configuration

import os

from azure.ai.projects import AIProjectClient
from azure.identity import DefaultAzureCredential
from azure.ai.agents.models import McpTool

# Note: the agents API is in preview and its surface shifts between versions;
# the calls below follow the published azure-ai-agents MCP tool samples.
project_client = AIProjectClient.from_connection_string(
    credential=DefaultAzureCredential(),
    conn_str=os.environ["PROJECT_CONNECTION_STRING"]
)

# Describe the PostgreSQL MCP server as an agent tool
mcp_tool = McpTool(
    server_label="postgresql",
    server_url="https://postgresql-mcp-server.azurecontainerapps.io",
    allowed_tools=[
        "list_databases",
        "list_tables",
        "execute_query",
        "vector_similarity_search"
    ]
)

# Create agent with PostgreSQL MCP integration
agent = project_client.agents.create_agent(
    model="gpt-4o",
    name="Database Analytics Agent",
    instructions="""You are a database analytics expert. You can query PostgreSQL databases,
    perform vector similarity searches, and provide insights from data.
    
    Use the available tools to:
    - Discover database schemas
    - Execute SQL queries efficiently
    - Perform semantic searches on document embeddings
    - Analyze data patterns and trends
    
    Always explain your queries and results clearly.""",
    tools=mcp_tool.definitions
)

# Create thread and query
thread = project_client.agents.threads.create()

project_client.agents.messages.create(
    thread_id=thread.id,
    role="user",
    content="Find the 5 most similar documents to 'machine learning best practices'"
)

# Attach per-run headers and skip the human approval step for tool calls
mcp_tool.update_headers("X-Database", "production")
mcp_tool.update_headers("X-Schema", "public")
mcp_tool.set_approval_mode("never")

run = project_client.agents.runs.create(
    thread_id=thread.id,
    agent_id=agent.id,
    tool_resources=mcp_tool.resources
)

Production Best Practices

  • Use Microsoft Entra authentication: Avoid password-based auth in production. Configure Entra ID with managed identities for secure access
  • Implement connection pooling: Reuse database connections to reduce overhead and improve performance (see the pooling sketch after this list)
  • Enable query logging: Track all AI-generated queries for audit and optimization purposes
  • Set appropriate timeouts: Configure query timeouts to prevent long-running operations from blocking resources
  • Use read replicas: Direct analytics queries to read replicas to protect primary database performance
  • Optimize vector indexes: Choose between HNSW (better recall) and IVFFlat (faster build) based on your use case
  • Implement row-level security: Use PostgreSQL RLS policies to enforce data access controls
  • Monitor embedding dimensions: Ensure consistency between embedding model output and vector column dimensions
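
To illustrate the pooling recommendation above, the per-call connect/close pattern in the earlier Python server can be replaced with psycopg_pool. A minimal sketch, reusing the same environment variables:

import os

from psycopg.rows import dict_row
from psycopg_pool import AsyncConnectionPool

conninfo = (
    f"host={os.environ['PGHOST']} dbname=postgres "
    f"user={os.environ['PGUSER']} password={os.environ['PGPASSWORD']} "
    "sslmode=require"
)

# One pool per process; open it explicitly from async code
pool = AsyncConnectionPool(conninfo, min_size=1, max_size=10, open=False,
                           kwargs={"row_factory": dict_row})

async def run_query(query: str):
    if pool.closed:
        await pool.open()
    # Borrow a connection; it returns to the pool when the block exits
    async with pool.connection() as conn:
        async with conn.cursor() as cur:
            await cur.execute(query)
            return await cur.fetchall()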

Performance Optimization

-- Optimize HNSW index parameters
CREATE INDEX ON documents 
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);

-- For IVFFlat, determine optimal lists parameter
-- Rule of thumb: lists = rows / 1000 for up to 1M rows
CREATE INDEX ON documents 
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);

-- Update statistics after index creation
ANALYZE documents;

-- GIN full-text index to support the text half of hybrid search
CREATE INDEX documents_content_idx ON documents 
USING gin(to_tsvector('english', content));
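
Index build parameters are only half the story; pgvector also exposes query-time settings that trade recall for speed:

-- HNSW: higher ef_search means better recall, slower queries (default is 40)
SET hnsw.ef_search = 100;

-- IVFFlat: probe more lists for better recall (default is 1)
SET ivfflat.probes = 10;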

Looking Ahead

Azure Database for PostgreSQL with MCP integration represents a fundamental shift in how AI applications access enterprise data. By combining natural language interfaces with powerful vector search capabilities, developers can build sophisticated AI systems that understand both structured data and semantic relationships.

In the final post of this series, we will explore production deployment patterns and enterprise best practices for MCP-based systems. You will learn about monitoring strategies, security hardening, multi-tenant architectures, and scaling approaches for mission-critical AI applications.
