-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdumper.py
More file actions
117 lines (95 loc) · 3.25 KB
/
dumper.py
File metadata and controls
117 lines (95 loc) · 3.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
from __future__ import print_function, division
import struct
import sys
import io
import MTS
from MTS.Header import Header
from MTS.word.HeaderWord import HeaderWord
__author__ = 'rob'
def scan_to_headerword(serial_input, maximum_bytes=9999, header_magic=HeaderWord.MAGIC_MASK):
"""
Consume bytes until header magic is found in a word
:param header_magic:
:param maximum_bytes:
:param serial_input:
:rtype : MTS.Header.Header
"""
headerword = 0x0000
bytecount = 0
while headerword & header_magic != header_magic:
# BlockingIOError
# Read a single byte
nextbyte = serial_input.read(1)
if len(nextbyte) == 0:
raise BufferError("Reached end of stream")
bytecount += 1
# Take the low word and shift it high; Use OR to add this byte
nextint = ord(nextbyte)
# if DEBUG: print('0x{byte:02X} {byte:08b}'.format(byte=nextint))
headerword = ((headerword & 0x00FF) << 8) | nextint
if 0 < maximum_bytes <= bytecount:
raise BufferError("Failed to detect header word in serial stream")
try:
h = MTS.Header.Header(word=headerword)
# if DEBUG: print("Found header word. 0x{:04X}".format(h.word))
return h
except ValueError as e:
print("Invalid header word 0x{:04X}".format(headerword))
raise e
def read_packets(serial_input):
"""
Consume bytes from input, creating packet frames of words
:rtype: MTS.Packet.Packet
:type serial_input:
"""
while 1:
header = scan_to_headerword(serial_input)
yield header.read_packet(serial_input)
def captured_stream(filename='Serial-log.isp2'):
return io.open(
filename,
mode='rb',
buffering=io.DEFAULT_BUFFER_SIZE
)
# StampedMarker = namedtuple('StampedMarker', 'counter clock')
def live_stream(tty='cu.usbserial'):
import serial
try:
return serial.Serial('/dev/{}'.format(tty), 19200)
except OSError as e:
print("Failed to open port", file=sys.stderr)
return None
def print_packet(i, packet):
print("{: 5d} 0x{} {}".format(
i,
'-'.join(['{:04X}'.format(word) for word in packet.words()]),
packet.data_line()
))
def dump(instream, outstream=None):
for i, packet in enumerate(read_packets(instream)):
# Raw dump to screen
allwords = packet.words()
print_packet(i, packet)
outstream.write(struct.pack('{:d}H'.format(len(allwords)), *allwords))
if i % 100 == 0:
outstream.flush()
# print('Flush.', file=sys.stderr)
if __name__ == '__main__':
import tempfile
outwrapper = tempfile.NamedTemporaryFile(suffix='.ISP2', delete=False)
outwrapper.close()
outfile = io.open(outwrapper.name, mode='w+b')
print('Logging raw data to temp file: {}'.format(outwrapper.name), file=sys.stderr)
try:
# scan_swappedwords(captured_stream())
dump(
# live_stream('cu.usbserial'),
captured_stream('dumped-fromstorage.swapped.ISP2'),
# live_stream('cu.UC-232AC'),
outfile
)
except BufferError as e:
print("All done; {}".format(e))
finally:
if outfile is not None:
outfile.close()