Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion packages/prime-tunnel/src/prime_tunnel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
TunnelTimeoutError,
)
from prime_tunnel.models import TunnelInfo
from prime_tunnel.tunnel import Tunnel
from prime_tunnel.tunnel import StateCallback, Tunnel, TunnelState

__all__ = [
"__version__",
Expand All @@ -21,6 +21,8 @@
"TunnelClient",
# Main interface
"Tunnel",
"TunnelState",
"StateCallback",
# Models
"TunnelInfo",
# Exceptions
Expand Down
88 changes: 86 additions & 2 deletions packages/prime-tunnel/src/prime_tunnel/tunnel.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import subprocess
import threading
import time
import traceback
from enum import Enum
from pathlib import Path
from typing import Optional
from typing import Callable, Optional

import httpx

Expand All @@ -30,6 +32,25 @@
_ANSI_RE = re.compile(r"\x1b\[[0-9;]*m")


class TunnelState(str, Enum):
"""Lifecycle state of a tunnel's frpc process."""

STOPPED = "stopped"
CONNECTING = "connecting"
CONNECTED = "connected"
DISCONNECTED = "disconnected"


StateCallback = Callable[[TunnelState, TunnelState], None]

_DISCONNECT_SIGNALS = (
"heartbeat timeout",
"pong message contains error",
"try to connect to server",
)
_CONNECTED_SIGNAL = "start proxy success"


def _parse_frpc_error(
output_lines: list[str],
tunnel_id: str | None = None,
Expand Down Expand Up @@ -93,6 +114,10 @@ def __init__(
self._started = False
self._output_lines: list[str] = []

self._state: TunnelState = TunnelState.STOPPED
self._state_lock = threading.Lock()
self._state_callbacks: list[StateCallback] = []

@property
def tunnel_id(self) -> Optional[str]:
"""Get the tunnel ID."""
Expand All @@ -115,6 +140,52 @@ def is_running(self) -> bool:
return False
return self._process.poll() is None

@property
def state(self) -> TunnelState:
"""Current tunnel state."""
with self._state_lock:
return self._state

def on_state_change(self, callback: StateCallback) -> StateCallback:
"""Register a callback fired on state transitions."""
with self._state_lock:
self._state_callbacks.append(callback)
return callback

def off_state_change(self, callback: StateCallback) -> None:
"""Unregister a previously-registered state callback."""
with self._state_lock:
try:
self._state_callbacks.remove(callback)
except ValueError:
pass

def _set_state(self, new_state: TunnelState) -> None:
with self._state_lock:
if self._state == new_state:
return
old_state = self._state
self._state = new_state
callbacks = list(self._state_callbacks)

for cb in callbacks:
try:
cb(old_state, new_state)
except Exception:
traceback.print_exc()

def _handle_log_line(self, line: str) -> None:
"""Scan a drained frpc log line for state transition signals."""
clean = _ANSI_RE.sub("", line)
if _CONNECTED_SIGNAL in clean:
self._set_state(TunnelState.CONNECTED)
return
if self._state == TunnelState.CONNECTED:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unlocked state read creates race in log handler

Medium Severity

_handle_log_line reads self._state at line 183 without holding _state_lock, while every other read/write of _state (state property, _set_state) correctly acquires the lock. Since _handle_log_line is called from a background drain thread, a TOCTOU race exists: the drain thread can read _state as CONNECTED, then _cleanup or sync_stop on the main thread sets state to STOPPED, and finally the drain thread calls _set_state(DISCONNECTED) — resulting in an invalid STOPPED → DISCONNECTED transition and spurious callbacks.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 8d127db. Configure here.

for signal in _DISCONNECT_SIGNALS:
if signal in clean:
self._set_state(TunnelState.DISCONNECTED)
return

async def check_registered(self) -> bool:
"""Check if the tunnel is still registered server-side.

Expand Down Expand Up @@ -144,6 +215,8 @@ async def start(self) -> str:
if self._started:
raise TunnelError("Tunnel is already started")

self._set_state(TunnelState.CONNECTING)

# 1. Get frpc binary
frpc_path = await asyncio.to_thread(get_frpc_path)
Comment on lines +218 to 221
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Revert state when frpc binary lookup fails

start() moves the tunnel to CONNECTING before resolving the frpc binary, but the get_frpc_path call is outside any cleanup/rollback path. If binary resolution/download raises, the method exits with an exception while state remains CONNECTING, so consumers and callbacks see a tunnel that never transitions back to STOPPED after a failed start.

Useful? React with 👍 / 👎.


Expand Down Expand Up @@ -200,6 +273,7 @@ async def start(self) -> str:
raise TunnelConnectionError(message=f"Failed to start pipe drain: {e}") from e

self._started = True
self._set_state(TunnelState.CONNECTED)
Comment on lines 275 to +276
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Don't unconditionally force CONNECTED after pipe-drain startup

After _start_pipe_drain() returns, start() always sets CONNECTED even though drain threads can already have observed process exit/disconnect and set a later state (e.g., STOPPED). In that interleaving, this assignment overwrites the newer state and leaves state reporting CONNECTED for a dead tunnel, which can mislead reconnection and health logic.

Useful? React with 👍 / 👎.


return self.url

Expand Down Expand Up @@ -251,6 +325,7 @@ def sync_stop(self) -> None:
self._config_file = None

self._started = False
self._set_state(TunnelState.STOPPED)

async def _cleanup(self) -> None:
"""Clean up tunnel resources."""
Expand Down Expand Up @@ -293,6 +368,8 @@ async def _cleanup(self) -> None:
except Exception:
pass

self._set_state(TunnelState.STOPPED)

@property
def recent_output(self) -> list[str]:
"""Last N lines of frpc output (thread-safe). Falls back to startup output."""
Expand All @@ -319,7 +396,7 @@ def _start_pipe_drain(self) -> None:
self._recent_output: list[str] = list(self._output_lines[-max_lines:])

def drain_pipe(pipe):
"""Read output from a pipe, retaining recent lines."""
"""Read output from a pipe, retaining recent lines and firing state events."""
if pipe is None:
return
try:
Expand All @@ -330,8 +407,15 @@ def drain_pipe(pipe):
self._recent_output.append(line)
if len(self._recent_output) > max_lines:
self._recent_output.pop(0)
self._handle_log_line(line)
except (OSError, ValueError):
pass # Pipe closed
finally:
# If the process has exited, reflect it in state. Either pipe
# closing implies frpc is shutting down.
proc = self._process
if proc is None or proc.poll() is not None:
self._set_state(TunnelState.STOPPED)

self._drain_threads: list[threading.Thread] = []
for pipe in (self._process.stdout, self._process.stderr):
Expand Down
Loading
Loading