security
CHAPTER 78 / 84
읽기 약 2분
FUNCTION
종합: 로그 분석 자동화 도구
핵심 개념
모듈 4 통합 프로젝트 — 로그 파일 → 파싱 → 이상 탐지 → 리포트의 자동화 CLI.
본문
통합 도구: logsentry
$ python logsentry.py --log access.log --output reports/
🔎 분석 중: access.log
✅ 파싱 완료: 50,000 줄
🚨 이상 탐지:
- 브루트포스: 3 IP (가장 많음 198.51.100.99 — 250회)
- SQL Injection: 12 시도 (IP: 203.0.113.5)
- XSS: 5 시도
- 경로 탐색: 8 시도
📊 통계:
- 총 요청: 50,000
- 4xx: 1,234 (2.5%)
- 5xx: 45 (0.1%)
📁 저장: reports/dashboard_2026-04-27.html (대시보드 + 상세 결과)전체 구현
# logsentry.py — 통합 로그 분석 도구
# ⚠️ 이 코드는 허가된 환경에서만 사용하세요.
import argparse
import json
import logging
import os
import re
from collections import Counter, defaultdict, deque
from datetime import datetime, timedelta
from pathlib import Path
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
log = logging.getLogger('logsentry')
# ── 정규식 ──────────────────────────────────────────
APACHE_COMBINED = re.compile(
r'^(?P<ip>\S+)\s+\S+\s+\S+\s+'
r'\[(?P<time>[^\]]+)\]\s+'
r'"(?P<method>\S+)\s+(?P<path>\S+)\s+\S+"\s+'
r'(?P<status>\d{3})\s+(?P<bytes>\S+)\s+'
r'"(?P<referer>[^"]*)"\s+"(?P<user_agent>[^"]*)"'
)
ATTACK_PATTERNS = {
'sqli': re.compile(r'(?i)(\bUNION\s+SELECT|\bOR\s+1=1|--\s*$|sleep\s*\()'),
'xss': re.compile(r'(?i)(<script|javascript:|on(?:click|load|error)=|alert\s*\()'),
'path_traversal': re.compile(r'(?i)(\.\./|/etc/passwd|%2e%2e)'),
'command_inj': re.compile(r'(?i)(;\s*(?:cat|ls|whoami)|\$\(|`[^`]+`)'),
'malicious_ua': re.compile(r'(?i)(sqlmap|nikto|nmap|masscan|gobuster)'),
}
# ── 분석 함수 ───────────────────────────────────────
def analyze(log_path: str, bf_threshold: int = 10, bf_window: int = 5) -> dict:
"""파싱 + 통계 + 공격 탐지 + 브루트포스 1회 순회."""
stats = {
'total': 0, 'parse_failed': 0,
'ips': Counter(), 'paths': Counter(),
'statuses': Counter(), 'methods': Counter(),
'hourly': Counter(), 'bytes_total': 0,
}
attacks = []
bf_state: dict[str, deque] = defaultdict(deque)
bf_detected = {}
window = timedelta(minutes=bf_window)
with open(log_path, encoding='utf-8', errors='replace') as f:
for lineno, line in enumerate(f, 1):
stats['total'] += 1
m = APACHE_COMBINED.match(line)
if not m:
stats['parse_failed'] += 1
continue
ip = m['ip']
path = m['path']
status = int(m['status'])
ua = m['user_agent']
stats['ips'][ip] += 1
stats['paths'][path] += 1
stats['statuses'][status] += 1
stats['methods'][m['method']] += 1
try:
stats['bytes_total'] += int(m['bytes'])
except ValueError:
pass
# 시간대
try:
t = datetime.strptime(m['time'].split()[0], '%d/%b/%Y:%H:%M:%S')
stats['hourly'][t.hour] += 1
except ValueError:
t = None
# 공격 탐지
full_request = f'{m["method"]} {path} {ua}'.lower()
detected = []
for atk, pattern in ATTACK_PATTERNS.items():
if pattern.search(full_request):
detected.append(atk)
if detected:
attacks.append({
'lineno': lineno,
'ip': ip,
'time': m['time'],
'request': f'{m["method"]} {path}',
'attacks': detected,
'status': status,
})
# 브루트포스 — 401/403 반복
if status in (401, 403) and t:
q = bf_state[ip]
while q and q[0] < t - window:
q.popleft()
q.append(t)
if len(q) >= bf_threshold:
bf_detected.setdefault(ip, {
'count': len(q),
'first': q[0].isoformat(),
})
bf_detected[ip]['count'] = len(q)
bf_detected[ip]['last'] = t.isoformat()
return {
'stats': stats,
'attacks': attacks,
'bruteforce': bf_detected,
}
# ── 리포트 ──────────────────────────────────────────
def write_summary(result: dict, output_dir: str) -> str:
"""텍스트 요약 + JSON 상세."""
today = datetime.now().strftime('%Y-%m-%d')
Path(output_dir).mkdir(parents=True, exist_ok=True)
summary_path = f'{output_dir}/summary_{today}.txt'
with open(summary_path, 'w', encoding='utf-8') as f:
s = result['stats']
f.write(f'📊 로그 분석 요약 — {today}\n\n')
f.write(f'총 요청: {s["total"]:,}\n')
f.write(f'파싱 실패: {s["parse_failed"]}\n')
f.write(f'트래픽: {s["bytes_total"] / 1024 / 1024:.1f} MB\n\n')
f.write('## 상태 코드\n')
for code, cnt in sorted(s['statuses'].items()):
pct = cnt / s['total'] * 100
f.write(f' {code}: {cnt:,} ({pct:.1f}%)\n')
f.write('\n## Top 10 IP\n')
for ip, cnt in s['ips'].most_common(10):
f.write(f' {ip}: {cnt:,}\n')
f.write(f'\n## 🚨 공격 탐지: {len(result["attacks"])}건\n')
attack_counter = Counter()
for atk in result['attacks']:
for a in atk['attacks']:
attack_counter[a] += 1
for atype, cnt in attack_counter.most_common():
f.write(f' {atype}: {cnt}\n')
f.write(f'\n## 🔐 브루트포스: {len(result["bruteforce"])} IP\n')
for ip, info in sorted(result['bruteforce'].items(), key=lambda x: -x[1]['count'])[:10]:
f.write(f' {ip}: {info["count"]}회 (시작 {info["first"][:19]})\n')
# JSON 상세
json_path = f'{output_dir}/details_{today}.json'
serializable = {
'generated_at': datetime.now().isoformat(),
'stats': {
'total': result['stats']['total'],
'top_ips': result['stats']['ips'].most_common(20),
'top_paths': result['stats']['paths'].most_common(20),
'statuses': dict(result['stats']['statuses']),
'hourly': dict(result['stats']['hourly']),
},
'attacks': result['attacks'][:100],
'bruteforce': result['bruteforce'],
}
with open(json_path, 'w', encoding='utf-8') as f:
json.dump(serializable, f, indent=2, ensure_ascii=False)
return summary_path
# ── CLI ─────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(description='로그 분석 자동화 도구')
parser.add_argument('--log', '-l', required=True, help='분석할 로그 파일 경로')
parser.add_argument('--output', '-o', default='reports', help='출력 디렉토리')
parser.add_argument('--bf-threshold', type=int, default=10, help='브루트포스 임계값')
parser.add_argument('--bf-window', type=int, default=5, help='윈도우 (분)')
parser.add_argument('--verbose', '-v', action='store_true')
args = parser.parse_args()
if args.verbose:
log.setLevel(logging.DEBUG)
if not os.path.exists(args.log):
log.error(f'파일 없음: {args.log}')
return 2
log.info(f'분석 시작: {args.log}')
result = analyze(args.log, args.bf_threshold, args.bf_window)
summary_path = write_summary(result, args.output)
log.info(f'요약 저장: {summary_path}')
# exit code: 공격 발견 → 1, 브루트포스 발견 → 2
if result['bruteforce']:
return 2
if result['attacks']:
return 1
return 0
if __name__ == '__main__':
raise SystemExit(main())사용 예시
# 기본 실행
python logsentry.py --log /var/log/nginx/access.log --output reports/
# 브루트포스 임계값 조정 (강력 차단)
python logsentry.py --log auth.log --bf-threshold 5 --bf-window 10
# CI/CD 통합 (exit code로 분기)
python logsentry.py --log /tmp/today.log || echo "보안 이벤트 발견!"
# crontab — 매시간 자동 실행
0 * * * * cd /opt/logsentry && python logsentry.py --log /var/log/nginx/access.log --output /var/log/sentry/확장 아이디어
- 실시간 모드 —
tail -f같은 스트리밍 분석 - Slack 알림 — 공격 발견 시 webhook 전송
- 자동 차단 — fail2ban 연동
- HTML 리포트 — 모듈 4 CH.5의 plotly 대시보드 통합
- 다중 로그 형식 — Apache/Nginx/JSON/syslog 자동 감지
🎯 다음 모듈 (5~7) 미리보기
이 트랙(모듈 1~4)으로 보안 자동화 도구의 80%는 만들 수 있습니다. 후속 모듈:
- 모듈 5: 암호학 실습 (cryptography, JWT, OAuth)
- 모듈 6: 시스템 보안 (paramiko로 원격 점검, OS hardening)
- 모듈 7: CTF 실전 + AI 기반 보안 자동화
AI 프롬프트
🤖 AI에게 잘 물어보는 법 — 모델·전략별 프롬프트
Claude
무료: Sonnet 4.6 / Pro $20/mo: Opus 4.6
내 logsentry 도구의 메모리·동시성·확장성을 코드 리뷰하고 10GB+ 로그에서도 안정적으로 동작하도록 개선해줘.
ChatGPT
무료: GPT-5.5 / Plus $20/mo: GPT-5.5 Pro
logsentry 패턴(argparse + 단일 순회 + 통계+탐지+리포트)을 다른 보안 도구(SSL 점검/방화벽 룰 분석/감사 리포트) 동일 구조로 재사용하는 템플릿을 보여줘.
Gemini
무료: 2.5 Flash / Pro $19.99/mo: 3.1 Pro
내 한 달치 nginx access.log + auth.log + syslog를 logsentry로 분석한 결과를 종합해서 경영진 보고용 보안 사고 리포트를 만들어줘.
Grok
무료: Grok 4.1 / SuperGrok $30/mo
2026년 로그 분석 자동화 트렌드 — 자체 Python 도구 vs Falco/Wazuh vs Splunk SOAR 1인 운영자에게 가성비 좋은 선택을 솔직히 알려줘.
⭐ 이것만 기억하세요
종합: 로그 분석 자동화 도구는 이 3가지만 확실히 잡으세요
1.모듈 4의 모든 기법(파싱/공격 탐지/브루트포스/리포트)을 단일 CLI로 통합해 실용 가치 있는 도구를 완성했다
2.단일 순회로 통계+공격+브루트포스를 동시 분석하면 1GB 로그도 1분 안에 처리할 수 있다
3.다음 모듈 5~7에서 암호학·시스템 보안·CTF로 보안 도구 제작의 깊이를 더한다
공유하기
진행도 78 / 84