이전 글(https://authentic-information.tistory.com/214)에서 확인한 것처럼,
현재 수집 환경은 일반적인 syslog 수신 서버 구조가 아니라 TAP(미러링) 기반 패킷 수집 구조이다.
즉, 내 서버가 syslog를 직접 수신하는 대상이 아니라,
다른 목적지로 향하는 syslog 패킷을 복제해서 관찰하는 구조이기 때문에 일반 UDP socket 방식으로는 수집이 불가능하다.
따라서 최종 수집 구조는 아래와 같이 정리했다.
TAP 미러링 트래픽
→ raw socket(AF_PACKET)
→ IPv4/UDP/syslog 패킷 파싱
→ 동일 메시지 dedup
→ 버퍼링 후 파일 저장
→ Splunk file monitor 수집
이번 글에서는 실제 운영 가능한 형태의 Python 수집기 전체 코드와 각 구성 요소를 왜 그렇게 만들었는지 정리한다.
1. 최종 코드 전체
#!/usr/bin/env python3
import os
import socket
import struct
import datetime
import re
import traceback
import hashlib
from collections import deque
from typing import Optional
# ========================
# 설정
# ========================
BASE_DIR = "/data/syslog"
LOG_FILE = "/var/log/raw_udp_syslog_capture.log"
IFACE = "eno2"
ETH_P_ALL = 0x0003
# Dedup 설정
DEDUP_WINDOW = 50000 # 최근 메시지 5만 건 기준으로 중복 제거
recent = deque()
recent_set = set()
# 파일 write batching
FLUSH_INTERVAL = 100
# socket recv buffer
BUFFER_SIZE = 65535
# 파일 버퍼
buffers = {}
buffer_count = 0
os.makedirs(BASE_DIR, exist_ok=True)
# ========================
# 로그 기록
# ========================
def log(msg: str) -> None:
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open(LOG_FILE, "a", encoding="utf-8") as f:
f.write(f"[{now}] {msg}\n")
# ========================
# syslog payload decode
# ========================
def decode_payload(payload: bytes) -> Optional[str]:
m = re.search(rb"<\d+>", payload)
if not m:
return None
msg_bytes = payload[m.start():].rstrip(b"\x00\r\n ")
for enc in ("utf-8", "cp949", "euc-kr"):
try:
return msg_bytes.decode(enc)
except UnicodeDecodeError:
continue
try:
return msg_bytes.decode("utf-8", errors="replace")
except Exception:
return None
# ========================
# dedup
# ========================
def is_duplicate(msg: str) -> bool:
global recent, recent_set
h = hashlib.md5(msg.encode("utf-8", errors="ignore")).hexdigest()
if h in recent_set:
return True
recent.append(h)
recent_set.add(h)
if len(recent) > DEDUP_WINDOW:
old = recent.popleft()
recent_set.discard(old)
return False
# ========================
# 디렉토리 보장
# ========================
def ensure_dir(path: str) -> None:
os.makedirs(path, exist_ok=True)
# ========================
# 파일 버퍼 flush
# ========================
def flush_buffers() -> None:
global buffers, buffer_count
for file_path, msgs in buffers.items():
if not msgs:
continue
with open(file_path, "a", encoding="utf-8") as f:
f.write("\n".join(msgs) + "\n")
buffers[file_path] = []
buffer_count = 0
# ========================
# 로그 파일 write (버퍼링)
# ========================
def write_log(src_ip: str, msg: str) -> None:
global buffers, buffer_count
ts_hour = datetime.datetime.now().strftime("%Y-%m-%d-%H")
dir_path = os.path.join(BASE_DIR, src_ip)
ensure_dir(dir_path)
file_path = os.path.join(dir_path, f"{ts_hour}.log")
if file_path not in buffers:
buffers[file_path] = []
buffers[file_path].append(msg)
buffer_count += 1
if buffer_count >= FLUSH_INTERVAL:
flush_buffers()
# ========================
# Ethernet + IPv4 + UDP 파싱
# ========================
def parse_ipv4_udp(packet: bytes):
# Ethernet(14) + 최소 IPv4(20) + UDP(8)
if len(packet) < 42:
return None
eth_type = struct.unpack("!H", packet[12:14])[0]
# IPv4만 처리
if eth_type != 0x0800:
return None
ip_start = 14
ver_ihl = packet[ip_start]
version = ver_ihl >> 4
ihl = (ver_ihl & 0x0F) * 4
if version != 4:
return None
if len(packet) < ip_start + ihl + 8:
return None
proto = packet[ip_start + 9]
if proto != 17: # UDP만 처리
return None
src_ip = socket.inet_ntoa(packet[ip_start + 12:ip_start + 16])
dst_ip = socket.inet_ntoa(packet[ip_start + 16:ip_start + 20])
udp_start = ip_start + ihl
src_port, dst_port, udp_len, udp_checksum = struct.unpack(
"!HHHH", packet[udp_start:udp_start + 8]
)
# syslog 포트만 처리
if dst_port != 514:
return None
payload = packet[udp_start + 8:]
return {
"src_ip": src_ip,
"dst_ip": dst_ip,
"src_port": src_port,
"dst_port": dst_port,
"payload": payload,
}
# ========================
# 메인 루프
# ========================
def main() -> None:
log("=== START raw_udp_syslog_capture ===")
log(f"bind interface={IFACE}")
try:
s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(ETH_P_ALL))
s.bind((IFACE, 0))
log("raw socket opened successfully")
except Exception as e:
log(f"raw socket open failed: {e}")
raise
packet_count = 0
write_count = 0
dup_count = 0
while True:
try:
packet = s.recv(BUFFER_SIZE)
packet_count += 1
parsed = parse_ipv4_udp(packet)
if not parsed:
continue
msg = decode_payload(parsed["payload"])
if not msg:
continue
if not msg.startswith("<"):
continue
if is_duplicate(msg):
dup_count += 1
continue
src_ip = parsed["src_ip"]
write_log(src_ip, msg)
write_count += 1
if write_count % 1000 == 0:
log(
f"packet_count={packet_count}, write_count={write_count}, "
f"dup_count={dup_count}, latest_src_ip={src_ip}"
)
except KeyboardInterrupt:
log("keyboard interrupt received, flushing buffers")
flush_buffers()
raise
except Exception as e:
log(f"runtime error: {e}")
log(traceback.format_exc())
# 예외가 났더라도 현재 버퍼는 살려두고 계속 동작
continue
if __name__ == "__main__":
try:
main()
finally:
try:
flush_buffers()
except Exception:
pass
2. 왜 AF_PACKET raw socket을 유지했는가
이번 구조에서 가장 중요한 점은 일반 UDP socket이 아니라 raw socket을 유지해야 한다는 것이다.
처음에는 중복을 줄이기 위해 아래처럼 UDP socket으로 바꾸는 방법도 검토했다.
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind(("0.0.0.0", 514))
하지만 이 방식은 현재 구조에서는 맞지 않는다.
이유는 내 서버가 syslog를 직접 받는 서버가 아니라, TAP 포트로 복제된 트래픽을 보는 감시 서버이기 때문이다.
즉, 원래 패킷의 목적지는 다른 장비이고, 내 서버는 그 패킷의 복사본만 보고 있다.
따라서 AF_INET + SOCK_DGRAM 방식처럼 “내 IP로 온 UDP 패킷”만 받는 구조로는 수집이 불가능하다.
결국 TAP 환경에서는 아래 구조가 맞다.
TAP 미러링 트래픽
→ AF_PACKET raw socket
→ 패킷 직접 파싱
3. parse_ipv4_udp()를 별도로 둔 이유
raw socket은 Ethernet 프레임부터 들어오기 때문에, 필요한 부분만 직접 잘라내야 한다.
이 함수에서는 아래 순서로 필터링한다.
- Ethernet 타입이 IPv4인지 확인
- IPv4 패킷인지 확인
- Protocol이 UDP인지 확인
- 목적지 포트가 514(syslog)인지 확인
핵심 부분은 아래와 같다.
if eth_type != 0x0800:
return None
if proto != 17:
return None
if dst_port != 514:
return None
이렇게 한 이유는 모든 패킷을 다 처리하지 않고, Splunk에 저장할 필요가 있는 syslog 트래픽만 빠르게 골라내기 위해서다.
특히 TAP 환경에서는 여러 종류의 패킷이 들어오기 때문에, 이 필터가 없으면 불필요한 CPU 사용량이 급격히 늘어난다.
4. decode_payload()를 따로 둔 이유
syslog payload는 항상 UTF-8로만 들어오는 것이 아니다.
실제 환경에서는 장비마다 인코딩이 다를 수 있고, 한글 장비는 CP949/EUC-KR 계열인 경우도 많다.
그래서 아래 순서로 디코딩을 시도하도록 구성했다.
for enc in ("utf-8", "cp949", "euc-kr"):
try:
return msg_bytes.decode(enc)
except UnicodeDecodeError:
continue
그리고 마지막에는 손실을 허용하고라도 문자열 복구를 시도한다.
return msg_bytes.decode("utf-8", errors="replace")
이렇게 한 이유는 특정 장비의 로그가 일부 깨져도 수집 자체가 중단되지 않도록 하기 위해서다.
5. dedup을 넣은 이유
이번 문제의 핵심은 raw socket이 TAP 복제 패킷을 여러 번 인식하면서, 완전히 동일한 syslog 메시지가 파일에 여러 번 저장되는 것이었다.
따라서 동일 메시지를 빠르게 걸러내기 위해 MD5 해시 기반 dedup 로직을 넣었다.
def is_duplicate(msg: str) -> bool:
h = hashlib.md5(msg.encode("utf-8", errors="ignore")).hexdigest()
if h in recent_set:
return True
이때 단순히 set만 쓰지 않고 deque를 함께 사용한 이유는, 오래된 해시를 순서대로 제거하기 위해서다.
recent.append(h)
recent_set.add(h)
if len(recent) > DEDUP_WINDOW:
old = recent.popleft()
recent_set.discard(old)
즉, 구조는 다음과 같다.
set: 빠른 중복 여부 확인deque: 오래된 항목 제거
이 방식은 메모리를 무한히 쓰지 않으면서도 최근 구간에 대한 중복 제거를 빠르게 수행할 수 있다.
6. DEDUP_WINDOW를 50000으로 둔 이유
이 값은 “최근 몇 건까지를 중복 비교 대상으로 볼 것인가”를 의미한다.
DEDUP_WINDOW = 50000
너무 작으면 동일 패킷이 조금 늦게 다시 들어왔을 때 중복 제거가 되지 않을 수 있다.
너무 크면 메모리 사용량이 불필요하게 늘어난다.
운영 환경 기준으로는 아래처럼 생각하면 된다.
- 낮은 EPS: 10000 ~ 30000
- 중간 EPS: 50000 전후
- 높은 EPS: 100000 이상도 가능하지만 메모리 확인 필요
현재처럼 같은 패킷이 짧은 구간에 여러 번 복제되는 상황에서는 50000 정도면 충분히 현실적인 값이다.
7. write_log()에서 바로 파일에 쓰지 않고 버퍼링하는 이유
로그를 한 줄 받을 때마다 매번 open() - write() - close()를 반복하면 디스크 I/O가 급격히 많아진다.
그래서 일정 개수까지 메모리에 쌓아두었다가 한 번에 기록하는 구조로 바꿨다.
if buffer_count >= FLUSH_INTERVAL:
flush_buffers()
FLUSH_INTERVAL = 100으로 둔 이유는 다음과 같다.
- 너무 작으면 I/O 이점이 줄어듦
- 너무 크면 장애 시 메모리에만 남아 있는 로그가 늘어날 수 있음
100은 운영 안정성과 성능 사이에서 무난한 절충값이다.
8. 왜 src_ip 기준으로 디렉토리를 나누는가
저장 경로는 아래 형태를 유지했다.
/data/syslog/<src_ip>/YYYY-MM-DD-HH.log
이 구조를 유지한 이유는 다음과 같다.
- 장비별로 원본 파일을 빠르게 찾기 쉬움
- 문제 발생 시 특정 송신지 기준 grep이 쉬움
- Splunk source/host 매핑 시 관리가 편함
예를 들어 아래처럼 특정 장비 로그만 바로 확인할 수 있다.
grep -F "특정문자열" /data/syslog/165.141.3.164/2026-04-15-18.log | wc -l
9. 메인 루프에서 packet_count, write_count, dup_count를 따로 둔 이유
운영 중에는 단순히 “로그가 들어오고 있다”만으로는 부족하다.
다음 세 가지를 같이 봐야 한다.
packet_count: 전체 수신 패킷 수write_count: 실제 저장한 로그 수dup_count: dedup으로 버린 로그 수
이 값을 같이 보면 다음을 판단할 수 있다.
- 패킷은 많은데 저장이 적다 → 필터가 너무 강한지 확인
- dedup이 비정상적으로 많다 → TAP 중복 정도 확인 가능
- packet 수 자체가 적다 → 네트워크/TAP 문제 가능성
그래서 1000건 단위로 주기적으로 로그를 남기도록 했다.
if write_count % 1000 == 0:
log(
f"packet_count={packet_count}, write_count={write_count}, "
f"dup_count={dup_count}, latest_src_ip={src_ip}"
)
10. 예외 처리 방식
운영 코드에서는 예외가 한 번 발생했다고 바로 프로세스가 죽는 구조를 피하는 것이 좋다.
그래서 메인 루프에서는 대부분의 예외를 잡고 로그로 남긴 뒤 계속 동작하도록 했다.
except Exception as e:
log(f"runtime error: {e}")
log(traceback.format_exc())
continue
다만 KeyboardInterrupt는 별도로 처리해서, 수동 종료 시 버퍼를 flush하도록 했다.
except KeyboardInterrupt:
log("keyboard interrupt received, flushing buffers")
flush_buffers()
raise
이렇게 하면 수동 중단 시에도 메모리에 있던 로그를 최대한 파일에 남길 수 있다.
11. 운영 시 확인해야 할 포인트
이 코드를 운영에 올린 뒤에는 아래를 같이 보는 것이 좋다.
1) 서비스 상태 확인
systemctl status raw-udp-syslog-capture.service
2) 자체 로그 확인
tail -f /var/log/raw_udp_syslog_capture.log
3) 저장 파일 확인
tail -f /data/syslog/*/*.log
4) 동일 문자열 중복 확인
grep -F "특정 전체 문자열" /data/syslog/165.141.3.164/2026-04-15-18.log | wc -l
12. Splunk에서 검증할 때 사용한 쿼리
코드 적용 후에는 Splunk에서 아래 쿼리로 다시 검증하면 된다.
중복 확인
index=kh_*
| stats count by _raw
| where count > 1
이 쿼리를 쓴 이유는, 완전히 동일한 _raw가 아직 남아 있는지 확인하기 위해서다.
수집 상태 샘플 확인
index=kh_*
| stats count by index host source sourcetype
이 쿼리를 쓴 이유는, 특정 source나 sourcetype에만 이상이 생기지 않았는지 한 번에 보기 위해서다.
특정 이벤트 중복 여부 확인
index=kh_* "142.251.153.119"
| stats count by _raw
| where count > 1
이 쿼리를 쓴 이유는, 이미 리눅스에서 확인했던 특정 문제 이벤트가 Splunk에서도 여전히 중복되는지 비교하기 위해서다.
13. 이 구조의 장점
- TAP 환경에서도 수집 가능
- 동일 패킷 중복 저장 방지 가능
- 디스크 I/O 감소
- 장비별 파일 분리 가능
- Splunk file monitor 구조 유지 가능
14. 한계와 주의사항
- dedup은 메모리를 사용하므로
DEDUP_WINDOW를 무작정 크게 잡으면 안 된다 - 버퍼링은 성능상 유리하지만 장애 직전 일부 로그가 메모리에만 남을 수 있다
- AF_PACKET 기반이라 인터페이스가 바뀌면
IFACE값도 함께 수정해야 한다 - 현재 코드는 UDP 514만 대상으로 하므로, 다른 포트나 TCP syslog가 들어오면 별도 처리 필요
15. 마무리
이번 최종 수집기 코드는 단순히 “로그를 받아 파일에 쓰는 코드”가 아니라, 현재 환경이 TAP 미러링 기반이라는 점을 전제로 설계한 것이다.
즉, 일반 syslog 서버처럼 “내가 수신자”라는 가정이 아니라, “나는 복제된 패킷을 관찰하는 분석자”라는 전제에서 접근해야 했다.
그 결과 최종 구조는 다음처럼 정리된다.
TAP 트래픽
→ raw socket(AF_PACKET)
→ IPv4/UDP/syslog 필터링
→ dedup
→ buffering
→ 파일 저장
→ Splunk 수집
같은 문제를 겪는 경우, 단순히 Splunk 파싱 설정만 볼 것이 아니라 원본 파일에 이미 중복이 있는지, 그리고 현재 서버가 실제 수신자인지 아니면 미러링 감시자인지부터 먼저 구분하는 것이 중요하다.
'Splunk > Splunk Project' 카테고리의 다른 글
| 데이터 수집 | TAP 미러링 환경에서 Python raw socket으로 syslog 수집 시 중복 발생 원인 분석 및 해결 (1) | 2026.04.15 |
|---|---|
| 데이터 수집 | TAP(미러링) 환경에서 syslog 수집 안될 때 해결 방법 (tcpdump → Python RAW Socket까지) (0) | 2026.04.15 |
| Splunk Project | Ollama / AITK 모델 연동 시 발생하는 원인 박멸하기 (0) | 2026.04.14 |
| Splunk Project | Ollama / AITK 모델 연결 문제 해결 과정 정리 (0) | 2026.04.14 |
| Splunk Project | Splunk + Ollama + LLM 연동 구축 가이드 (실전 PoC 기준) (0) | 2026.04.01 |