Webhook Implementation

This guide presents best practices for implementing a robust and reliable webhook receiver.

The most robust approach to processing webhooks follows the "receive, queue, respond" pattern:

                                    Your Application
                                          |
        +---------------------------------+----------------------------------+
        |                                 |                                  |
        v                                 v                                  v
  +-----------+                    +------------+                    +-------------+
  | Webhook   |                    |   Queue    |                    |   Worker    |
  | Endpoint  | -----------------> |            | -----------------> | Processor   |
  +-----------+                    +------------+                    +-------------+
        |                                                                   |
        | 1. Receive                                                        |
        | 2. Validate signature                                             |
        | 3. Check idempotency                                              |
        | 4. Enqueue                                                        |
        | 5. Respond 200 OK                                                 |
        |                                                                   |
        +-------------------------------------------------------------------+
                                                                            |
                                                                   6. Process
                                                                      asynchronously

Why use a queue?

Acknowledging the webhook quickly (well within the 30-second timeout) and doing the real work afterward ensures that retries are not triggered unnecessarily. Queue backends such as Redis, RabbitMQ, or SQS are well suited to this.

Step-by-Step Implementation

1. Create the Endpoint

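The endpoint must accept POST requests with a JSON body. Keep the raw body available, because the signature is computed over the exact bytes received: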

javascript
const express = require('express');
const crypto = require('crypto');
const Redis = require('ioredis');

const app = express();
const redis = new Redis();

// Middleware to capture raw body (needed for signature verification)
app.use('/webhooks', express.raw({ type: 'application/json' }));

// Webhook endpoint
app.post('/webhooks/pixconnect', async (req, res) => {
  const startTime = Date.now();

  try {
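    // 1. Extract headers
    const signature = req.headers['x-webhook-signature'];
    const timestamp = req.headers['x-webhook-timestamp'];
    const requestId = req.headers['x-request-id'];
    const payload = req.body.toString();

    // Reject early if any required header is missing
    if (!signature || !timestamp || !requestId) {
      return res.status(400).json({ error: 'Missing required webhook headers' });
    }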

    // 2. Validate signature
    verifySignature(payload, signature, timestamp);

    // 3. Check idempotency
    const isProcessed = await checkIdempotency(requestId);
    if (isProcessed) {
      console.log(`Webhook ${requestId} already processed`);
      return res.status(200).json({ status: 'already_processed' });
    }

    // 4. Enqueue for processing
    const event = JSON.parse(payload);
    await enqueueEvent(requestId, event);

    // 5. Respond success
    const duration = Date.now() - startTime;
    console.log(`Webhook ${requestId} enqueued in ${duration}ms`);

    res.status(200).json({ status: 'queued', request_id: requestId });

  } catch (error) {
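    console.error('Webhook error:', error.message);

    // Compare case-insensitively: the timestamp error message starts with a capital "T"
    const message = error.message.toLowerCase();
    if (message.includes('signature') || message.includes('timestamp')) {
      return res.status(401).json({ error: error.message });
    }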

    res.status(500).json({ error: 'Internal server error' });
  }
});

app.listen(3000, () => {
  console.log('Webhook server running on port 3000');
});
python
from fastapi import FastAPI, Request, HTTPException, Header
from typing import Optional
import redis
import json
import hmac
import hashlib
import time
import os

app = FastAPI()
redis_client = redis.Redis()

@app.post("/webhooks/pixconnect")
async def handle_webhook(
    request: Request,
    x_webhook_signature: str = Header(...),
    x_webhook_timestamp: str = Header(...),
    x_request_id: str = Header(...)
):
    start_time = time.time()

    try:
        # 1. Extract payload
        payload = await request.body()
        payload_str = payload.decode('utf-8')

        # 2. Validate signature
        verify_signature(payload_str, x_webhook_signature, x_webhook_timestamp)

        # 3. Check idempotency
        if check_idempotency(x_request_id):
            print(f"Webhook {x_request_id} already processed")
            return {"status": "already_processed"}

        # 4. Enqueue for processing
        event = json.loads(payload_str)
        enqueue_event(x_request_id, event)

        # 5. Respond success
        duration = (time.time() - start_time) * 1000
        print(f"Webhook {x_request_id} enqueued in {duration:.0f}ms")

        return {"status": "queued", "request_id": x_request_id}

    except ValueError as e:
        raise HTTPException(status_code=401, detail=str(e))
    except Exception as e:
        print(f"Webhook error: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=3000)

2. Verify Signature

Signature verification ensures the webhook actually came from FluxiQ NPC:

javascript
function verifySignature(payload, signature, timestamp) {
  const secret = process.env.WEBHOOK_SECRET;

  // Validate timestamp (5-minute window)
  const currentTime = Math.floor(Date.now() / 1000);
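  const webhookTime = parseInt(timestamp, 10);

  // parseInt returns NaN for malformed input, and every comparison with NaN is false,
  // so check for it explicitly
  if (Number.isNaN(webhookTime) || Math.abs(currentTime - webhookTime) > 300) {
    throw new Error('Timestamp outside tolerance window (5 min)');
  }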

  // Calculate expected signature
  const signedPayload = `${timestamp}.${payload}`;
  const expectedSignature = crypto
    .createHmac('sha256', secret)
    .update(signedPayload)
    .digest('hex');

  // Compare securely (timing-safe)
  const sigBuffer = Buffer.from(signature, 'hex');
  const expectedBuffer = Buffer.from(expectedSignature, 'hex');

  if (sigBuffer.length !== expectedBuffer.length) {
    throw new Error('Invalid signature');
  }

  if (!crypto.timingSafeEqual(sigBuffer, expectedBuffer)) {
    throw new Error('Invalid signature');
  }
}
python
def verify_signature(payload: str, signature: str, timestamp: str):
    secret = os.environ['WEBHOOK_SECRET']

    # Validate timestamp (5-minute window)
    current_time = int(time.time())
    webhook_time = int(timestamp)

    if abs(current_time - webhook_time) > 300:
        raise ValueError('Timestamp outside tolerance window (5 min)')

    # Calculate expected signature
    signed_payload = f"{timestamp}.{payload}"
    expected_signature = hmac.new(
        secret.encode('utf-8'),
        signed_payload.encode('utf-8'),
        hashlib.sha256
    ).hexdigest()

    # Compare securely (timing-safe)
    if not hmac.compare_digest(signature, expected_signature):
        raise ValueError('Invalid signature')
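
To exercise the verification code locally, it can help to generate a correctly signed request yourself. The sketch below mirrors the scheme used by the verification functions above (hex HMAC-SHA256 over `timestamp.payload`). The helper names, the secret, the request ID, and the event body are illustrative assumptions, and the way the sender serializes the body is not specified in this guide.

```python
import hashlib
import hmac
import json
import time
from typing import Dict, Tuple


def sign_payload(secret: str, timestamp: str, payload: str) -> str:
    """Return the hex HMAC-SHA256 of "<timestamp>.<payload>"."""
    signed_payload = f"{timestamp}.{payload}"
    return hmac.new(
        secret.encode("utf-8"),
        signed_payload.encode("utf-8"),
        hashlib.sha256,
    ).hexdigest()


def build_test_request(secret: str, event: dict, request_id: str) -> Tuple[Dict[str, str], str]:
    """Build headers and body for a locally generated, correctly signed delivery."""
    # Sign exactly the string that will be sent as the request body
    body = json.dumps(event, separators=(",", ":"))
    timestamp = str(int(time.time()))
    headers = {
        "Content-Type": "application/json",
        "X-Webhook-Signature": sign_payload(secret, timestamp, body),
        "X-Webhook-Timestamp": timestamp,
        "X-Request-Id": request_id,
    }
    return headers, body


# Placeholder values for a local test; not real credentials or real event data
headers, body = build_test_request(
    "test-secret", {"event": "boleto_paid", "data": {}}, "req-local-1"
)
```

Sending `body` unchanged with these headers to the local endpoint should pass verification when `WEBHOOK_SECRET` is set to the same test secret. Altering a single byte of the body should produce a 401.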

3. Check Idempotency

Use the X-Request-Id header to detect duplicate deliveries. The worker marks an ID as processed once the event has been handled:

javascript
const IDEMPOTENCY_TTL = 86400; // 24 hours

async function checkIdempotency(requestId) {
  const key = `webhook:processed:${requestId}`;
  const exists = await redis.exists(key);
  return exists === 1;
}

async function markAsProcessed(requestId) {
  const key = `webhook:processed:${requestId}`;
  await redis.setex(key, IDEMPOTENCY_TTL, '1');
}
python
IDEMPOTENCY_TTL = 86400  # 24 hours

def check_idempotency(request_id: str) -> bool:
    key = f"webhook:processed:{request_id}"
    return redis_client.exists(key) == 1

def mark_as_processed(request_id: str):
    key = f"webhook:processed:{request_id}"
    redis_client.setex(key, IDEMPOTENCY_TTL, '1')
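
The check and the mark above are separate steps, so two deliveries with the same ID that arrive before the worker finishes would both pass the check. One possible tightening, offered as an assumption and not as part of the documented flow, is to claim the ID atomically at receipt time with Redis `SET ... NX EX`. The key prefix and the `claim_request` name are hypothetical. `InMemoryStore` is only a stand-in so that the sketch runs without a Redis server; with redis-py you would pass the real client instead.

```python
IDEMPOTENCY_TTL = 86400  # 24 hours, same TTL as above


def claim_request(client, request_id: str, ttl: int = IDEMPOTENCY_TTL) -> bool:
    """Atomically claim a request ID; True means this caller saw it first."""
    key = f"webhook:claimed:{request_id}"  # hypothetical key name
    # SET key value NX EX ttl only writes when the key does not exist yet.
    # redis-py returns True on success and None when the key already exists.
    return bool(client.set(key, "1", nx=True, ex=ttl))


class InMemoryStore:
    """Demonstration stand-in exposing set(key, value, nx=..., ex=...); ignores the TTL."""

    def __init__(self):
        self._data = {}

    def set(self, key, value, nx=False, ex=None):
        if nx and key in self._data:
            return None
        self._data[key] = value
        return True


store = InMemoryStore()
first = claim_request(store, "req-123")   # first delivery wins the claim
second = claim_request(store, "req-123")  # duplicate delivery is rejected
```

If enqueueing fails after a successful claim, delete the key before returning an error so that the sender's retry is accepted.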

4. Enqueue for Processing

Add the event to the queue for asynchronous processing:

javascript
async function enqueueEvent(requestId, event) {
  const job = {
    request_id: requestId,
    event_type: event.event,
    payload: event,
    received_at: new Date().toISOString(),
    attempts: 0
  };

  // Using Redis List as a simple queue
  await redis.lpush('webhook:queue', JSON.stringify(job));

  // Or using Bull Queue (recommended for production)
  // await webhookQueue.add(event.event, job, {
  //   attempts: 3,
  //   backoff: { type: 'exponential', delay: 1000 }
  // });
}
python
def enqueue_event(request_id: str, event: dict):
    job = {
        "request_id": request_id,
        "event_type": event["event"],
        "payload": event,
        "received_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "attempts": 0
    }

    # Using Redis List as a simple queue
    redis_client.lpush('webhook:queue', json.dumps(job))

    # Or using Celery (recommended for production)
    # process_webhook.delay(job)

5. Respond Immediately

Respond 200 OK as soon as the event is enqueued. Do not wait for complete processing:

javascript
// GOOD: Respond immediately after enqueueing
res.status(200).json({ status: 'queued' });

// BAD: Process then respond (may cause timeout)
await processEvent(event);  // Can take time!
res.status(200).json({ status: 'processed' });

30-second Timeout

If your endpoint doesn't respond within 30 seconds, the delivery is considered failed and a retry is scheduled. Processing synchronously increases the risk of timeouts.

Processing Worker

The worker consumes events from the queue and executes business logic:

javascript
const eventHandlers = {
  'boleto_created': handleBoletoCreated,
  'boleto_registered': handleBoletoRegistered,
  'boleto_paid': handleBoletoPaid,
  'boleto_cancelled': handleBoletoCancelled,
  'settlement_completed': handleSettlementCompleted,
  'payment_received': handlePaymentReceived,
};

async function processWebhookQueue() {
  while (true) {
    try {
      // Fetch next job from queue (blocking)
      const result = await redis.brpop('webhook:queue', 0);
      const job = JSON.parse(result[1]);

      console.log(`Processing ${job.event_type} (${job.request_id})`);

      // Get appropriate handler
      const handler = eventHandlers[job.event_type];
      if (!handler) {
        console.warn(`Handler not found for ${job.event_type}`);
        continue;
      }

      // Process event
      await handler(job.payload.data);

      // Mark as processed
      await markAsProcessed(job.request_id);

      console.log(`Event ${job.request_id} processed successfully`);

    } catch (error) {
      console.error('Error processing webhook:', error);
      // Implement retry logic or dead letter queue
    }
  }
}

// Event handlers
async function handleBoletoPaid(data) {
  const { nosso_numero, valor_pago, data_pagamento } = data;

  // Update database
  await db.query(
    'UPDATE orders SET status = $1, paid_at = $2 WHERE boleto_ref = $3',
    ['paid', data_pagamento, nosso_numero]
  );

  // Notify customer
  await notificationService.send({
    type: 'payment_confirmed',
    customer_id: data.pagador.documento,
    amount: valor_pago / 100
  });

  // Trigger fulfillment
  await fulfillmentService.process(nosso_numero);
}

// Start worker
processWebhookQueue();
python
import asyncio
from typing import Dict, Callable

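# Note: Python resolves these names when the dict is built, so in a real module
# the handler functions must be defined above this mapping. Only
# handle_boleto_paid is shown in this guide.
event_handlers: Dict[str, Callable] = {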
    'boleto_created': handle_boleto_created,
    'boleto_registered': handle_boleto_registered,
    'boleto_paid': handle_boleto_paid,
    'boleto_cancelled': handle_boleto_cancelled,
    'settlement_completed': handle_settlement_completed,
    'payment_received': handle_payment_received,
}

async def process_webhook_queue():
    while True:
        try:
            # Fetch next job from queue (blocking)
            result = redis_client.brpop('webhook:queue', 0)
            job = json.loads(result[1])

            print(f"Processing {job['event_type']} ({job['request_id']})")

            # Get appropriate handler
            handler = event_handlers.get(job['event_type'])
            if not handler:
                print(f"Handler not found for {job['event_type']}")
                continue

            # Process event
            await handler(job['payload']['data'])

            # Mark as processed
            mark_as_processed(job['request_id'])

            print(f"Event {job['request_id']} processed successfully")

        except Exception as e:
            print(f"Error processing webhook: {e}")
            # Implement retry logic or dead letter queue

# Event handlers
async def handle_boleto_paid(data: dict):
    nosso_numero = data['nosso_numero']
    valor_pago = data['valor_pago']
    data_pagamento = data['data_pagamento']

    # Update database
    await db.execute(
        "UPDATE orders SET status = $1, paid_at = $2 WHERE boleto_ref = $3",
        'paid', data_pagamento, nosso_numero
    )

    # Notify customer
    await notification_service.send(
        type='payment_confirmed',
        customer_id=data['pagador']['documento'],
        amount=valor_pago / 100
    )

    # Trigger fulfillment
    await fulfillment_service.process(nosso_numero)

# Start worker
asyncio.run(process_webhook_queue())

Error Handling

Temporary vs Permanent Errors

Differentiate between recoverable errors and permanent errors:

javascript
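// Promise-based delay helper used for the backoff below
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function processWithRetry(job, maxAttempts = 3) {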
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      await processEvent(job);
      return; // Success!

    } catch (error) {
      // Permanent errors: don't retry
      if (isPermanentError(error)) {
        console.error(`Permanent error: ${error.message}`);
        await moveToDeadLetter(job, error);
        return;
      }

      // Temporary errors: retry with backoff
      if (attempt < maxAttempts) {
        const delay = Math.pow(2, attempt) * 1000; // 2s, then 4s with the default of 3 attempts
        console.log(`Attempt ${attempt} failed, retry in ${delay}ms`);
        await sleep(delay);
      } else {
        console.error(`All ${maxAttempts} attempts failed`);
        await moveToDeadLetter(job, error);
      }
    }
  }
}

function isPermanentError(error) {
  // Errors that won't be resolved with retry
  const permanentErrors = [
    'INVALID_PAYLOAD',
    'UNKNOWN_EVENT_TYPE',
    'MISSING_REQUIRED_FIELD'
  ];
  return permanentErrors.includes(error.code);
}

async function moveToDeadLetter(job, error) {
  const deadLetterJob = {
    ...job,
    error: error.message,
    failed_at: new Date().toISOString()
  };
  await redis.lpush('webhook:dead_letter', JSON.stringify(deadLetterJob));
}
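
For Python workers, the same retry policy might look like the sketch below. `WebhookError`, the `process_event` and `move_to_dead_letter` callables, and the injectable `sleep` parameter are assumptions made for illustration; the error codes are the ones listed in the JavaScript example above.

```python
import time
from typing import Callable

PERMANENT_ERROR_CODES = {"INVALID_PAYLOAD", "UNKNOWN_EVENT_TYPE", "MISSING_REQUIRED_FIELD"}


class WebhookError(Exception):
    """Hypothetical exception carrying an error code, mirroring error.code above."""

    def __init__(self, code: str, message: str):
        super().__init__(message)
        self.code = code


def is_permanent_error(error: Exception) -> bool:
    """Errors that a retry will not resolve."""
    return getattr(error, "code", None) in PERMANENT_ERROR_CODES


def process_with_retry(
    job: dict,
    process_event: Callable[[dict], None],
    move_to_dead_letter: Callable[[dict, Exception], None],
    max_attempts: int = 3,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Run process_event with exponential backoff; return True on success."""
    for attempt in range(1, max_attempts + 1):
        try:
            process_event(job)
            return True
        except Exception as error:
            # Permanent errors and exhausted attempts both go to the dead letter queue
            if is_permanent_error(error) or attempt == max_attempts:
                move_to_dead_letter(job, error)
                return False
            sleep(2 ** attempt)  # 2s, then 4s with the default of 3 attempts
    return False
```

Passing `sleep` as a parameter keeps the backoff testable without real delays. In a worker, `move_to_dead_letter` could push the job and error message onto the `webhook:dead_letter` list, as the JavaScript version does.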

Error Categories

Type         Examples                               Action
-----------  -------------------------------------  ------------------------
Temporary    Database timeout, service unavailable  Retry with backoff
Permanent    Invalid payload, unknown event         Dead letter queue
Recoverable  Rate limit, connection refused         Immediate or short retry

Monitoring

Essential Metrics

Monitor these metrics to ensure system health:

Metric                          Description              Alert
------------------------------  -----------------------  ---------
webhook_received_total          Total webhooks received  -
webhook_processing_duration_ms  Processing time          > 1000ms
webhook_queue_size              Queue size               > 1000
webhook_errors_total            Total errors             Rate > 5%
webhook_dead_letter_size        Dead letter items        > 0

Prometheus Example

javascript
const promClient = require('prom-client');

// Metrics
const webhookCounter = new promClient.Counter({
  name: 'webhook_received_total',
  help: 'Total webhooks received',
  labelNames: ['event_type', 'status']
});

const webhookDuration = new promClient.Histogram({
  name: 'webhook_processing_duration_ms',
  help: 'Webhook processing duration',
  labelNames: ['event_type'],
  buckets: [10, 50, 100, 500, 1000, 5000]
});

const queueSize = new promClient.Gauge({
  name: 'webhook_queue_size',
  help: 'Current webhook queue size'
});

// Usage
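app.post('/webhooks/pixconnect', async (req, res) => {
  const start = Date.now();
  let eventType = 'unknown'; // the event type is only known after parsing

  try {
    const event = JSON.parse(req.body.toString());
    eventType = event.event;
    // ... processing ...
    webhookCounter.inc({ event_type: eventType, status: 'success' });
  } catch (error) {
    webhookCounter.inc({ event_type: eventType, status: 'error' });
  } finally {
    // startTimer() would record seconds; observe milliseconds to match the metric name and buckets
    webhookDuration.observe({ event_type: eventType }, Date.now() - start);
  }
});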

// Update queue size periodically
setInterval(async () => {
  const size = await redis.llen('webhook:queue');
  queueSize.set(size);
}, 10000);

Configure alerts for:

  1. Growing queue - If queue exceeds 1000 items
  2. High error rate - If more than 5% of webhooks fail
  3. Non-empty dead letter - Any item in dead letter
  4. High latency - Processing > 1 second
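
The four thresholds above can be expressed as a small check. This is a minimal sketch that assumes the counters have already been collected into a `WebhookStats` snapshot; the class, field, and alert names are illustrative. In a Prometheus setup these conditions would more typically live in alerting rules.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class WebhookStats:
    """Hypothetical snapshot of the metrics listed above."""
    queue_size: int
    received_total: int
    errors_total: int
    dead_letter_size: int
    processing_time_ms: float


def evaluate_alerts(stats: WebhookStats) -> List[str]:
    """Return the names of the alerts that should fire for this snapshot."""
    alerts = []
    if stats.queue_size > 1000:
        alerts.append("growing_queue")
    # Avoid dividing by zero before any webhook has been received
    error_rate = stats.errors_total / stats.received_total if stats.received_total else 0.0
    if error_rate > 0.05:
        alerts.append("high_error_rate")
    if stats.dead_letter_size > 0:
        alerts.append("non_empty_dead_letter")
    if stats.processing_time_ms > 1000:
        alerts.append("high_latency")
    return alerts


healthy = evaluate_alerts(WebhookStats(10, 100, 5, 0, 200.0))
unhealthy = evaluate_alerts(WebhookStats(1500, 100, 6, 2, 1200.0))
```

An error rate of exactly 5% does not fire, because the rule is "more than 5%".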

Implementation Checklist

Use this checklist to validate your implementation:

Endpoint

  • [ ] Accepts POST method
  • [ ] Captures raw body before parsing JSON
  • [ ] Extracts all necessary headers (signature, timestamp, request_id)

Security

  • [ ] Validates HMAC-SHA256 signature
  • [ ] Verifies timestamp (5-minute window)
  • [ ] Uses timing-safe comparison
  • [ ] Stores secret securely (env var)

Idempotency

  • [ ] Checks request_id before processing
  • [ ] Stores processed IDs (Redis/DB)
  • [ ] Returns 200 for duplicates

Processing

  • [ ] Enqueues events (Redis/RabbitMQ/SQS)
  • [ ] Responds in < 30 seconds
  • [ ] Separate worker for processing
  • [ ] Retry for temporary errors
  • [ ] Dead letter queue for permanent errors

Monitoring

  • [ ] Structured logs
  • [ ] Volume and latency metrics
  • [ ] Alerts configured
  • [ ] Monitoring dashboard

Testing

  • [ ] Tests with webhook.site
  • [ ] Tests with sandbox events
  • [ ] Tests error scenarios
  • [ ] Tests retry/idempotency

Next Steps

FluxiQ NPC API Documentation