Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,7 @@ cython_debug/
*.vpwhistu
*.vpwwildcardcache
*.vtg

# Testing files for PCAP parseer
*.pcap
*.pcapng
9 changes: 9 additions & 0 deletions pyomnilogic_local/cli/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
"""CLI module for OmniLogic local control.

This module provides the command-line interface for controlling Hayward
OmniLogic and OmniHub pool controllers.
"""

from pyomnilogic_local.cli.utils import ensure_connection

__all__ = ["ensure_connection"]
57 changes: 28 additions & 29 deletions pyomnilogic_local/cli/cli.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,42 @@
import asyncio
from typing import Any

import click

from pyomnilogic_local.api import OmniLogicAPI
from pyomnilogic_local.cli.debug import commands as debug
from pyomnilogic_local.cli.get import commands as get
from pyomnilogic_local.cli.utils import async_get_mspconfig, async_get_telemetry


async def get_omni(host: str) -> OmniLogicAPI:
return OmniLogicAPI(host, 10444, 5.0)
@click.group(invoke_without_command=True)
@click.pass_context
@click.option("--host", default="127.0.0.1", help="Hostname or IP address of OmniLogic system (default: 127.0.0.1)")
def entrypoint(ctx: click.Context, host: str) -> None:
"""OmniLogic Local Control - Command line interface for Hayward pool controllers.

This CLI provides local control and monitoring of Hayward OmniLogic and OmniHub
pool controllers using their local UDP API (typically on port 10444).

async def fetch_startup_data(omni: OmniLogicAPI) -> tuple[Any, Any]:
"""Fetch MSPConfig and Telemetry from the controller."""
try:
mspconfig = await async_get_mspconfig(omni)
telemetry = await async_get_telemetry(omni)
except Exception as exc:
raise RuntimeError(f"[ERROR] Failed to fetch config or telemetry from controller: {exc}") from exc
return mspconfig, telemetry
The CLI connects to your pool controller when you run a command and caches
configuration and telemetry data for use by that command.

Examples:
# Connect to controller at default address
omnilogic get lights

@click.group()
@click.pass_context
@click.option("--host", default="127.0.0.1", help="Hostname or IP address of omnilogic system")
def entrypoint(ctx: click.Context, host: str) -> None:
"""Main CLI entrypoint for OmniLogic local control."""
# Connect to specific controller IP
omnilogic --host 192.168.1.100 debug get-telemetry

# Get raw XML responses for debugging
omnilogic debug --raw get-mspconfig

For more information, visit: https://github.com/cryptk/python-omnilogic-local
"""
ctx.ensure_object(dict)
try:
omni = asyncio.run(get_omni(host))
mspconfig, telemetry = asyncio.run(fetch_startup_data(omni))
except Exception as exc: # pylint: disable=broad-except
click.secho(str(exc), fg="red", err=True)
ctx.exit(1)
ctx.obj["OMNI"] = omni
ctx.obj["MSPCONFIG"] = mspconfig
ctx.obj["TELEMETRY"] = telemetry

# Store the host for later connection, but don't connect yet
ctx.obj["HOST"] = host

# If no subcommand was provided, show help and exit
if ctx.invoked_subcommand is None:
click.echo(ctx.get_help())
ctx.exit(0)


entrypoint.add_command(debug.debug)
Expand Down
187 changes: 69 additions & 118 deletions pyomnilogic_local/cli/debug/commands.py
Original file line number Diff line number Diff line change
@@ -1,53 +1,82 @@
# Need to figure out how to resolve the 'Untyped decorator makes function "..." untyped' errors in mypy when using click decorators
# mypy: disable-error-code="misc"
import asyncio
import xml.etree.ElementTree as ET
import zlib
from collections import defaultdict
from pathlib import Path
from typing import Literal, overload

import click
from scapy.layers.inet import UDP
from scapy.utils import rdpcap

from pyomnilogic_local.api import OmniLogicAPI
from pyomnilogic_local.cli.utils import async_get_mspconfig, async_get_telemetry
from pyomnilogic_local.models.filter_diagnostics import FilterDiagnostics
from pyomnilogic_local.models.leadmessage import LeadMessage
from pyomnilogic_local.omnitypes import MessageType
from pyomnilogic_local.protocol import OmniLogicMessage
from pyomnilogic_local.cli import ensure_connection
from pyomnilogic_local.cli.pcap_utils import parse_pcap_file, process_pcap_messages
from pyomnilogic_local.cli.utils import async_get_filter_diagnostics


@click.group()
@click.option("--raw/--no-raw", default=False, help="Output the raw XML from the OmniLogic, do not parse the response")
@click.pass_context
def debug(ctx: click.Context, raw: bool) -> None:
# Container for all get commands
"""Debug commands for low-level controller access.

These commands provide direct access to controller data and debugging utilities
including configuration, telemetry, diagnostics, and PCAP file analysis.
"""
ctx.ensure_object(dict)
ctx.obj["RAW"] = raw
# Don't connect yet - parse_pcap doesn't need it, others will call ensure_connection individually


@debug.command()
@click.pass_context
def get_mspconfig(ctx: click.Context) -> None:
mspconfig = asyncio.run(async_get_mspconfig(ctx.obj["OMNI"], ctx.obj["RAW"]))
"""Retrieve the MSP configuration from the controller.

The MSP configuration contains all pool equipment definitions, system IDs,
and configuration parameters. Use --raw to see the unprocessed XML.

Example:
omnilogic debug get-mspconfig
omnilogic debug --raw get-mspconfig
"""
ensure_connection(ctx)
omni: OmniLogicAPI = ctx.obj["OMNI"]
mspconfig = asyncio.run(omni.async_get_config(raw=ctx.obj["RAW"]))
click.echo(mspconfig)


@debug.command()
@click.pass_context
def get_telemetry(ctx: click.Context) -> None:
telemetry = asyncio.run(async_get_telemetry(ctx.obj["OMNI"], ctx.obj["RAW"]))
"""Retrieve current telemetry data from the controller.

Telemetry includes real-time sensor readings, equipment states, temperatures,
and other operational data. Use --raw to see the unprocessed XML.

Example:
omnilogic debug get-telemetry
omnilogic debug --raw get-telemetry
"""
ensure_connection(ctx)
omni: OmniLogicAPI = ctx.obj["OMNI"]
telemetry = asyncio.run(omni.async_get_telemetry(raw=ctx.obj["RAW"]))
click.echo(telemetry)


@debug.command()
@click.option("--pool-id", help="System ID of the Body Of Water the filter is associated with")
@click.option("--filter-id", help="System ID of the filter to request diagnostics for")
@click.option(
"--pool-id", required=True, type=int, help="System ID of the Body Of Water the filter is associated with. Example: --pool-id 1"
)
@click.option("--filter-id", required=True, type=int, help="System ID of the filter to request diagnostics for. Example: --filter-id 5")
@click.pass_context
def get_filter_diagnostics(ctx: click.Context, pool_id: int, filter_id: int) -> None:
"""Get diagnostic information for a specific filter/pump.

This command retrieves detailed diagnostic data including firmware versions,
power consumption, and error status for a filter or pump.

Example:
omnilogic debug get-filter-diagnostics --pool-id 1 --filter-id 5
"""
ensure_connection(ctx)
filter_diags = asyncio.run(async_get_filter_diagnostics(ctx.obj["OMNI"], pool_id, filter_id, ctx.obj["RAW"]))
if ctx.obj["RAW"]:
click.echo(filter_diags)
Expand All @@ -71,117 +100,39 @@ def get_filter_diagnostics(ctx: click.Context, pool_id: int, filter_id: int) ->
)


@overload
async def async_get_filter_diagnostics(omni: OmniLogicAPI, pool_id: int, filter_id: int, raw: Literal[True]) -> str: ...
@overload
async def async_get_filter_diagnostics(omni: OmniLogicAPI, pool_id: int, filter_id: int, raw: Literal[False]) -> FilterDiagnostics: ...
async def async_get_filter_diagnostics(omni: OmniLogicAPI, pool_id: int, filter_id: int, raw: bool) -> FilterDiagnostics | str:
filter_diags = await omni.async_get_filter_diagnostics(pool_id, filter_id, raw=raw)
return filter_diags


@debug.command()
@click.argument("pcap_file", type=click.Path(exists=True, path_type=Path))
@click.pass_context
def parse_pcap(ctx: click.Context, pcap_file: Path) -> None:
"""Parse a PCAP file and reconstruct Omnilogic protocol communication."""
"""Parse a PCAP file and reconstruct Omnilogic protocol communication.

Analyzes network packet captures to decode OmniLogic protocol messages.
Automatically reassembles multi-part messages (LeadMessage + BlockMessages)
and decompresses payloads.

The PCAP file should contain UDP traffic captured from OmniLogic controller
communication (typically on port 10444).

Example:
omnilogic debug parse-pcap /path/to/capture.pcap
tcpdump -i eth0 -w pool.pcap udp port 10444
omnilogic debug parse-pcap pool.pcap
"""
# Read the PCAP file
try:
packets = rdpcap(str(pcap_file))
packets = parse_pcap_file(str(pcap_file))
except Exception as e:
click.echo(f"Error reading PCAP file: {e}", err=True)
raise click.Abort()

# Track multi-message sequences (LeadMessage + BlockMessages)
# Key: (src_ip, dst_ip, msg_id), Value: list of messages
message_sequences: dict[tuple[str, str, int], list[OmniLogicMessage]] = defaultdict(list)

# Process packets in order
for packet in packets:
if not packet.haslayer(UDP):
click.echo("Not a UDP packet, skipping...", err=True)
continue

udp = packet[UDP]
src_ip = packet.payload.src
dst_ip = packet.payload.dst

# Parse the Omnilogic message
try:
omni_msg = OmniLogicMessage.from_bytes(bytes(udp.payload))
click.echo(f"Parsed Omnilogic message: {omni_msg}")
except Exception: # pylint: disable=broad-except
# Not an Omnilogic message, skip it
click.echo("Not an Omnilogic message, skipping...", err=True)
continue

# Print the basic packet info
click.echo(f"{src_ip} sent {omni_msg.type.name} to {dst_ip}")

# Track LeadMessage/BlockMessage sequences
if omni_msg.type == MessageType.MSP_LEADMESSAGE:
# Start a new sequence
seq_key = (src_ip, dst_ip, omni_msg.id)
message_sequences[seq_key] = [omni_msg]
elif omni_msg.type == MessageType.MSP_BLOCKMESSAGE:
# Find the matching LeadMessage sequence
# We need to find the sequence with the same src/dst and highest ID less than or equal to this message
matching_seq: tuple[str, str, int] = ("", "", 0)
for seq_key in message_sequences:
if seq_key[0] == src_ip and seq_key[1] == dst_ip:
# Check if this is the right sequence (the LeadMessage should have been received before this block)
if not matching_seq or seq_key[2] > matching_seq[2]:
matching_seq = seq_key

if matching_seq:
message_sequences[matching_seq].append(omni_msg)

# Check if we have all the blocks
lead_msg = message_sequences[matching_seq][0]
lead_data = LeadMessage.from_orm(ET.fromstring(lead_msg.payload[:-1]))

# We have LeadMessage + all BlockMessages
if len(message_sequences[matching_seq]) == lead_data.msg_block_count + 1:
# Reassemble and decode
try:
decoded_msg = _reassemble_and_decode(message_sequences[matching_seq])
click.echo(f"\nMessage from {src_ip} decoded:")
click.echo(decoded_msg)
click.echo() # Extra newline for readability
except Exception as e: # pylint: disable=broad-except
click.echo(f"Error decoding message: {e}", err=True)

# Clean up this sequence
del message_sequences[matching_seq]


def _reassemble_and_decode(messages: list[OmniLogicMessage]) -> str:
"""
Reassemble a LeadMessage + BlockMessages sequence and decode the payload.

Args:
messages: List containing LeadMessage followed by BlockMessages

Returns:
Decoded message content as string
"""
lead_msg = messages[0]
block_msgs = messages[1:]

# Reassemble the blocks
# Sort by message ID to ensure correct order
sorted_blocks = sorted(block_msgs, key=lambda m: m.id)

# Concatenate the block payloads (skip the 8-byte header on each block)
reassembled = b""
for block_msg in sorted_blocks:
reassembled += block_msg.payload[8:]

# Decompress if necessary
if lead_msg.compressed:
reassembled = zlib.decompress(reassembled)
# Process all packets and extract OmniLogic messages
results = process_pcap_messages(packets)

# Decode to string
decoded = reassembled.decode("utf-8").strip("\x00")
# Display the results
for src_ip, dst_ip, omni_msg, decoded_content in results:
click.echo(f"\n{src_ip} sent {omni_msg.type.name} to {dst_ip}")

return decoded
if decoded_content:
click.echo("Decoded message content:")
click.echo(decoded_content)
click.echo() # Extra newline for readability
Loading