OPEN HYPER STEP
← 목록으로 (화이트햇 보안)
SECURITY · 78 / 84
security
CHAPTER 78 / 84
읽기 약 2
FUNCTION

종합: 로그 분석 자동화 도구


핵심 개념

모듈 4 통합 프로젝트 — 로그 파일 → 파싱 → 이상 탐지 → 리포트의 자동화 CLI.

본문

통합 도구: logsentry

BASH📋 코드 (14줄)
$ python logsentry.py --log access.log --output reports/

🔎 분석 중: access.log
✅ 파싱 완료: 50,000 줄
🚨 이상 탐지:
   - 브루트포스: 3 IP (가장 많음 198.51.100.99 — 250회)
   - SQL Injection: 12 시도 (IP: 203.0.113.5)
   - XSS: 5 시도
   - 경로 탐색: 8 시도
📊 통계:
   - 총 요청: 50,000
   - 4xx: 1,234 (2.5%)
   - 5xx: 45 (0.1%)
📁 저장: reports/dashboard_2026-04-27.html (대시보드 + 상세 결과)

전체 구현

PYTHON📋 코드 (202줄)
# logsentry.py — 통합 로그 분석 도구
# ⚠️ 이 코드는 허가된 환경에서만 사용하세요.

import argparse
import json
import logging
import os
import re
from collections import Counter, defaultdict, deque
from datetime import datetime, timedelta
from pathlib import Path

logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
log = logging.getLogger('logsentry')


# ── 정규식 ──────────────────────────────────────────
APACHE_COMBINED = re.compile(
    r'^(?P<ip>\S+)\s+\S+\s+\S+\s+'
    r'\[(?P<time>[^\]]+)\]\s+'
    r'"(?P<method>\S+)\s+(?P<path>\S+)\s+\S+"\s+'
    r'(?P<status>\d{3})\s+(?P<bytes>\S+)\s+'
    r'"(?P<referer>[^"]*)"\s+"(?P<user_agent>[^"]*)"'
)

ATTACK_PATTERNS = {
    'sqli':            re.compile(r'(?i)(\bUNION\s+SELECT|\bOR\s+1=1|--\s*$|sleep\s*\()'),
    'xss':             re.compile(r'(?i)(<script|javascript:|on(?:click|load|error)=|alert\s*\()'),
    'path_traversal':  re.compile(r'(?i)(\.\./|/etc/passwd|%2e%2e)'),
    'command_inj':     re.compile(r'(?i)(;\s*(?:cat|ls|whoami)|\$\(|`[^`]+`)'),
    'malicious_ua':    re.compile(r'(?i)(sqlmap|nikto|nmap|masscan|gobuster)'),
}


# ── 분석 함수 ───────────────────────────────────────
def analyze(log_path: str, bf_threshold: int = 10, bf_window: int = 5) -> dict:
    """파싱 + 통계 + 공격 탐지 + 브루트포스 1회 순회."""
    stats = {
        'total': 0, 'parse_failed': 0,
        'ips': Counter(), 'paths': Counter(),
        'statuses': Counter(), 'methods': Counter(),
        'hourly': Counter(), 'bytes_total': 0,
    }
    attacks = []
    bf_state: dict[str, deque] = defaultdict(deque)
    bf_detected = {}
    window = timedelta(minutes=bf_window)

    with open(log_path, encoding='utf-8', errors='replace') as f:
        for lineno, line in enumerate(f, 1):
            stats['total'] += 1
            m = APACHE_COMBINED.match(line)
            if not m:
                stats['parse_failed'] += 1
                continue

            ip = m['ip']
            path = m['path']
            status = int(m['status'])
            ua = m['user_agent']

            stats['ips'][ip] += 1
            stats['paths'][path] += 1
            stats['statuses'][status] += 1
            stats['methods'][m['method']] += 1
            try:
                stats['bytes_total'] += int(m['bytes'])
            except ValueError:
                pass

            # 시간대
            try:
                t = datetime.strptime(m['time'].split()[0], '%d/%b/%Y:%H:%M:%S')
                stats['hourly'][t.hour] += 1
            except ValueError:
                t = None

            # 공격 탐지
            full_request = f'{m["method"]} {path} {ua}'.lower()
            detected = []
            for atk, pattern in ATTACK_PATTERNS.items():
                if pattern.search(full_request):
                    detected.append(atk)
            if detected:
                attacks.append({
                    'lineno': lineno,
                    'ip': ip,
                    'time': m['time'],
                    'request': f'{m["method"]} {path}',
                    'attacks': detected,
                    'status': status,
                })

            # 브루트포스 — 401/403 반복
            if status in (401, 403) and t:
                q = bf_state[ip]
                while q and q[0] < t - window:
                    q.popleft()
                q.append(t)
                if len(q) >= bf_threshold:
                    bf_detected.setdefault(ip, {
                        'count': len(q),
                        'first': q[0].isoformat(),
                    })
                    bf_detected[ip]['count'] = len(q)
                    bf_detected[ip]['last'] = t.isoformat()

    return {
        'stats': stats,
        'attacks': attacks,
        'bruteforce': bf_detected,
    }


# ── 리포트 ──────────────────────────────────────────
def write_summary(result: dict, output_dir: str) -> str:
    """텍스트 요약 + JSON 상세."""
    today = datetime.now().strftime('%Y-%m-%d')
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    summary_path = f'{output_dir}/summary_{today}.txt'
    with open(summary_path, 'w', encoding='utf-8') as f:
        s = result['stats']
        f.write(f'📊 로그 분석 요약 — {today}\n\n')
        f.write(f'총 요청: {s["total"]:,}\n')
        f.write(f'파싱 실패: {s["parse_failed"]}\n')
        f.write(f'트래픽: {s["bytes_total"] / 1024 / 1024:.1f} MB\n\n')

        f.write('## 상태 코드\n')
        for code, cnt in sorted(s['statuses'].items()):
            pct = cnt / s['total'] * 100
            f.write(f'  {code}: {cnt:,} ({pct:.1f}%)\n')

        f.write('\n## Top 10 IP\n')
        for ip, cnt in s['ips'].most_common(10):
            f.write(f'  {ip}: {cnt:,}\n')

        f.write(f'\n## 🚨 공격 탐지: {len(result["attacks"])}건\n')
        attack_counter = Counter()
        for atk in result['attacks']:
            for a in atk['attacks']:
                attack_counter[a] += 1
        for atype, cnt in attack_counter.most_common():
            f.write(f'  {atype}: {cnt}\n')

        f.write(f'\n## 🔐 브루트포스: {len(result["bruteforce"])} IP\n')
        for ip, info in sorted(result['bruteforce'].items(), key=lambda x: -x[1]['count'])[:10]:
            f.write(f'  {ip}: {info["count"]}회 (시작 {info["first"][:19]})\n')

    # JSON 상세
    json_path = f'{output_dir}/details_{today}.json'
    serializable = {
        'generated_at': datetime.now().isoformat(),
        'stats': {
            'total': result['stats']['total'],
            'top_ips': result['stats']['ips'].most_common(20),
            'top_paths': result['stats']['paths'].most_common(20),
            'statuses': dict(result['stats']['statuses']),
            'hourly': dict(result['stats']['hourly']),
        },
        'attacks': result['attacks'][:100],
        'bruteforce': result['bruteforce'],
    }
    with open(json_path, 'w', encoding='utf-8') as f:
        json.dump(serializable, f, indent=2, ensure_ascii=False)

    return summary_path


# ── CLI ─────────────────────────────────────────────
def main():
    parser = argparse.ArgumentParser(description='로그 분석 자동화 도구')
    parser.add_argument('--log', '-l', required=True, help='분석할 로그 파일 경로')
    parser.add_argument('--output', '-o', default='reports', help='출력 디렉토리')
    parser.add_argument('--bf-threshold', type=int, default=10, help='브루트포스 임계값')
    parser.add_argument('--bf-window', type=int, default=5, help='윈도우 (분)')
    parser.add_argument('--verbose', '-v', action='store_true')
    args = parser.parse_args()

    if args.verbose:
        log.setLevel(logging.DEBUG)

    if not os.path.exists(args.log):
        log.error(f'파일 없음: {args.log}')
        return 2

    log.info(f'분석 시작: {args.log}')
    result = analyze(args.log, args.bf_threshold, args.bf_window)

    summary_path = write_summary(result, args.output)
    log.info(f'요약 저장: {summary_path}')

    # exit code: 공격 발견 → 1, 브루트포스 발견 → 2
    if result['bruteforce']:
        return 2
    if result['attacks']:
        return 1
    return 0


if __name__ == '__main__':
    raise SystemExit(main())

사용 예시

BASH📋 코드 (11줄)
# 기본 실행
python logsentry.py --log /var/log/nginx/access.log --output reports/

# 브루트포스 임계값 조정 (강력 차단)
python logsentry.py --log auth.log --bf-threshold 5 --bf-window 10

# CI/CD 통합 (exit code로 분기)
python logsentry.py --log /tmp/today.log || echo "보안 이벤트 발견!"

# crontab — 매시간 자동 실행
0 * * * * cd /opt/logsentry && python logsentry.py --log /var/log/nginx/access.log --output /var/log/sentry/

확장 아이디어

  1. 실시간 모드tail -f 같은 스트리밍 분석
  2. Slack 알림 — 공격 발견 시 webhook 전송
  3. 자동 차단 — fail2ban 연동
  4. HTML 리포트 — 모듈 4 CH.5의 plotly 대시보드 통합
  5. 다중 로그 형식 — Apache/Nginx/JSON/syslog 자동 감지

🎯 다음 모듈 (5~7) 미리보기

이 트랙(모듈 1~4)으로 보안 자동화 도구의 80%는 만들 수 있습니다. 후속 모듈:

  • 모듈 5: 암호학 실습 (cryptography, JWT, OAuth)
  • 모듈 6: 시스템 보안 (paramiko로 원격 점검, OS hardening)
  • 모듈 7: CTF 실전 + AI 기반 보안 자동화

AI 프롬프트
🤖 AI에게 잘 물어보는 법 — 모델·전략별 프롬프트
Claude

무료: Sonnet 4.6 / Pro $20/mo: Opus 4.6

내 logsentry 도구의
메모리·동시성·확장성을 코드 리뷰하고
10GB+ 로그에서도 안정적으로 동작하도록 개선해줘.
ChatGPT

무료: GPT-5.5 / Plus $20/mo: GPT-5.5 Pro

logsentry 패턴(argparse + 단일 순회 + 통계+탐지+리포트)을
다른 보안 도구(SSL 점검/방화벽 룰 분석/감사 리포트)
동일 구조로 재사용하는 템플릿을 보여줘.
Gemini

무료: 2.5 Flash / Pro $19.99/mo: 3.1 Pro

내 한 달치 nginx access.log + auth.log + syslog를
logsentry로 분석한 결과를 종합해서
경영진 보고용 보안 사고 리포트를 만들어줘.
Grok

무료: Grok 4.1 / SuperGrok $30/mo

2026년 로그 분석 자동화 트렌드 —
자체 Python 도구 vs Falco/Wazuh vs Splunk SOAR
1인 운영자에게 가성비 좋은 선택을 솔직히 알려줘.

⭐ 이것만 기억하세요
종합: 로그 분석 자동화 도구 이 3가지만 확실히 잡으세요
1.모듈 4의 모든 기법(파싱/공격 탐지/브루트포스/리포트)을 단일 CLI로 통합해 실용 가치 있는 도구를 완성했다
2.단일 순회로 통계+공격+브루트포스를 동시 분석하면 1GB 로그도 1분 안에 처리할 수 있다
3.다음 모듈 5~7에서 암호학·시스템 보안·CTF로 보안 도구 제작의 깊이를 더한다


공유하기
진행도 78 / 84