OPEN HYPER STEP
← 목록으로 (stack-analysis)
STACK-ANALYSIS · 117 / 120
stack-analysis
CHAPTER 117 / 120
읽기 약 2
FUNCTION

데이터 파이프라인: ETL + 스트리밍


핵심 개념

Airflow·dbt·Kafka·BigQuery·실전 흐름 — 분석·ML 데이터 흐름.

본문

데이터 파이프라인 종류

📋 코드 (13줄)
[Batch ETL]
- 매일/시간 단위 처리
- 정확한 결과
- 도구: Airflow, dbt, Spark

[Stream Processing]
- 실시간 처리
- 1초~분 단위 응답
- 도구: Kafka Streams, Flink

[CDC (Change Data Capture)]
- DB 변경 → 다른 시스템 동기화
- Debezium, AWS DMS

Batch ETL 흐름

📋 코드 (19줄)
[Source]
- PostgreSQL (운영 DB)
- API (외부 데이터)
- Files (S3)
        ↓
[Extract]
- Airflow operator로 추출
- 시간/일 단위 스케줄
        ↓
[Transform]
- dbt 또는 Python으로 변환
- 정규화·집계
        ↓
[Load]
- BigQuery / Snowflake
- 분석용 DW
        ↓
[BI]
- Looker / Metabase / Tableau

dbt 예시

SQL📋 코드 (31줄)
-- models/staging/orders.sql
WITH source AS (
  SELECT * FROM {{ source('app_db', 'orders') }}
)
SELECT
  id as order_id,
  user_id,
  total,
  status,
  created_at,
  paid_at,
  EXTRACT(EPOCH FROM (paid_at - created_at)) as time_to_pay_seconds
FROM source
WHERE deleted_at IS NULL


-- models/marts/daily_revenue.sql
{{ config(materialized='incremental', unique_key='date') }}

SELECT
  DATE(created_at) as date,
  COUNT(DISTINCT user_id) as unique_customers,
  COUNT(*) as order_count,
  SUM(total) as total_revenue,
  AVG(total) as avg_order_value
FROM {{ ref('orders') }}
WHERE status = 'completed'
{% if is_incremental() %}
  AND created_at > (SELECT MAX(date) FROM {{ this }})
{% endif %}
GROUP BY 1

Airflow DAG

PYTHON📋 코드 (32줄)
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

with DAG(
    'daily_revenue',
    schedule='0 1 * * *',  # 매일 새벽 1시
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_from_postgres,
    )

    transform = PythonOperator(
        task_id='transform',
        python_callable=run_dbt_models,
    )

    load = PythonOperator(
        task_id='load_bigquery',
        python_callable=load_to_bq,
    )

    notify = PythonOperator(
        task_id='notify_slack',
        python_callable=send_slack_summary,
    )

    extract >> transform >> load >> notify

CDC (Change Data Capture)

TYPESCRIPT📋 코드 (31줄)
// PostgreSQL → Kafka via Debezium
// 실시간 동기화


// 사용 사례
// - PostgreSQL → Elasticsearch (검색 인덱스)
// - PostgreSQL → BigQuery (분석)
// - PostgreSQL → Redis (캐시 갱신)


// PostgreSQL Logical Replication
ALTER TABLE products REPLICA IDENTITY FULL;
SELECT pg_create_logical_replication_slot('cdc_slot', 'pgoutput');


// Debezium connector
{
  "name": "products-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.dbname": "myapp",
    "schema.include.list": "public",
    "table.include.list": "public.products",
    "topic.prefix": "myapp"
  }
}


// → products 변경 시 Kafka 토픽 myapp.public.products로 자동 발행
// → 다른 시스템이 구독해서 동기화

Stream Processing

TYPESCRIPT📋 코드 (38줄)
// Kafka Streams (간단한 예)
import { Kafka } from 'kafkajs';

const kafka = new Kafka({ brokers: ['kafka:9092'] });
const consumer = kafka.consumer({ groupId: 'analytics' });
const producer = kafka.producer();


// 실시간 매출 집계 (1분 단위)
const windowAggregates = new Map<string, { count: number; total: number }>();

await consumer.subscribe({ topic: 'orders' });
await consumer.run({
  eachMessage: async ({ message }) => {
    const order = JSON.parse(message.value!.toString());
    const minute = Math.floor(Date.now() / 60000);
    const key = `${minute}`;

    const agg = windowAggregates.get(key) ?? { count: 0, total: 0 };
    agg.count++;
    agg.total += order.total;
    windowAggregates.set(key, agg);
  },
});


// 매분 집계 결과 발행
setInterval(async () => {
  const previousMinute = Math.floor(Date.now() / 60000) - 1;
  const agg = windowAggregates.get(`${previousMinute}`);
  if (agg) {
    await producer.send({
      topic: 'metrics.revenue.minutely',
      messages: [{ value: JSON.stringify({ minute: previousMinute, ...agg }) }],
    });
    windowAggregates.delete(`${previousMinute}`);
  }
}, 60000);

데이터 품질

SQL📋 코드 (29줄)
-- dbt tests
version: 2
models:
  - name: orders
    columns:
      - name: id
        tests: [unique, not_null]
      - name: user_id
        tests:
          - not_null
          - relationships:
              to: ref('users')
              field: id
      - name: total
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0


-- Great Expectations (Python)
import great_expectations as gx

context = gx.get_context()
suite = context.get_expectation_suite('orders_suite')

batch.expect_column_values_to_be_unique('id')
batch.expect_column_values_to_be_between('total', min_value=0)
batch.expect_column_values_to_match_regex('email', r'^[\w.-]+@')

실전 스택 (한국)

📋 코드 (20줄)
[소규모]
- PostgreSQL → Metabase (직접 쿼리)
- 충분 (< 100K 이벤트/일)

[중규모]
- PostgreSQL + Read Replica
- pg_cron으로 집계 테이블
- Metabase / Superset

[대규모]
- PostgreSQL → Airbyte → BigQuery
- dbt로 변환
- Looker / Tableau
- Kafka로 실시간

[엔터프라이즈]
- Snowflake / Databricks
- Fivetran / Stitch
- dbt Cloud
- Hex / Mode (BI)

다음 챕터

CH.118 "멀티 리전 배포와 재해 복구".


AI 프롬프트
🤖 AI에게 잘 물어보는 법 — 모델·전략별 프롬프트
Claude

무료: Sonnet 4.6 / Pro $20/mo: Opus 4.6

내 코드의 데이터 파이프라인 부분을 분석해서
실전 분석 + 개선 우선순위를 알려줘.
ChatGPT

무료: GPT-5.5 / Plus $20/mo: GPT-5.5 Pro

데이터 파이프라인 관련 베스트 프랙티스 5가지를
비교 분석해서 패턴 추출를 알려줘.
Gemini

무료: 2.5 Flash / Pro $19.99/mo: 3.1 Pro

내 프로젝트 전체에서 데이터 파이프라인
최적화 가능 위치를 보고해줘.
Grok

무료: Grok 4.1 / SuperGrok $30/mo

2026년 한국 시장의 데이터 파이프라인
트렌드를 솔직히 알려줘.

⭐ 이것만 기억하세요
데이터 파이프라인: ETL + 스트리밍 이 3가지만 확실히 잡으세요
1.Batch ETL = Airflow + dbt + BigQuery 표준
2.CDC (Debezium)로 운영 DB → 분석 DB 실시간 동기화
3.dbt tests + Great Expectations로 데이터 품질 보장


공유하기
진행도 117 / 120