# Webhook Implementation
This guide presents best practices for implementing a robust and reliable webhook receiver.
## Recommended Architecture
The most robust approach to processing webhooks follows the "receive, queue, respond" pattern:
```
                     Your Application
                            |
        +-------------------+-------------------+
        |                   |                   |
        v                   v                   v
  +-----------+       +------------+      +-------------+
  |  Webhook  |       |            |      |   Worker    |
  |  Endpoint | ----> |   Queue    | ---> |  Processor  |
  +-----------+       +------------+      +-------------+
  1. Receive                              6. Process
  2. Validate signature                      asynchronously
  3. Check idempotency
  4. Enqueue
  5. Respond 200 OK
```

**Why use a queue?**

Responding quickly to the webhook (< 30s) and processing afterward ensures that retries are not triggered unnecessarily. Queues like Redis, RabbitMQ, or SQS are ideal for this.
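The essence of the pattern can be sketched with a plain in-memory array standing in for Redis, RabbitMQ, or SQS. This is illustrative only; the point is that the receiver acknowledges before any processing happens:

```javascript
// Minimal sketch of "receive, queue, respond" with an in-memory queue.
// In production the queue would be Redis, RabbitMQ, or SQS.
const queue = [];

// Receiver: enqueue and answer immediately
function receiveWebhook(event) {
  queue.push(event);            // 4. Enqueue
  return { status: 'queued' };  // 5. Respond 200 OK right away
}

// Worker: drains the queue independently of the receiver
function drainQueue(handle) {
  while (queue.length > 0) {
    handle(queue.shift());      // 6. Process asynchronously
  }
}
```

Note that `receiveWebhook` returns before `drainQueue` ever runs; the receiver's latency is independent of how slow the business logic is.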
## Step-by-Step Implementation
### 1. Create the Endpoint
The endpoint must accept POST and process JSON:
```javascript
const express = require('express');
const crypto = require('crypto');
const Redis = require('ioredis');

const app = express();
const redis = new Redis();

// Middleware to capture raw body (needed for signature verification)
app.use('/webhooks', express.raw({ type: 'application/json' }));

// Webhook endpoint
app.post('/webhooks/pixconnect', async (req, res) => {
  const startTime = Date.now();

  try {
    // 1. Extract headers
    const signature = req.headers['x-webhook-signature'];
    const timestamp = req.headers['x-webhook-timestamp'];
    const requestId = req.headers['x-request-id'];
    const payload = req.body.toString();

    // 2. Validate signature
    verifySignature(payload, signature, timestamp);

    // 3. Check idempotency
    const isProcessed = await checkIdempotency(requestId);
    if (isProcessed) {
      console.log(`Webhook ${requestId} already processed`);
      return res.status(200).json({ status: 'already_processed' });
    }

    // 4. Enqueue for processing
    const event = JSON.parse(payload);
    await enqueueEvent(requestId, event);

    // 5. Respond success
    const duration = Date.now() - startTime;
    console.log(`Webhook ${requestId} enqueued in ${duration}ms`);
    res.status(200).json({ status: 'queued', request_id: requestId });
  } catch (error) {
    console.error('Webhook error:', error.message);
    if (error.message.includes('signature') || error.message.includes('timestamp')) {
      return res.status(401).json({ error: error.message });
    }
    res.status(500).json({ error: 'Internal server error' });
  }
});

app.listen(3000, () => {
  console.log('Webhook server running on port 3000');
});
```

The equivalent endpoint in Python with FastAPI:

```python
from fastapi import FastAPI, Request, HTTPException, Header
import redis
import json
import hmac
import hashlib
import time
import os

app = FastAPI()
redis_client = redis.Redis()

@app.post("/webhooks/pixconnect")
async def handle_webhook(
    request: Request,
    x_webhook_signature: str = Header(...),
    x_webhook_timestamp: str = Header(...),
    x_request_id: str = Header(...)
):
    start_time = time.time()

    try:
        # 1. Extract payload
        payload = await request.body()
        payload_str = payload.decode('utf-8')

        # 2. Validate signature
        verify_signature(payload_str, x_webhook_signature, x_webhook_timestamp)

        # 3. Check idempotency
        if check_idempotency(x_request_id):
            print(f"Webhook {x_request_id} already processed")
            return {"status": "already_processed"}

        # 4. Enqueue for processing
        event = json.loads(payload_str)
        enqueue_event(x_request_id, event)

        # 5. Respond success
        duration = (time.time() - start_time) * 1000
        print(f"Webhook {x_request_id} enqueued in {duration:.0f}ms")
        return {"status": "queued", "request_id": x_request_id}
    except ValueError as e:
        raise HTTPException(status_code=401, detail=str(e))
    except Exception as e:
        print(f"Webhook error: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=3000)
```

### 2. Verify Signature
Signature verification ensures the webhook actually came from FluxiQ NPC:
```javascript
function verifySignature(payload, signature, timestamp) {
  const secret = process.env.WEBHOOK_SECRET;

  // Validate timestamp (5-minute window)
  const currentTime = Math.floor(Date.now() / 1000);
  const webhookTime = parseInt(timestamp, 10);
  if (Math.abs(currentTime - webhookTime) > 300) {
    throw new Error('Timestamp outside tolerance window (5 min)');
  }

  // Calculate expected signature
  const signedPayload = `${timestamp}.${payload}`;
  const expectedSignature = crypto
    .createHmac('sha256', secret)
    .update(signedPayload)
    .digest('hex');

  // Compare securely (timing-safe)
  const sigBuffer = Buffer.from(signature, 'hex');
  const expectedBuffer = Buffer.from(expectedSignature, 'hex');
  if (sigBuffer.length !== expectedBuffer.length) {
    throw new Error('Invalid signature');
  }
  if (!crypto.timingSafeEqual(sigBuffer, expectedBuffer)) {
    throw new Error('Invalid signature');
  }
}
```

The same check in Python:

```python
def verify_signature(payload: str, signature: str, timestamp: str):
    secret = os.environ['WEBHOOK_SECRET']

    # Validate timestamp (5-minute window)
    current_time = int(time.time())
    webhook_time = int(timestamp)
    if abs(current_time - webhook_time) > 300:
        raise ValueError('Timestamp outside tolerance window (5 min)')

    # Calculate expected signature
    signed_payload = f"{timestamp}.{payload}"
    expected_signature = hmac.new(
        secret.encode('utf-8'),
        signed_payload.encode('utf-8'),
        hashlib.sha256
    ).hexdigest()

    # Compare securely (timing-safe)
    if not hmac.compare_digest(signature, expected_signature):
        raise ValueError('Invalid signature')
```

### 3. Check Idempotency
Use the `X-Request-Id` header to avoid duplicate processing:
```javascript
const IDEMPOTENCY_TTL = 86400; // 24 hours

async function checkIdempotency(requestId) {
  const key = `webhook:processed:${requestId}`;
  const exists = await redis.exists(key);
  return exists === 1;
}

async function markAsProcessed(requestId) {
  const key = `webhook:processed:${requestId}`;
  await redis.setex(key, IDEMPOTENCY_TTL, '1');
}
```

In Python:

```python
IDEMPOTENCY_TTL = 86400  # 24 hours

def check_idempotency(request_id: str) -> bool:
    key = f"webhook:processed:{request_id}"
    return redis_client.exists(key) == 1

def mark_as_processed(request_id: str):
    key = f"webhook:processed:{request_id}"
    redis_client.setex(key, IDEMPOTENCY_TTL, '1')
```

### 4. Enqueue for Processing
Add the event to the queue for asynchronous processing:
```javascript
async function enqueueEvent(requestId, event) {
  const job = {
    request_id: requestId,
    event_type: event.event,
    payload: event,
    received_at: new Date().toISOString(),
    attempts: 0
  };

  // Using a Redis list as a simple queue
  await redis.lpush('webhook:queue', JSON.stringify(job));

  // Or using Bull Queue (recommended for production)
  // await webhookQueue.add(event.event, job, {
  //   attempts: 3,
  //   backoff: { type: 'exponential', delay: 1000 }
  // });
}
```

In Python:

```python
def enqueue_event(request_id: str, event: dict):
    job = {
        "request_id": request_id,
        "event_type": event["event"],
        "payload": event,
        "received_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "attempts": 0
    }

    # Using a Redis list as a simple queue
    redis_client.lpush('webhook:queue', json.dumps(job))

    # Or using Celery (recommended for production)
    # process_webhook.delay(job)
```

### 5. Respond Immediately
Respond 200 OK as soon as the event is enqueued. Do not wait for complete processing:
```javascript
// GOOD: Respond immediately after enqueueing
res.status(200).json({ status: 'queued' });

// BAD: Process first, then respond (may cause a timeout)
await processEvent(event); // Can take a long time!
res.status(200).json({ status: 'processed' });
```

**30-second Timeout**

If you don't respond within 30 seconds, we consider the delivery as failed and a retry will be scheduled. Processing synchronously increases the risk of timeouts.
## Processing Worker
The worker consumes events from the queue and executes business logic:
```javascript
const eventHandlers = {
  'boleto_created': handleBoletoCreated,
  'boleto_registered': handleBoletoRegistered,
  'boleto_paid': handleBoletoPaid,
  'boleto_cancelled': handleBoletoCancelled,
  'settlement_completed': handleSettlementCompleted,
  'payment_received': handlePaymentReceived,
};

async function processWebhookQueue() {
  while (true) {
    try {
      // Fetch the next job from the queue (blocking)
      const result = await redis.brpop('webhook:queue', 0);
      const job = JSON.parse(result[1]);

      console.log(`Processing ${job.event_type} (${job.request_id})`);

      // Get the appropriate handler
      const handler = eventHandlers[job.event_type];
      if (!handler) {
        console.warn(`Handler not found for ${job.event_type}`);
        continue;
      }

      // Process the event
      await handler(job.payload.data);

      // Mark as processed
      await markAsProcessed(job.request_id);
      console.log(`Event ${job.request_id} processed successfully`);
    } catch (error) {
      console.error('Error processing webhook:', error);
      // Implement retry logic or a dead letter queue here
    }
  }
}

// Event handlers
async function handleBoletoPaid(data) {
  const { nosso_numero, valor_pago, data_pagamento } = data;

  // Update database
  await db.query(
    'UPDATE orders SET status = $1, paid_at = $2 WHERE boleto_ref = $3',
    ['paid', data_pagamento, nosso_numero]
  );

  // Notify customer
  await notificationService.send({
    type: 'payment_confirmed',
    customer_id: data.pagador.documento,
    amount: valor_pago / 100
  });

  // Trigger fulfillment
  await fulfillmentService.process(nosso_numero);
}

// Start worker
processWebhookQueue();
```

The same worker in Python:

```python
import asyncio
from typing import Dict, Callable

event_handlers: Dict[str, Callable] = {
    'boleto_created': handle_boleto_created,
    'boleto_registered': handle_boleto_registered,
    'boleto_paid': handle_boleto_paid,
    'boleto_cancelled': handle_boleto_cancelled,
    'settlement_completed': handle_settlement_completed,
    'payment_received': handle_payment_received,
}

async def process_webhook_queue():
    while True:
        try:
            # Fetch the next job from the queue (blocking)
            result = redis_client.brpop('webhook:queue', 0)
            job = json.loads(result[1])

            print(f"Processing {job['event_type']} ({job['request_id']})")

            # Get the appropriate handler
            handler = event_handlers.get(job['event_type'])
            if not handler:
                print(f"Handler not found for {job['event_type']}")
                continue

            # Process the event
            await handler(job['payload']['data'])

            # Mark as processed
            mark_as_processed(job['request_id'])
            print(f"Event {job['request_id']} processed successfully")
        except Exception as e:
            print(f"Error processing webhook: {e}")
            # Implement retry logic or a dead letter queue here

# Event handlers
async def handle_boleto_paid(data: dict):
    nosso_numero = data['nosso_numero']
    valor_pago = data['valor_pago']
    data_pagamento = data['data_pagamento']

    # Update database
    await db.execute(
        "UPDATE orders SET status = $1, paid_at = $2 WHERE boleto_ref = $3",
        'paid', data_pagamento, nosso_numero
    )

    # Notify customer
    await notification_service.send(
        type='payment_confirmed',
        customer_id=data['pagador']['documento'],
        amount=valor_pago / 100
    )

    # Trigger fulfillment
    await fulfillment_service.process(nosso_numero)

# Start worker
asyncio.run(process_webhook_queue())
```

## Error Handling
### Temporary vs Permanent Errors
Differentiate between recoverable errors and permanent errors:
```javascript
// Simple sleep helper used between retries
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function processWithRetry(job, maxAttempts = 3) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      await processEvent(job);
      return; // Success!
    } catch (error) {
      // Permanent errors: don't retry
      if (isPermanentError(error)) {
        console.error(`Permanent error: ${error.message}`);
        await moveToDeadLetter(job, error);
        return;
      }

      // Temporary errors: retry with backoff
      if (attempt < maxAttempts) {
        const delay = Math.pow(2, attempt) * 1000; // 2s, 4s, 8s
        console.log(`Attempt ${attempt} failed, retrying in ${delay}ms`);
        await sleep(delay);
      } else {
        console.error(`All ${maxAttempts} attempts failed`);
        await moveToDeadLetter(job, error);
      }
    }
  }
}

function isPermanentError(error) {
  // Errors that a retry will not resolve
  const permanentErrors = [
    'INVALID_PAYLOAD',
    'UNKNOWN_EVENT_TYPE',
    'MISSING_REQUIRED_FIELD'
  ];
  return permanentErrors.includes(error.code);
}

async function moveToDeadLetter(job, error) {
  const deadLetterJob = {
    ...job,
    error: error.message,
    failed_at: new Date().toISOString()
  };
  await redis.lpush('webhook:dead_letter', JSON.stringify(deadLetterJob));
}
```

### Error Categories
| Type | Examples | Action |
|---|---|---|
| Temporary | Database timeout, service unavailable | Retry with backoff |
| Permanent | Invalid payload, unknown event | Dead letter queue |
| Recoverable | Rate limit, connection refused | Immediate or short retry |
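The table above can be turned into a small classifier that the worker consults before deciding how to react. This is a sketch: the `code` and `status` fields are assumptions about how your own errors are shaped, not part of any API:

```javascript
// Classify an error per the table above: 'permanent' goes to the dead
// letter queue, 'recoverable' gets an immediate or short retry, and
// everything else is 'temporary' and retried with backoff.
function classifyError(error) {
  const permanent = ['INVALID_PAYLOAD', 'UNKNOWN_EVENT_TYPE', 'MISSING_REQUIRED_FIELD'];
  if (permanent.includes(error.code)) return 'permanent';
  if (error.code === 'ECONNREFUSED' || error.status === 429) return 'recoverable';
  return 'temporary'; // e.g. database timeout, service unavailable
}
```

Keeping the classification in one function makes it easy to audit which errors end up in the dead letter queue.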
## Monitoring

### Essential Metrics
Monitor these metrics to ensure system health:
| Metric | Description | Alert |
|---|---|---|
| `webhook_received_total` | Total webhooks received | - |
| `webhook_processing_time_ms` | Processing time | > 1000ms |
| `webhook_queue_size` | Queue size | > 1000 |
| `webhook_errors_total` | Total errors | Rate > 5% |
| `webhook_dead_letter_size` | Dead letter items | > 0 |
### Prometheus Example
```javascript
const promClient = require('prom-client');

// Metrics
const webhookCounter = new promClient.Counter({
  name: 'webhook_received_total',
  help: 'Total webhooks received',
  labelNames: ['event_type', 'status']
});

const webhookDuration = new promClient.Histogram({
  name: 'webhook_processing_duration_ms',
  help: 'Webhook processing duration',
  labelNames: ['event_type'],
  buckets: [10, 50, 100, 500, 1000, 5000]
});

const queueSize = new promClient.Gauge({
  name: 'webhook_queue_size',
  help: 'Current webhook queue size'
});

// Usage
app.post('/webhooks/pixconnect', async (req, res) => {
  const event = JSON.parse(req.body.toString()); // raw-body middleware from step 1
  const timer = webhookDuration.startTimer({ event_type: event.event });
  try {
    // ... processing ...
    webhookCounter.inc({ event_type: event.event, status: 'success' });
  } catch (error) {
    webhookCounter.inc({ event_type: event.event, status: 'error' });
  } finally {
    timer();
  }
});

// Update queue size periodically
setInterval(async () => {
  const size = await redis.llen('webhook:queue');
  queueSize.set(size);
}, 10000);
```

### Recommended Alerts
Configure alerts for:
- Growing queue - If queue exceeds 1000 items
- High error rate - If more than 5% of webhooks fail
- Non-empty dead letter - Any item in dead letter
- High latency - Processing > 1 second
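The four thresholds above can also be checked in one place from a metrics snapshot, which is handy for a health endpoint or a cron job. A sketch — the shape of the `metrics` object is an assumption, not part of any monitoring API:

```javascript
// Evaluate the recommended alert thresholds against a metrics snapshot.
// Returns the names of the alerts that should fire.
function evaluateAlerts(metrics) {
  const alerts = [];
  if (metrics.queueSize > 1000) alerts.push('growing_queue');       // queue exceeds 1000 items
  if (metrics.errorRate > 0.05) alerts.push('high_error_rate');     // more than 5% failing
  if (metrics.deadLetterSize > 0) alerts.push('dead_letter_not_empty');
  if (metrics.processingTimeMs > 1000) alerts.push('high_latency'); // processing > 1 second
  return alerts;
}
```

In a real deployment these rules would typically live in your alerting system (e.g. Prometheus alerting rules) rather than in application code.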
## Implementation Checklist
Use this checklist to validate your implementation:
### Endpoint
- [ ] Accepts POST method
- [ ] Captures raw body before parsing JSON
- [ ] Extracts all necessary headers (signature, timestamp, request_id)
### Security
- [ ] Validates HMAC-SHA256 signature
- [ ] Verifies timestamp (5-minute window)
- [ ] Uses timing-safe comparison
- [ ] Stores secret securely (env var)
### Idempotency
- [ ] Checks request_id before processing
- [ ] Stores processed IDs (Redis/DB)
- [ ] Returns 200 for duplicates
### Processing
- [ ] Enqueues events (Redis/RabbitMQ/SQS)
- [ ] Responds in < 30 seconds
- [ ] Separate worker for processing
- [ ] Retry for temporary errors
- [ ] Dead letter queue for permanent errors
### Monitoring
- [ ] Structured logs
- [ ] Volume and latency metrics
- [ ] Alerts configured
- [ ] Monitoring dashboard
### Testing
- [ ] Tests with webhook.site
- [ ] Tests with sandbox events
- [ ] Tests error scenarios
- [ ] Tests retry/idempotency
## Next Steps
- Overview - Concepts and configuration
- Events - Event types and payloads
- Boleto Flow - Complete lifecycle