stack-analysis
CHAPTER 117 / 120
읽기 약 2분
FUNCTION
데이터 파이프라인: ETL + 스트리밍
핵심 개념
Airflow·dbt·Kafka·BigQuery·실전 흐름 — 분석·ML 데이터 흐름.
본문
데이터 파이프라인 종류
[Batch ETL]
- 매일/시간 단위 처리
- 정확한 결과
- 도구: Airflow, dbt, Spark
[Stream Processing]
- 실시간 처리
- 1초~분 단위 응답
- 도구: Kafka Streams, Flink
[CDC (Change Data Capture)]
- DB 변경 → 다른 시스템 동기화
- Debezium, AWS DMSBatch ETL 흐름
[Source]
- PostgreSQL (운영 DB)
- API (외부 데이터)
- Files (S3)
↓
[Extract]
- Airflow operator로 추출
- 시간/일 단위 스케줄
↓
[Transform]
- dbt 또는 Python으로 변환
- 정규화·집계
↓
[Load]
- BigQuery / Snowflake
- 분석용 DW
↓
[BI]
- Looker / Metabase / Tableaudbt 예시
-- models/staging/orders.sql
WITH source AS (
SELECT * FROM {{ source('app_db', 'orders') }}
)
SELECT
id as order_id,
user_id,
total,
status,
created_at,
paid_at,
EXTRACT(EPOCH FROM (paid_at - created_at)) as time_to_pay_seconds
FROM source
WHERE deleted_at IS NULL
-- models/marts/daily_revenue.sql
{{ config(materialized='incremental', unique_key='date') }}
SELECT
DATE(created_at) as date,
COUNT(DISTINCT user_id) as unique_customers,
COUNT(*) as order_count,
SUM(total) as total_revenue,
AVG(total) as avg_order_value
FROM {{ ref('orders') }}
WHERE status = 'completed'
{% if is_incremental() %}
AND created_at > (SELECT MAX(date) FROM {{ this }})
{% endif %}
GROUP BY 1Airflow DAG
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
with DAG(
'daily_revenue',
schedule='0 1 * * *', # 매일 새벽 1시
start_date=datetime(2026, 1, 1),
catchup=False,
) as dag:
extract = PythonOperator(
task_id='extract',
python_callable=extract_from_postgres,
)
transform = PythonOperator(
task_id='transform',
python_callable=run_dbt_models,
)
load = PythonOperator(
task_id='load_bigquery',
python_callable=load_to_bq,
)
notify = PythonOperator(
task_id='notify_slack',
python_callable=send_slack_summary,
)
extract >> transform >> load >> notifyCDC (Change Data Capture)
// PostgreSQL → Kafka via Debezium
// 실시간 동기화
// 사용 사례
// - PostgreSQL → Elasticsearch (검색 인덱스)
// - PostgreSQL → BigQuery (분석)
// - PostgreSQL → Redis (캐시 갱신)
// PostgreSQL Logical Replication
ALTER TABLE products REPLICA IDENTITY FULL;
SELECT pg_create_logical_replication_slot('cdc_slot', 'pgoutput');
// Debezium connector
{
"name": "products-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.dbname": "myapp",
"schema.include.list": "public",
"table.include.list": "public.products",
"topic.prefix": "myapp"
}
}
// → products 변경 시 Kafka 토픽 myapp.public.products로 자동 발행
// → 다른 시스템이 구독해서 동기화Stream Processing
// Kafka Streams (간단한 예)
import { Kafka } from 'kafkajs';
const kafka = new Kafka({ brokers: ['kafka:9092'] });
const consumer = kafka.consumer({ groupId: 'analytics' });
const producer = kafka.producer();
// 실시간 매출 집계 (1분 단위)
const windowAggregates = new Map<string, { count: number; total: number }>();
await consumer.subscribe({ topic: 'orders' });
await consumer.run({
eachMessage: async ({ message }) => {
const order = JSON.parse(message.value!.toString());
const minute = Math.floor(Date.now() / 60000);
const key = `${minute}`;
const agg = windowAggregates.get(key) ?? { count: 0, total: 0 };
agg.count++;
agg.total += order.total;
windowAggregates.set(key, agg);
},
});
// 매분 집계 결과 발행
setInterval(async () => {
const previousMinute = Math.floor(Date.now() / 60000) - 1;
const agg = windowAggregates.get(`${previousMinute}`);
if (agg) {
await producer.send({
topic: 'metrics.revenue.minutely',
messages: [{ value: JSON.stringify({ minute: previousMinute, ...agg }) }],
});
windowAggregates.delete(`${previousMinute}`);
}
}, 60000);데이터 품질
-- dbt tests
version: 2
models:
- name: orders
columns:
- name: id
tests: [unique, not_null]
- name: user_id
tests:
- not_null
- relationships:
to: ref('users')
field: id
- name: total
tests:
- not_null
- dbt_utils.accepted_range:
min_value: 0
-- Great Expectations (Python)
import great_expectations as gx
context = gx.get_context()
suite = context.get_expectation_suite('orders_suite')
batch.expect_column_values_to_be_unique('id')
batch.expect_column_values_to_be_between('total', min_value=0)
batch.expect_column_values_to_match_regex('email', r'^[\w.-]+@')실전 스택 (한국)
[소규모]
- PostgreSQL → Metabase (직접 쿼리)
- 충분 (< 100K 이벤트/일)
[중규모]
- PostgreSQL + Read Replica
- pg_cron으로 집계 테이블
- Metabase / Superset
[대규모]
- PostgreSQL → Airbyte → BigQuery
- dbt로 변환
- Looker / Tableau
- Kafka로 실시간
[엔터프라이즈]
- Snowflake / Databricks
- Fivetran / Stitch
- dbt Cloud
- Hex / Mode (BI)다음 챕터
CH.118 "멀티 리전 배포와 재해 복구".
AI 프롬프트
🤖 AI에게 잘 물어보는 법 — 모델·전략별 프롬프트
Claude
무료: Sonnet 4.6 / Pro $20/mo: Opus 4.6
내 코드의 데이터 파이프라인 부분을 분석해서 실전 분석 + 개선 우선순위를 알려줘.
ChatGPT
무료: GPT-5.5 / Plus $20/mo: GPT-5.5 Pro
데이터 파이프라인 관련 베스트 프랙티스 5가지를 비교 분석해서 패턴 추출를 알려줘.
Gemini
무료: 2.5 Flash / Pro $19.99/mo: 3.1 Pro
내 프로젝트 전체에서 데이터 파이프라인 최적화 가능 위치를 보고해줘.
Grok
무료: Grok 4.1 / SuperGrok $30/mo
2026년 한국 시장의 데이터 파이프라인 트렌드를 솔직히 알려줘.
⭐ 이것만 기억하세요
데이터 파이프라인: ETL + 스트리밍은 이 3가지만 확실히 잡으세요
1.Batch ETL = Airflow + dbt + BigQuery 표준
2.CDC (Debezium)로 운영 DB → 분석 DB 실시간 동기화
3.dbt tests + Great Expectations로 데이터 품질 보장
공유하기
진행도 117 / 120