OPEN HYPER STEP
← 목록으로 (stack-analysis)
STACK-ANALYSIS · 114 / 120
stack-analysis
CHAPTER 114 / 120
읽기 약 2
FUNCTION

이벤트 기반 아키텍처: 메시지 큐


핵심 개념

Kafka·RabbitMQ·SQS·발행/구독·idempotency — 비동기 통신.

본문

메시지 큐 비교

📋 코드 (7줄)
| 도구 | 처리량 | 보존 | 적합 |
|---|---|---|---|
| Kafka | 매우 높음 | 길게 (TB) | 이벤트 소싱·로그·스트리밍 |
| RabbitMQ | 높음 | 짧게 | 작업 큐·RPC |
| AWS SQS | 높음 | 14일 | 단순 큐·서버리스 |
| Redis Streams | 중간 | 짧게 | 가벼운 이벤트 |
| BullMQ | 중간 | 임의 | Node.js 작업 큐 |

Kafka 기본

TYPESCRIPT📋 코드 (44줄)
// Producer
import { Kafka } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['kafka1:9092', 'kafka2:9092'],
});

const producer = kafka.producer();
await producer.connect();

await producer.send({
  topic: 'orders',
  messages: [{
    key: order.id,
    value: JSON.stringify({
      type: 'order.placed',
      orderId: order.id,
      userId: order.userId,
      total: order.total,
      timestamp: Date.now(),
    }),
    headers: {
      'correlation-id': req.correlationId,
      'event-version': '1',
    },
  }],
});


// Consumer
const consumer = kafka.consumer({ groupId: 'inventory-service' });
await consumer.connect();
await consumer.subscribe({ topic: 'orders' });

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    const event = JSON.parse(message.value!.toString());

    if (event.type === 'order.placed') {
      await reserveInventory(event.orderId, event.items);
    }
  },
});

Idempotency (중복 처리 방지)

TYPESCRIPT📋 코드 (57줄)
// 같은 메시지가 두 번 전달될 수 있음
// → 멱등 처리 필수


async function handleOrderPlaced(event: OrderPlacedEvent) {
  // 1. 이미 처리됐는가?
  const existing = await db.processedEvent.findUnique({
    where: { eventId: event.id },
  });
  if (existing) return;  // 스킵

  // 2. 트랜잭션으로 처리 + 기록
  await db.$transaction(async (tx) => {
    await tx.processedEvent.create({
      data: { eventId: event.id, type: event.type, processedAt: new Date() },
    });

    // 비즈니스 로직
    await reserveInventory(event.orderId, event.items, tx);
  });
}


// 또는 outbox 패턴 (DB와 메시지 큐 동기화)
async function placeOrder(input: any) {
  await db.$transaction(async (tx) => {
    const order = await tx.order.create({ data: input });

    // 같은 트랜잭션에 이벤트 저장 (outbox)
    await tx.outbox.create({
      data: {
        type: 'order.placed',
        payload: { orderId: order.id, ...input },
      },
    });
  });
}


// 별도 worker가 outbox → Kafka 전송
const outboxWorker = setInterval(async () => {
  const pending = await db.outbox.findMany({
    where: { sentAt: null },
    take: 100,
  });

  for (const msg of pending) {
    await producer.send({
      topic: msg.type.split('.')[0] + 's',
      messages: [{ value: JSON.stringify(msg.payload) }],
    });
    await db.outbox.update({
      where: { id: msg.id },
      data: { sentAt: new Date() },
    });
  }
}, 1000);

발행-구독 vs 큐

📋 코드 (15줄)
[Pub-Sub] (Kafka·Redis Pub/Sub)
- 1 메시지 → N 구독자
- 모든 구독자 받음
- 이벤트 통지에 적합


[Queue] (RabbitMQ·SQS·BullMQ)
- 1 메시지 → 1 worker
- 작업 분산
- 동시 처리 X


Kafka는 둘 다 — Consumer Group:
- 같은 group 내 worker는 분산 (큐)
- 다른 group은 모두 받음 (pub-sub)

Dead Letter Queue (실패 처리)

TYPESCRIPT📋 코드 (29줄)
// 처리 실패 시 DLQ로 격리
async function processMessage(message: any) {
  try {
    await handleMessage(message);
  } catch (err) {
    const retries = message.headers['retry-count'] || 0;

    if (retries < 3) {
      // 재시도 (지연)
      await producer.send({
        topic: 'orders',
        messages: [{
          ...message,
          headers: { ...message.headers, 'retry-count': retries + 1 },
        }],
      });
    } else {
      // DLQ
      await producer.send({
        topic: 'orders.dlq',
        messages: [{ ...message, headers: { error: err.message } }],
      });
      await alertOps('Message moved to DLQ', { message, err });
    }
  }
}


// DLQ 모니터링 — 사람이 분석·재처리

Event Schema Registry

📋 코드 (19줄)
[문제] 이벤트 스키마 변경 시 모든 구독자 깨짐


[해결]
1. Schema Registry (Confluent)
2. JSON Schema 또는 Avro/Protobuf
3. 버전 관리 (v1, v2)
4. Backward/Forward compatibility


// 좋은 이벤트 설계
{
  "id": "evt_abc123",
  "type": "order.placed",
  "version": "1.2",
  "timestamp": "2026-04-29T10:00:00Z",
  "source": "order-service",
  "data": { ... }
}

흐름 추적 (Correlation ID)

TYPESCRIPT📋 코드 (19줄)
// 요청 ID를 모든 이벤트에 propagate
app.use((req, res, next) => {
  req.correlationId = req.headers['x-correlation-id'] || crypto.randomUUID();
  res.setHeader('x-correlation-id', req.correlationId);
  next();
});


await producer.send({
  topic: 'orders',
  messages: [{
    headers: { 'correlation-id': req.correlationId },
    value: JSON.stringify(event),
  }],
});


// 모든 서비스에 같은 ID로 추적
// → 분산 추적 (Jaeger·Zipkin) 가능

다음 챕터

CH.115 "CQRS와 이벤트 소싱 기초".


AI 프롬프트
🤖 AI에게 잘 물어보는 법 — 모델·전략별 프롬프트
Claude

무료: Sonnet 4.6 / Pro $20/mo: Opus 4.6

내 코드의 이벤트 기반 아키텍처 부분을 분석해서
실전 분석 + 개선 우선순위를 알려줘.
ChatGPT

무료: GPT-5.5 / Plus $20/mo: GPT-5.5 Pro

이벤트 기반 아키텍처 관련 베스트 프랙티스 5가지를
비교 분석해서 패턴 추출를 알려줘.
Gemini

무료: 2.5 Flash / Pro $19.99/mo: 3.1 Pro

내 프로젝트 전체에서 이벤트 기반 아키텍처
최적화 가능 위치를 보고해줘.
Grok

무료: Grok 4.1 / SuperGrok $30/mo

2026년 한국 시장의 이벤트 기반 아키텍처
트렌드를 솔직히 알려줘.

⭐ 이것만 기억하세요
이벤트 기반 아키텍처: 메시지 큐 이 3가지만 확실히 잡으세요
1.Kafka = 이벤트 스트리밍, RabbitMQ/SQS = 작업 큐
2.멱등성 + outbox = 메시지 중복·누락 방지
3.DLQ + 재시도 + 알림 = 실패 처리


공유하기
진행도 114 / 120