AI in Development
Clusterify.AI
© 2025 All Rights Reserved, Clusterify Solutions FZCO
Secure MCP Server with Python and NextJS
July 12, 2025
This article is a practical, hands-on guide to building a secure MCP server backend with Python and the FastAPI framework. The examples demonstrate how to implement the key security principles discussed previously in two other articles.
A secure application starts with a secure foundation. Before writing application code, it is crucial to establish a secure project environment.
Use a virtual environment (for example, Python's built-in venv module) to isolate project dependencies from the system's global Python installation. This prevents version conflicts and ensures a reproducible build environment.
Pin dependencies. A tool such as pip-tools can be used to compile a high-level requirements.in file into a fully pinned requirements.txt file, which includes all transitive dependencies. This prevents unexpected or malicious packages from being introduced during deployment and ensures that security patches are applied deliberately.
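One way to enforce the pinning policy is a small check in CI that fails when requirements.txt contains an entry without an exact version. The following is a minimal sketch using only the standard library; the function name find_unpinned and the rule that every requirement must use == are assumptions made for illustration and are not part of pip-tools.

```python
import re

# Matches "name==version", with optional extras such as "uvicorn[standard]==0.30.1".
PINNED = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]*(\[[A-Za-z0-9,._-]+\])?==[^\s;]+")


def find_unpinned(requirements_text: str) -> list[str]:
    """Return the requirement lines that are not pinned to an exact version."""
    unpinned = []
    for raw_line in requirements_text.splitlines():
        # Drop trailing comments (pip-compile adds "# via ..." annotations).
        line = raw_line.split("#", 1)[0].strip()
        # Skip blank lines and pip options such as "--hash=..." or "-r other.txt".
        if not line or line.startswith("-"):
            continue
        if not PINNED.match(line):
            unpinned.append(line)
    return unpinned


if __name__ == "__main__":
    sample = "fastapi==0.111.0\nrequests>=2.0\n"
    print(find_unpinned(sample))
```

A CI job could read the real requirements.txt, call find_unpinned on its contents, and exit with a non-zero status if the returned list is not empty.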
FastAPI’s tight integration with the Pydantic library provides a powerful, declarative way to enforce input validation, which serves as the first line of defense against malformed data and many injection-style attacks.
When a request arrives at a FastAPI endpoint, if the function parameter is type-hinted with a Pydantic model, FastAPI automatically attempts to parse and validate the incoming data against that model's schema. If the data is invalid (e.g., wrong data type, missing required field), FastAPI immediately rejects the request with a detailed 422 Unprocessable Entity error, preventing the invalid data from ever reaching the application logic.
The following code demonstrates defining a Pydantic model for a hypothetical MCP Tool that requires structured input:
# file: schemas.py
from typing import Optional

from pydantic import BaseModel, Field


class QueryToolInput(BaseModel):
    """
    Pydantic model for validating the input to a database query tool.
    """
    table_name: str = Field(
        ...,
        description="The name of the database table to query.",
        pattern="^[a-zA-Z_]+$",  # Restrict to only letters and underscores
    )
    query_filter: dict
    limit: int = Field(
        default=100,
        gt=0,
        le=1000,
        description="The maximum number of rows to return.",
    )
    # This field is optional and has a default value
    sort_by: Optional[str] = None


# Example usage within a FastAPI app (conceptual)
# @app.post("/tools/query")
# async def run_query_tool(tool_input: QueryToolInput):
#     # By the time this code is reached, FastAPI has already validated
#     # that tool_input conforms to the QueryToolInput schema.
#     # We can now safely use tool_input.table_name, tool_input.limit, etc.
#     ...
In this example, QueryToolInput ensures that table_name is a string containing only safe characters, limit is an integer within a specific range, and other fields conform to their types. This automatic validation is a crucial first step in hardening the server's inputs.
It is critical to understand the distinct roles of Pydantic and the database layer in preventing SQL injection (SQLi). Pydantic validates the structure and format of incoming data, but it does not prevent SQLi. The prevention of SQLi occurs at the moment a query is constructed and executed against the database.
The only reliable way to prevent SQL injection is to never use string formatting or concatenation to insert user-controlled data into SQL queries. Instead, one must use parameterized queries (also known as prepared statements or bind variables), which are supported by all modern database drivers and Object-Relational Mappers (ORMs).
The following example contrasts the insecure and secure ways to interact with a database in a FastAPI application, using SQLAlchemy as the ORM.
The Wrong Way (Vulnerable to SQL Injection):
# DO NOT DO THIS. THIS IS VULNERABLE.
@app.post("/tools/get_user_insecure")
async def get_user_insecure(username: str, db: Session = Depends(get_db)):
    # Insecurely constructing a query using an f-string
    query = f"SELECT * FROM users WHERE username = '{username}'"
    # If username is "' OR 1=1; --", the query becomes malicious.
    result = db.execute(text(query)).first()
    return result
The Right Way (Secure via Parameterized Queries):
This example uses SQLModel, which combines SQLAlchemy and Pydantic for a seamless experience with FastAPI.
# file: main.py
from typing import Optional

from fastapi import Depends, FastAPI, HTTPException
from sqlmodel import Field, Session, SQLModel, create_engine, select

# --- Database and Model Setup ---
DATABASE_URL = "sqlite:///database.db"
engine = create_engine(DATABASE_URL)


class User(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    username: str = Field(index=True, unique=True)
    full_name: str
    hashed_password: str  # Store only a password hash, never the plaintext


def create_db_and_tables():
    SQLModel.metadata.create_all(engine)


app = FastAPI()


@app.on_event("startup")
def on_startup():
    create_db_and_tables()


def get_db():
    with Session(engine) as session:
        yield session


# --- Secure Endpoint ---
class UserPublic(SQLModel):
    id: int
    username: str
    full_name: str


@app.post("/tools/get_user_secure", response_model=UserPublic)
async def get_user_secure(username: str, db: Session = Depends(get_db)):
    """
    Securely fetches a user by using a parameterized query via the ORM.
    """
    # The 'select' statement creates a query object.
    # The 'where' clause adds a filter.
    # SQLAlchemy will automatically parameterize the 'username' variable,
    # preventing SQL injection.
    statement = select(User).where(User.username == username)
    # The query is executed safely by the database driver.
    user = db.exec(statement).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    # response_model=UserPublic filters the response, so hashed_password
    # is never serialized and sent to the caller.
    return user
In the secure example, SQLAlchemy takes the username variable and passes it to the database driver as a separate parameter, not as part of the SQL string itself. The driver binds the value instead of interpolating it into the SQL text, so any SQLi payload is treated as plain data and neutralized.
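The difference between the two approaches can be observed directly with Python's built-in sqlite3 module. The sketch below is illustrative only: the in-memory table, the sample usernames, and the payload string are assumptions chosen for the demonstration, not part of the application above.

```python
import sqlite3

# In-memory database with two sample users (hypothetical data).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT UNIQUE)")
conn.executemany("INSERT INTO users (username) VALUES (?)", [("alice",), ("bob",)])

payload = "' OR 1=1 --"

# Vulnerable: the payload becomes part of the SQL text and rewrites the WHERE clause.
unsafe_sql = f"SELECT username FROM users WHERE username = '{payload}'"
unsafe_rows = conn.execute(unsafe_sql).fetchall()

# Safe: the SQL text is fixed and the payload travels separately as a bound value.
safe_rows = conn.execute(
    "SELECT username FROM users WHERE username = ?", (payload,)
).fetchall()

print(len(unsafe_rows))  # 2: every row matched because of the injected "OR 1=1"
print(len(safe_rows))    # 0: no user is literally named "' OR 1=1 --"
```

The string-formatted query returns every row, while the parameterized query returns none, because the payload is compared as a literal username.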
This section provides a complete, annotated implementation for securing a FastAPI WebSocket endpoint using the “Token in Query Parameter” method with JWTs. This pattern is chosen for its relative simplicity in a tutorial context, but the security trade-offs mentioned in Section 3.2 must be considered for production systems.
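The example uses the fastapi-jwt-auth library for handling JWT creation and validation. The library has not seen a release in some time, so verify that it is compatible with the FastAPI and Pydantic versions you pin before relying on it.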
# file: main_ws.py
import uvicorn
from fastapi import (
    Depends,
    FastAPI,
    HTTPException,
    Query,
    Request,
    WebSocket,
    WebSocketDisconnect,
    status,
)
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi_jwt_auth import AuthJWT
from fastapi_jwt_auth.exceptions import AuthJWTException
from pydantic import BaseModel
from starlette.websockets import WebSocketState

# --- Application and JWT Setup ---
app = FastAPI()


class Settings(BaseModel):
    # In production, load this from environment variables or a secrets manager
    authjwt_secret_key: str = "a_super_secret_key_that_is_long_and_random"


@AuthJWT.load_config
def get_config():
    return Settings()


# Exception handler for JWT errors raised in HTTP endpoints
@app.exception_handler(AuthJWTException)
def authjwt_exception_handler(request: Request, exc: AuthJWTException):
    return JSONResponse(
        status_code=exc.status_code,
        content={"detail": exc.message},
    )


# --- User Models and Login Endpoint ---
class User(BaseModel):
    username: str
    password: str


# In a real app, this would be a database lookup against hashed passwords
fake_users_db = {"testuser": {"password": "testpassword"}}


@app.post("/login")
def login(user: User, Authorize: AuthJWT = Depends()):
    """
    Standard HTTP endpoint to authenticate a user and issue a JWT.
    """
    if (
        user.username not in fake_users_db
        or user.password != fake_users_db[user.username]["password"]
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Bad username or password",
        )
    # The 'subject' of the token is the user's identifier
    access_token = Authorize.create_access_token(subject=user.username)
    return {"access_token": access_token}


# --- WebSocket Implementation ---
# Simple HTML client for demonstration purposes
html_client = """
<!DOCTYPE html>
<html>
<head><title>Secure WebSocket Client</title></head>
<body>
    <h1>MCP WebSocket Demo</h1>
    <h2>1. Login (use testuser/testpassword)</h2>
    <form onsubmit="login(event)">
        <input type="text" id="username" placeholder="Username" value="testuser">
        <input type="password" id="password" placeholder="Password" value="testpassword">
        <button type="submit">Login</button>
    </form>
    <p>Access Token: <pre id="token_display"></pre></p>
    <h2>2. Connect to WebSocket</h2>
    <button onclick="connectWs()">Connect</button>
    <ul id="messages"></ul>
    <script>
        let ws;
        async function login(event) {
            event.preventDefault();
            const username = document.getElementById('username').value;
            const password = document.getElementById('password').value;
            const response = await fetch('/login', {
                method: 'POST',
                headers: {'Content-Type': 'application/json'},
                body: JSON.stringify({username, password})
            });
            const data = await response.json();
            if (data.access_token) {
                document.getElementById('token_display').innerText = data.access_token;
            } else {
                document.getElementById('token_display').innerText = JSON.stringify(data);
            }
        }
        function connectWs() {
            const token = document.getElementById('token_display').innerText;
            if (!token) { alert("Please login first to get a token."); return; }
            ws = new WebSocket(`ws://${window.location.host}/ws?token=${token}`);
            ws.onmessage = function(event) {
                const messages = document.getElementById('messages');
                const message = document.createElement('li');
                const content = document.createTextNode(event.data);
                message.appendChild(content);
                messages.appendChild(message);
            };
            ws.onopen = () => ws.send("Hello from client!");
            ws.onclose = (event) => console.log("WebSocket closed", event);
            ws.onerror = (error) => console.log("WebSocket error", error);
        }
    </script>
</body>
</html>
"""


@app.get("/")
async def get():
    return HTMLResponse(html_client)


@app.websocket("/ws")
async def websocket_endpoint(
    websocket: WebSocket,
    # Extract the token from the query parameter using Query
    token: str = Query(...),
    Authorize: AuthJWT = Depends(),
):
    """
    This is the protected WebSocket endpoint.
    The connection is accepted first so that an error message can be sent,
    but the token is validated before any application messages are exchanged.
    """
    await websocket.accept()
    try:
        # This call raises an AuthJWTException if the token is invalid
        # or expired, which is handled by the except block below.
        Authorize.jwt_required("websocket", token=token)
        # If we reach here, the user is authenticated.
        # Decode the token explicitly to get the user's identity:
        # get_jwt_subject() relies on a token read from HTTP request headers,
        # which may not be populated for a WebSocket connection.
        current_user = Authorize.get_raw_jwt(token)["sub"]
        await websocket.send_text(f"Welcome, {current_user}! Connection secure.")
        await websocket.send_text("You are authorized to communicate.")
        # Main communication loop
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Message from {current_user}: {data}")
    except AuthJWTException as err:
        # Send an error message to the client before closing
        await websocket.send_text(f"Authentication Error: {err.message}")
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
    except WebSocketDisconnect:
        # The client closed the connection; there is nothing left to clean up.
        pass
    except Exception as e:
        # Handle other unexpected errors
        print(f"An error occurred: {e}")
        # Ensure the connection is closed
        if websocket.client_state != WebSocketState.DISCONNECTED:
            await websocket.close(code=status.WS_1011_INTERNAL_ERROR)


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
This self-contained example demonstrates the full, secure lifecycle: a client authenticates via a standard RESTful endpoint to receive a JWT, and then uses that JWT to establish a secure, authenticated real-time communication channel over WebSockets. The server validates the token before permitting any further interaction, effectively securing the endpoint.
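To make the token validation step less of a black box, the following sketch shows roughly what verifying an HS256-signed JWT involves: recomputing the signature with the shared secret, comparing it in constant time, and checking the expiry claim. It uses only the standard library and is meant for understanding, not as a replacement for a maintained JWT library. The helper names sign_token and verify_token are hypothetical.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url_encode(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    padding = "=" * (-len(text) % 4)  # restore the padding that JWTs strip
    return base64.urlsafe_b64decode(text + padding)


def sign_token(claims: dict, secret: bytes) -> str:
    """Build a compact HS256 JWT from a claims dictionary."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url_encode(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    signature = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url_encode(signature)}"


def verify_token(token: str, secret: bytes, now: Optional[float] = None) -> dict:
    """Return the claims if the signature and expiry are valid, else raise ValueError."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        header = json.loads(_b64url_decode(header_b64))
        claims = json.loads(_b64url_decode(payload_b64))
        signature = _b64url_decode(signature_b64)
    except ValueError as exc:  # wrong segment count, bad base64, or bad JSON
        raise ValueError("malformed token") from exc
    if not isinstance(header, dict) or not isinstance(claims, dict):
        raise ValueError("malformed token")
    # Pin the algorithm on the server side; never trust the header to choose it.
    if header.get("alg") != "HS256":
        raise ValueError("unexpected algorithm")
    expected = hmac.new(
        secret, f"{header_b64}.{payload_b64}".encode("ascii"), hashlib.sha256
    ).digest()
    if not hmac.compare_digest(expected, signature):  # constant-time comparison
        raise ValueError("bad signature")
    current = time.time() if now is None else now
    if "exp" not in claims or current >= claims["exp"]:
        raise ValueError("token expired or missing exp")
    return claims
```

A token created with sign_token({"sub": "testuser", "exp": time.time() + 900}, secret) verifies until its expiry passes. A token signed with a different secret, or altered in transit, fails the signature check.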
A secure MCP server requires a secure client. This section details the best practices and provides practical code examples for building a client application using Next.js and TypeScript, ensuring that security is maintained end-to-end.
Modern frontend frameworks like Next.js blur the line between client and server, making it critical to architect applications in a way that prevents sensitive logic and credentials from ever reaching the browser. The Data Access Layer (DAL) is a crucial pattern for achieving this separation.
A DAL is an internal library within the Next.js application that centralizes all data fetching logic. It acts as the single, authoritative source for interacting with databases, external APIs, and environment variables containing secrets.
Import the 'server-only' package at the top of DAL files. This package will cause the Next.js build process to fail if any client-side component attempts to import the module, providing a compile-time guarantee against leakage.
The following snippet illustrates a secure DAL function for fetching user data:
// file: lib/data-access.ts
import 'server-only'; // Enforces server-only execution

import { db } from './database'; // Assume a configured database client
import { getCurrentUser } from './auth'; // Assume a function to get session user

// Define the safe DTO that will be returned to the client
export interface PublicUserProfile {
  userId: string;
  username: string;
  avatarUrl: string | null;
}

export async function getPublicProfile(userId: string): Promise<PublicUserProfile | null> {
  // Authorization check: Is the current user allowed to see this profile?
  // (In this case, any signed-in user may view profiles, but more complex logic could exist here)
  const viewer = await getCurrentUser();
  if (!viewer) {
    throw new Error("Not authorized");
  }

  // Fetch the full user object from the database
  const user = await db.user.findUnique({ where: { id: userId } });
  if (!user) {
    return null;
  }

  // Return ONLY the safe, public fields as a DTO
  // This prevents fields like 'email' or 'hashedPassword' from ever leaving the server.
  return {
    userId: user.id,
    username: user.username,
    avatarUrl: user.avatarUrl,
  };
}
This section provides the client-side implementation corresponding to the secure FastAPI WebSocket server from Section 4.4. It demonstrates how to manage the authentication flow and establish a secure connection using TypeScript in a React/Next.js component. While many tutorials use libraries like Socket.IO, this example uses the native browser WebSocket API for clarity and to avoid unnecessary abstractions.
wss:// Connection in TypeScript
The connection logic is best encapsulated within a React component or a custom hook. The useEffect hook is ideal for managing the WebSocket lifecycle: connecting when the component mounts and disconnecting when it unmounts. The connection URL must use the wss:// protocol to ensure TLS encryption.
The following code provides a complete, annotated React component in TypeScript that handles the user authentication flow and subsequent WebSocket connection.
// file: components/SecureChatClient.tsx
'use client'; // This component runs on the client

import React, { useState, useEffect, useRef } from 'react';

const SecureChatClient: React.FC = () => {
  const [username, setUsername] = useState<string>('testuser');
  const [password, setPassword] = useState<string>('testpassword');
  const [accessToken, setAccessToken] = useState<string | null>(null);
  const [messages, setMessages] = useState<string[]>([]);
  const [isConnected, setIsConnected] = useState<boolean>(false);
  const webSocketRef = useRef<WebSocket | null>(null);

  // Function to handle login via a Next.js API route
  const handleLogin = async (e: React.FormEvent) => {
    e.preventDefault();
    try {
      // NOTE: In a real Next.js app, this would call an API route: /api/login
      // For this demo, we assume the FastAPI server is running on localhost:8000
      const response = await fetch('http://localhost:8000/login', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ username, password }),
      });
      if (!response.ok) {
        const errorData = await response.json();
        throw new Error(errorData.detail || 'Login failed');
      }
      const data = await response.json();
      setAccessToken(data.access_token);
      setMessages(prev => [...prev, 'Login successful. Token received.']);
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      setMessages(prev => [...prev, `Login Error: ${message}`]);
    }
  };

  // Function to establish the WebSocket connection
  const connectWebSocket = () => {
    if (!accessToken) {
      alert('Please log in first to get an access token.');
      return;
    }
    if (webSocketRef.current && webSocketRef.current.readyState === WebSocket.OPEN) {
      setMessages(prev => [...prev, 'Already connected.']);
      return;
    }

    // Construct the secure WebSocket URL with the token as a query parameter
    // Use wss:// for production deployments
    const wsProtocol = window.location.protocol === 'https:' ? 'wss://' : 'ws://';
    // Assume FastAPI server is on the same host for simplicity, or use a specific domain
    const wsUrl = `${wsProtocol}localhost:8000/ws?token=${encodeURIComponent(accessToken)}`;
    const ws = new WebSocket(wsUrl);
    webSocketRef.current = ws;

    ws.onopen = () => {
      setMessages(prev => [...prev, 'WebSocket connection established.']);
      setIsConnected(true);
      ws.send('Hello from Next.js client!');
    };
    ws.onmessage = (event) => {
      // event.data is rendered as text by React, never as HTML
      setMessages(prev => [...prev, `Received: ${event.data}`]);
    };
    ws.onerror = (error) => {
      console.error('WebSocket error:', error);
      setMessages(prev => [...prev, 'WebSocket error occurred.']);
    };
    ws.onclose = (event) => {
      setMessages(prev => [...prev, `WebSocket connection closed (code ${event.code}).`]);
      setIsConnected(false);
      webSocketRef.current = null;
    };
  };

  // Clean up the WebSocket connection when the component unmounts
  useEffect(() => {
    return () => {
      webSocketRef.current?.close();
    };
  }, []);

  return (
    <div>
      {/* Login Form */}
      <form onSubmit={handleLogin}>
        <input type="text" value={username} onChange={(e) => setUsername(e.target.value)} />
        <input type="password" value={password} onChange={(e) => setPassword(e.target.value)} />
        <button type="submit" disabled={!!accessToken}>Login</button>
      </form>
      {accessToken && <p>Token acquired!</p>}

      {/* Connection Button */}
      <button onClick={connectWebSocket} disabled={!accessToken || isConnected}>
        {isConnected ? 'Connected' : 'Connect to WebSocket'}
      </button>

      {/* Message Display */}
      <ul>
        {messages.map((msg, index) => (
          <li key={index}>{msg}</li>
        ))}
      </ul>
    </div>
  );
};

export default SecureChatClient;
Just as the server must not trust client input, the client must not blindly trust data received from the server, especially if that data could originate from other users or external sources via the MCP server.
When the onmessage event handler receives data, it must be treated as untrusted before being rendered to the DOM. If the data is meant to be displayed as plain text, it should be inserted using properties like textContent, which automatically escapes any HTML characters. Avoid using dangerouslySetInnerHTML in React unless the content has been rigorously sanitized with a library like DOMPurify. This practice is a critical defense against XSS attacks, where a malicious payload returned by the server could execute in the context of the user's browser.
Deploying a secure MCP server is not the end of the security journey. Ongoing vigilance, operational readiness, and continuous validation are required to maintain a strong security posture in the face of evolving threats.
MCP servers, particularly those exposed to the public internet, are prime targets for Denial of Service (DoS) and resource exhaustion attacks (LLM04). Attackers may attempt to overwhelm the server by making excessive requests or triggering resource-intensive operations. A robust defense requires implementing rate limiting and resource management controls.
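One common way to implement rate limiting is a token bucket kept per client. The sketch below is a minimal in-memory version using only the standard library. The class name TokenBucketLimiter, the choice of key (for example a user ID or client IP), and the limits are assumptions for illustration. A multi-process deployment would typically need a shared store instead of a local dictionary.

```python
import time
from typing import Callable, Dict, Hashable, Tuple


class TokenBucketLimiter:
    """Allows short bursts while capping the sustained request rate per key."""

    def __init__(
        self,
        rate_per_second: float,
        burst: int,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.rate = rate_per_second
        self.burst = burst
        self.clock = clock
        # key -> (tokens remaining, time of last update)
        self._buckets: Dict[Hashable, Tuple[float, float]] = {}

    def allow(self, key: Hashable) -> bool:
        """Consume one token for `key` if available; return False when over the limit."""
        now = self.clock()
        tokens, last = self._buckets.get(key, (float(self.burst), now))
        # Refill in proportion to the elapsed time, never beyond the burst size.
        tokens = min(float(self.burst), tokens + (now - last) * self.rate)
        if tokens >= 1.0:
            self._buckets[key] = (tokens - 1.0, now)
            return True
        self._buckets[key] = (tokens, now)
        return False


if __name__ == "__main__":
    limiter = TokenBucketLimiter(rate_per_second=5.0, burst=10)
    results = [limiter.allow("client-a") for _ in range(12)]
    print(results.count(True))
```

In a FastAPI application, a check like limiter.allow(user_id) could run in a dependency or at the top of the WebSocket receive loop, rejecting the request or closing the connection when it returns False.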
Effective security relies on visibility. Without comprehensive logging and monitoring, an organization may be blind to ongoing attacks or unable to perform forensic analysis after an incident.
Log security-relevant events such as authentication attempts and invocations of MCP Tools. These logs are invaluable for detecting suspicious patterns, such as brute-force login attempts or attempts to enumerate object IDs.
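As a rough illustration of how such logs can feed detection, the sketch below emits one JSON line per security event and flags a username after repeated failed logins within a sliding window. The logger name mcp.audit, the event names, and the thresholds are hypothetical. A production system would more likely ship these lines to a central log platform and alert from there.

```python
import json
import logging
import time
from collections import defaultdict, deque
from typing import Optional

audit_logger = logging.getLogger("mcp.audit")  # hypothetical logger name


def audit_event(event: str, **fields) -> str:
    """Emit one JSON line per security-relevant event and return it."""
    record = {"ts": fields.pop("ts", time.time()), "event": event, **fields}
    line = json.dumps(record, sort_keys=True)
    audit_logger.info(line)  # never include passwords or tokens in the fields
    return line


class FailedLoginMonitor:
    """Flags a username once it has too many failed logins inside a sliding window."""

    def __init__(self, max_failures: int = 5, window_seconds: float = 60.0):
        self.max_failures = max_failures
        self.window_seconds = window_seconds
        self._failures = defaultdict(deque)  # username -> recent failure timestamps

    def record_failure(self, username: str, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        attempts = self._failures[username]
        attempts.append(now)
        # Discard failures that have fallen out of the window.
        while attempts and now - attempts[0] > self.window_seconds:
            attempts.popleft()
        suspicious = len(attempts) >= self.max_failures
        audit_event("login_failed", username=username, suspicious=suspicious, ts=now)
        return suspicious
```

The login endpoint could call record_failure whenever credentials are rejected and respond to a True result by slowing down, locking, or alerting on the account.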
Security is a moving target. New vulnerabilities are discovered in software libraries, and attackers constantly devise new techniques. Therefore, security validation must be a continuous, iterative process.
The Model Context Protocol represents a significant leap forward in the capabilities of agentic AI, providing a standardized framework for models to interact with the world. However, this power and connectivity come with inherent security responsibilities. Securing an MCP AI server is a complex, multi-domain challenge that requires a defense-in-depth strategy, integrating principles from AI security, API security, and real-time communication protocols.
The core tenets of a secure MCP server implementation can be summarized as follows: adopt a secure-by-design philosophy, treating security as an architectural prerequisite, not a feature. Implement a zero-trust model for all inputs and outputs, validating and sanitizing every piece of data that crosses a trust boundary. Enforce robust, state-aware authentication and authorization on all endpoints, particularly for persistent connections like WebSockets. Finally, embrace continuous validation through ongoing monitoring, auditing, and proactive red teaming to stay ahead of the evolving threat landscape.
By adhering to these principles, developers and architects can build MCP servers that are not only powerful and functional but also resilient, trustworthy, and secure, paving the way for the safe and responsible deployment of the next generation of AI applications.
The following checklist condenses the key security controls discussed in this report into an actionable tool for development and audit teams.