Skip to content

Commit 4e789fe

Browse files
bizfscacolomb
andauthored
Add unit tests for LSS master (#643)
- Switch state global/selective commands - Node ID and bit timing configuration - Activate bit timing and store configuration - LSS address inquiry (vendor/product/revision/serial) - Timeout and error handling - Fast scan protocol - Obsolete method aliases - LssError exception Co-authored-by: André Colomb <src@andre.colomb.de>
1 parent 80c6da3 commit 4e789fe

1 file changed

Lines changed: 237 additions & 0 deletions

File tree

test/test_lss.py

Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
import re
2+
import unittest
3+
from unittest.mock import MagicMock
4+
5+
from canopen import lss
6+
from canopen.lss import LssError, LssMaster
7+
8+
9+
class TestLssMaster(unittest.TestCase):
10+
"""Tests for LssMaster message encoding, decoding, and error handling.
11+
12+
Follows the same pattern as test_sdo.py: replace network.send_message
13+
with a custom method that records sent data and injects responses
14+
synchronously.
15+
"""
16+
17+
def setUp(self):
18+
self.lss = LssMaster()
19+
self.lss.RESPONSE_TIMEOUT = 0.1
20+
self.network = MagicMock()
21+
self.lss.network = self.network
22+
self.sent_messages = []
23+
24+
def _send_and_respond(self, response):
25+
"""Return a send_message side_effect that always injects the given response."""
26+
27+
def side_effect(cob_id, data):
28+
self.sent_messages.append((cob_id, bytes(data)))
29+
if data[0] in lss.ListMessageNeedResponse:
30+
self.lss.on_message_received(LssMaster.LSS_RX_COBID, response, 0.0)
31+
32+
return side_effect
33+
34+
def _send_no_response(self, cob_id, data):
35+
"""Record but do not send a response."""
36+
self.sent_messages.append((cob_id, bytes(data)))
37+
38+
# ---- switch state global ----
39+
40+
def test_send_switch_state_global_configuration(self):
41+
self.network.send_message.side_effect = self._send_no_response
42+
self.lss.send_switch_state_global(LssMaster.CONFIGURATION_STATE)
43+
self.assertEqual(len(self.sent_messages), 1)
44+
cob_id, data = self.sent_messages[0]
45+
self.assertEqual(cob_id, LssMaster.LSS_TX_COBID)
46+
self.assertEqual(len(data), 8)
47+
self.assertEqual(data[:2], b'\x04\x01')
48+
49+
def test_send_switch_state_global_waiting(self):
50+
self.network.send_message.side_effect = self._send_no_response
51+
self.lss.send_switch_state_global(LssMaster.WAITING_STATE)
52+
_, data = self.sent_messages[0]
53+
self.assertEqual(data[:2], b'\x04\x00')
54+
55+
# ---- configure node ID ----
56+
57+
def test_configure_node_id_success(self):
58+
response = b'\x11\x00\x00\x00\x00\x00\x00\x00'
59+
self.network.send_message.side_effect = self._send_and_respond(response)
60+
self.lss.configure_node_id(5)
61+
_, data = self.sent_messages[0]
62+
self.assertEqual(data[:2], b'\x11\x05')
63+
64+
def test_configure_node_id_error(self):
65+
response = b'\x11\x01\x00\x00\x00\x00\x00\x00'
66+
self.network.send_message.side_effect = self._send_and_respond(response)
67+
with self.assertRaisesRegex(LssError, re.compile('error.*1', re.I)):
68+
self.lss.configure_node_id(200)
69+
70+
def test_configure_node_id_wrong_cs(self):
71+
response = b'\xFF\x00\x00\x00\x00\x00\x00\x00'
72+
self.network.send_message.side_effect = self._send_and_respond(response)
73+
with self.assertRaisesRegex(LssError, re.compile('not for.*request', re.I)):
74+
self.lss.configure_node_id(5)
75+
76+
# ---- configure bit timing ----
77+
78+
def test_configure_bit_timing_success(self):
79+
response = b'\x13\x00\x00\x00\x00\x00\x00\x00'
80+
self.network.send_message.side_effect = self._send_and_respond(response)
81+
82+
self.lss.configure_bit_timing(4)
83+
_, data = self.sent_messages[0]
84+
self.assertEqual(data[:3], b'\x13\x00\x04')
85+
86+
# ---- activate bit timing ----
87+
88+
def test_activate_bit_timing(self):
89+
self.network.send_message.side_effect = self._send_no_response
90+
self.lss.activate_bit_timing(500)
91+
_, data = self.sent_messages[0]
92+
self.assertEqual(data[:3], b'\x15\xF4\x01')
93+
94+
# ---- store configuration ----
95+
96+
def test_store_configuration_success(self):
97+
response = b'\x17\x00\x00\x00\x00\x00\x00\x00'
98+
self.network.send_message.side_effect = self._send_and_respond(response)
99+
self.lss.store_configuration()
100+
101+
def test_store_configuration_error(self):
102+
response = b'\x17\x01\x00\x00\x00\x00\x00\x00'
103+
self.network.send_message.side_effect = self._send_and_respond(response)
104+
with self.assertRaisesRegex(LssError, re.compile('error.*1', re.I)):
105+
self.lss.store_configuration()
106+
107+
# ---- inquire node ID ----
108+
109+
def test_inquire_node_id(self):
110+
response = b'\x5E\x2A\x00\x00\x00\x00\x00\x00'
111+
self.network.send_message.side_effect = self._send_and_respond(response)
112+
node_id = self.lss.inquire_node_id()
113+
self.assertEqual(node_id, 42)
114+
115+
def test_inquire_node_id_wrong_cs(self):
116+
response = b'\xFF\x2A\x00\x00\x00\x00\x00\x00'
117+
self.network.send_message.side_effect = self._send_and_respond(response)
118+
with self.assertRaisesRegex(LssError, re.compile('not for.*request', re.I)):
119+
self.lss.inquire_node_id()
120+
121+
# ---- inquire LSS address ----
122+
123+
def test_inquire_vendor_id(self):
124+
response = b'\x5A\x78\x56\x34\x12\x00\x00\x00'
125+
self.network.send_message.side_effect = self._send_and_respond(response)
126+
result = self.lss.inquire_lss_address(lss.CS_INQUIRE_VENDOR_ID)
127+
self.assertEqual(result, 0x12345678)
128+
129+
def test_inquire_product_code(self):
130+
response = b'\x5B\xCD\xAB\x00\x00\x00\x00\x00'
131+
self.network.send_message.side_effect = self._send_and_respond(response)
132+
result = self.lss.inquire_lss_address(lss.CS_INQUIRE_PRODUCT_CODE)
133+
self.assertEqual(result, 0xABCD)
134+
135+
def test_inquire_revision_number(self):
136+
response = b'\x5C\x63\x00\x00\x00\x00\x00\x00'
137+
self.network.send_message.side_effect = self._send_and_respond(response)
138+
result = self.lss.inquire_lss_address(lss.CS_INQUIRE_REVISION_NUMBER)
139+
self.assertEqual(result, 99)
140+
141+
def test_inquire_serial_number(self):
142+
response = b'\x5D\xE9\x03\x00\x00\x00\x00\x00'
143+
self.network.send_message.side_effect = self._send_and_respond(response)
144+
result = self.lss.inquire_lss_address(lss.CS_INQUIRE_SERIAL_NUMBER)
145+
self.assertEqual(result, 1001)
146+
147+
def test_inquire_lss_address_wrong_cs(self):
148+
response = b'\xFF\x00\x00\x00\x00\x00\x00\x00'
149+
self.network.send_message.side_effect = self._send_and_respond(response)
150+
with self.assertRaisesRegex(LssError, re.compile('not for.*request', re.I)):
151+
self.lss.inquire_lss_address(lss.CS_INQUIRE_VENDOR_ID)
152+
153+
# ---- switch state selective ----
154+
155+
def test_send_switch_state_selective_success(self):
156+
response = b'\x44\x00\x00\x00\x00\x00\x00\x00'
157+
self.network.send_message.side_effect = self._send_and_respond(response)
158+
result = self.lss.send_switch_state_selective(0x1110, 0x2220, 0x3330, 0x4440)
159+
self.assertTrue(result)
160+
161+
self.assertEqual(len(self.sent_messages), 4)
162+
self.assertEqual(self.sent_messages[0][1][:5], b'\x40\x10\x11\x00\x00')
163+
self.assertEqual(self.sent_messages[1][1][:5], b'\x41\x20\x22\x00\x00')
164+
self.assertEqual(self.sent_messages[2][1][:5], b'\x42\x30\x33\x00\x00')
165+
self.assertEqual(self.sent_messages[3][1][:5], b'\x43\x40\x44\x00\x00')
166+
167+
def test_send_switch_state_selective_no_match(self):
168+
response = bytearray(8)
169+
self.network.send_message.side_effect = self._send_and_respond(response)
170+
result = self.lss.send_switch_state_selective(0x1110, 0x2220, 0x3330, 0x4440)
171+
self.assertFalse(result)
172+
173+
# ---- timeout / error handling ----
174+
175+
def test_no_response_timeout(self):
176+
self.network.send_message.side_effect = self._send_no_response
177+
with self.assertRaisesRegex(LssError, re.compile('no LSS response', re.I)):
178+
self.lss.inquire_node_id()
179+
180+
def test_unexpected_messages_cleared(self):
181+
"""Stale messages in queue should be cleared before sending."""
182+
self.lss.responses.put(bytearray(8))
183+
response = b'\x5E\x0A\x00\x00\x00\x00\x00\x00'
184+
self.network.send_message.side_effect = self._send_and_respond(response)
185+
186+
with self.assertLogs(level='INFO') as logs:
187+
node_id = self.lss.inquire_node_id()
188+
self.assertEqual(node_id, 10)
189+
self.assertTrue(any("unexpected" in msg for msg in logs.output))
190+
191+
# ---- on_message_received ----
192+
193+
def test_on_message_received(self):
194+
data = b'\xAA\x00\x00\x00\x00\x00\x00\x00'
195+
self.lss.on_message_received(LssMaster.LSS_RX_COBID, data, 1.0)
196+
result = self.lss.responses.get(block=False)
197+
self.assertEqual(result[0], 0xAA)
198+
199+
# ---- fast scan ----
200+
201+
def test_fast_scan_no_slave(self):
202+
"""No slave responds → returns (False, None)."""
203+
self.network.send_message.side_effect = self._send_no_response
204+
result, lss_id = self.lss.fast_scan()
205+
self.assertFalse(result)
206+
self.assertIsNone(lss_id)
207+
208+
def test_fast_scan_finds_slave(self):
209+
"""Simulate a slave that always responds to fast scan."""
210+
response = b'\x4F\x00\x00\x00\x00\x00\x00\x00'
211+
self.network.send_message.side_effect = self._send_and_respond(response)
212+
result, lss_id = self.lss.fast_scan()
213+
self.assertTrue(result)
214+
self.assertEqual(lss_id, [0, 0, 0, 0])
215+
216+
# ---- obsolete aliases ----
217+
218+
def test_send_switch_mode_global_alias(self):
219+
"""The obsolete send_switch_mode_global should delegate."""
220+
self.network.send_message.side_effect = self._send_no_response
221+
self.lss.send_switch_mode_global(LssMaster.CONFIGURATION_STATE)
222+
_, data = self.sent_messages[0]
223+
self.assertEqual(data[:2], b'\x04\x01')
224+
225+
226+
class TestLssError(unittest.TestCase):
227+
228+
def test_lss_error_is_exception(self):
229+
self.assertIsInstance(LssError("test"), Exception)
230+
231+
def test_lss_error_message(self):
232+
err = LssError("something went wrong")
233+
self.assertEqual(str(err), "something went wrong")
234+
235+
236+
if __name__ == '__main__':
237+
unittest.main()

0 commit comments

Comments
 (0)