Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions webapp/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# SPDX-License-Identifier: CC0-1.0
FROM alpine:3.20
RUN apk add --no-cache py3-aiosqlite py3-jinja2 py3-starlette py3-uvloop uvicorn
RUN apk add --no-cache lz4 tcpdump
COPY . /webapp
WORKDIR /webapp
CMD ["uvicorn", "--host", "0.0.0.0", "--timeout-graceful-shutdown", "1", "main:app"]
61 changes: 49 additions & 12 deletions webapp/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import asyncio
import base64
import contextlib
import itertools
import json
import time
from pathlib import Path
Expand All @@ -20,6 +21,8 @@
from starlette.staticfiles import StaticFiles
from starlette.templating import Jinja2Templates

from pcap import stream_pcaps_with_bpf


def row_to_dict(row: aiosqlite.Row) -> dict:
row_dict = dict(row)
Expand Down Expand Up @@ -181,25 +184,59 @@ async def api_flow_pcap_get(request):

# Query flow start timestamp from database
async with eve_db.execute(
"SELECT ts_start FROM flow WHERE id = ?", (flow_id,)
"SELECT ts_start, ts_end, src_ip, src_port, dest_ip, dest_port FROM flow WHERE id = ?",
(flow_id,),
) as cursor:
flow = await cursor.fetchone()
if not flow:
raise HTTPException(404)
flow_us = flow["ts_start"] // 1000

# convert μs to seconds
flow_start_secs = flow["ts_start"] // 1000_000
flow_end_secs = flow["ts_end"] // 1000_000

# Build the BPF filter for this flow
src_ip = flow["src_ip"].strip("[]")
src_port = flow["src_port"]
dst_ip = flow["dest_ip"].strip("[]")
dst_port = flow["dest_port"]
bpf_filter = (
f"host {src_ip} and host {dst_ip} and " f"port {src_port} and port {dst_port}"
)

# Serve corresponding pcap, found using timestamp
path = None
for pcap_path in sorted(Path("../suricata/output/pcaps/").glob("*.*")):
pcap_us = int(pcap_path.name.replace(".lz4", "").rsplit(".", 1)[-1])
if pcap_us * 1000 > flow_us:
break # take previous one
path = pcap_path
if path is None:
pcaps = []
pcap_paths = list(sorted(Path("../suricata/output/pcaps/").glob("*.*")))
timestamps = [
int(str(pcap_path).replace(".lz4", "").rsplit(".", 1)[-1])
for pcap_path in pcap_paths
]

# create pairs of (start, end) timestamps for each pcap
start_end_stamps = list(itertools.zip_longest(timestamps, timestamps[1:]))

# Find pcaps overlapping with the flow's time range
for pcap_path, pcap_start_end in zip(pcap_paths, start_end_stamps):
pcap_start, pcap_end = pcap_start_end
# Check if flow overlaps with this PCAP's time range
# For the last PCAP (pcap_end is None), only check if flow ends after pcap starts
if pcap_end is None:
# Last PCAP: include if flow hasn't ended before this PCAP started
if flow_end_secs > pcap_start:
pcaps.append(pcap_path)
else:
# Normal overlap check for PCAPs with known end time
if flow_start_secs < pcap_end and flow_end_secs > pcap_start:
pcaps.append(pcap_path)

if not pcaps:
raise HTTPException(404)
return Response(
path.open("rb").read(), # cache before sending as file might change
headers={"Content-Disposition": f'attachment; filename="{path.name}"'},

filename = f"flow-{flow_id}.pcap"
return StreamingResponse(
stream_pcaps_with_bpf(pcaps, bpf_filter),
media_type="application/vnd.tcpdump.pcap",
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
)


Expand Down
75 changes: 75 additions & 0 deletions webapp/pcap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#!/usr/bin/env python3
# Copyright (C) 2025 Yun Zheng Hu
# SPDX-License-Identifier: GPL-2.0-or-later
import asyncio
import io
import logging
from pathlib import Path
from typing import AsyncIterable, BinaryIO, Iterable

PCAP_HEADER_SIZE = 24

log = logging.getLogger(__name__)


async def pipe_data(reader: asyncio.StreamReader, fout: BinaryIO, skip: int = 0):
"""Pipe data from reader to fout, skipping initial bytes if specified."""
while True:
chunk = await reader.read(io.DEFAULT_BUFFER_SIZE)
if not chunk:
break
if skip:
chunk = chunk[skip:]
skip = 0
fout.write(chunk)


async def stream_pcaps(pcaps: Iterable[str | Path], fout: BinaryIO) -> None:
"""Stream multiple pcap files to the given output file-like object."""
for i, pcap in enumerate(pcaps):
log.debug(pcap)
match Path(pcap).suffix.lower():
case ".lz4":
args = ["lz4cat", str(pcap)]
case ".gz":
args = ["zcat", str(pcap)]
case ".pcap" | ".pcapng":
args = ["cat", str(pcap)]
case _:
raise ValueError(f"Unknown file extension for pcap: {pcap}")

log.debug("stream_pcaps args=%r", args)
process = await asyncio.create_subprocess_exec(
*args,
stdout=asyncio.subprocess.PIPE,
)
skip = PCAP_HEADER_SIZE if i > 0 else 0
async with asyncio.TaskGroup() as tg:
tg.create_task(pipe_data(process.stdout, fout, skip=skip))

log.debug("finished stream_pcaps")


async def stream_pcaps_with_bpf(
    pcaps: list[str | Path], bpf_filter: str
) -> AsyncIterable[bytes]:
    """Yield pcap bytes from *pcaps* filtered through tcpdump.

    The input files are concatenated (extra global headers stripped) and
    fed into a ``tcpdump -r - -w -`` subprocess applying *bpf_filter*;
    tcpdump's stdout is yielded in chunks.

    Args:
        pcaps: Pcap files to concatenate and filter.
        bpf_filter: BPF filter expression passed to tcpdump.
    """
    args = ["tcpdump", "-r", "-", "-w", "-", bpf_filter]
    log.debug("stream_pcaps_with_bpf args=%r", args)
    process = await asyncio.create_subprocess_exec(
        *args,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )

    # Feed the concatenated pcaps into tcpdump's stdin in the background;
    # closing stdin when done lets tcpdump flush its output and exit.
    stream_task = asyncio.create_task(stream_pcaps(pcaps, process.stdin))
    stream_task.add_done_callback(lambda t: process.stdin.close())

    # Read tcpdump's stdout and yield it chunk by chunk.
    while chunk := await process.stdout.read(io.DEFAULT_BUFFER_SIZE):
        yield chunk

    # Propagate any error from the feeder task (previously the task was
    # never awaited, silently discarding exceptions) and reap tcpdump so
    # it doesn't remain a zombie.
    await stream_task
    await process.wait()