-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbeamsim.py
More file actions
170 lines (146 loc) · 5.08 KB
/
Copy pathbeamsim.py
File metadata and controls
170 lines (146 loc) · 5.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
import json
import subprocess
import os
import warnings
import numpy as np
import tempfile
import hashlib
def parse_report(lines):
if isinstance(lines, str):
lines = lines.splitlines()
items = []
for line in lines:
if not line.startswith('["report",'):
continue
_, *args = json.loads(line)
items.append(args)
items.sort(key=lambda x: x[0])
return items
def filter_report(items, type_):
return [x for x in items if x[1] == type_]
def time_axis(items):
return [x[0] for x in items]
def get_snark1_sent(items):
rows = filter_report(items, "snark1_sent")
xs, ys = [0], [0]
groups = dict()
for t, _, g, n2 in rows:
assert t != 0
n1 = groups.get(g, 0)
if n2 <= n1:
continue
groups[g] = n2
xs.append(t)
ys.append(ys[-1] + (n2 - n1))
return xs, ys
def get_snark1_received(items):
rows = filter_report(items, "snark1_received")
rows2 = list()
for row in rows:
if not rows2 or row[2] > rows2[-1][2]:
rows2.append(row)
return rows2
def get_signature_duplicates(items):
_, _, duplicates, signatures = filter_report(items, "signature-duplicates")[0]
avg_duplicates = duplicates / signatures
return duplicates, avg_duplicates
class Metrics:
def __init__(self, items):
rows = filter_report(items, "metrics")
_, _, *roles = filter_report(items, "metrics-roles")[0]
self.t = max(len(row[5]) for row in rows)
self.a = np.zeros((2, 3, 2, self.t))
for _, _, i1, i2, i3, bucket in rows:
self.a[i1, i2, i3, : len(bucket)] += bucket
self.messages_received_role = [np.cumsum(self.a[0][i][0]) for i in range(3)]
self.messages_received_all = np.sum(self.messages_received_role, axis=0)
self.messages_sent_role = [np.cumsum(self.a[0][i][1]) for i in range(3)]
self.messages_sent_all = np.sum(self.messages_sent_role, axis=0)
self.bytes_received_role = [np.cumsum(self.a[1][i][0]) for i in range(3)]
self.bytes_received_all = np.sum(self.bytes_received_role, axis=0)
self.bytes_received_role_avg = [
a / n for a, n in zip(self.bytes_received_role, roles)
]
self.bytes_sent_role = [np.cumsum(self.a[1][i][1]) for i in range(3)]
self.bytes_sent_all = np.sum(self.bytes_sent_role, axis=0)
self.bytes_sent_role_avg = [a / n for a, n in zip(self.bytes_sent_role, roles)]
topology_name = {
"direct": "Direct",
"gossip": "Gossip",
"grid": "Grid",
}
topologies = list(topology_name.keys())
role_name = [
"Validator",
"Local Aggregator",
"Global Aggregator",
]
exe = "build/beamsim"
if not os.path.exists(exe): # for docker build
exe = "/usr/local/bin/beamsim"
if not os.path.exists(exe):
raise FileNotFoundError(
f"Executable {exe} not found. Please build the project first."
)
run_cache = dict()
run_exe_time = None
def run(
b=None, t=None, g=None, gv=None, shuffle=False, mpi=False, c=None, la=None, ga=None, local_aggregation_only=False
):
if not isinstance(mpi, bool) and mpi > os.cpu_count():
warnings.warn(
f"beamsim.run requested mpi {mpi} exceeds os cpu count {os.cpu_count()}"
)
mpi = os.cpu_count()
if c is None:
if b is None:
b = "ns3"
if t is None:
t = "direct"
if g is None:
g = 10
if gv is None:
gv = 10
global run_exe_time
exe_time = os.stat(exe).st_mtime
if run_exe_time != exe_time:
run_exe_time = exe_time
run_cache.clear()
c_key = None if c is None else (c, os.stat(c).st_mtime)
key = (b, t, g, gv, shuffle, mpi, c_key, la, ga, local_aggregation_only)
output = run_cache.get(key, None)
if output is None:
cmd = [
*(
(["mpirun"] if mpi else [])
if isinstance(mpi, bool)
else ["mpirun", "-n", str(mpi)]
),
exe,
*([] if c is None else ["-c", c]),
*([] if b is None else ["-b", b]),
*([] if t is None else ["-t", t]),
*([] if g is None else ["-g", str(g)]),
*([] if gv is None else ["-gv", str(gv)]),
*(["--shuffle"] if shuffle else []),
*(["-la", str(la)] if la is not None else []),
*(["-ga", str(ga)] if ga is not None else []),
*(["--local-aggregation-only"] if local_aggregation_only else []),
"--report",
]
print(f"run: {' '.join(cmd)}")
output = subprocess.check_output(cmd, text=True)
run_cache[key] = output
return parse_report(output)
yaml_dir = None
def yaml(yaml: str):
global yaml_dir
if yaml_dir is None:
yaml_dir = os.path.join(tempfile.gettempdir(), "beamsim-yaml-md5")
os.makedirs(yaml_dir, exist_ok=True)
md5 = hashlib.md5(yaml.encode()).hexdigest()
path = os.path.join(yaml_dir, md5)
if not os.path.exists(path):
with open(path, "wb") as file:
file.write(yaml.encode())
return path