|
12 | 12 |
|
13 | 13 | args = parser.parse_args() |
14 | 14 |
|
| 15 | +import requests |
| 16 | + |
| 17 | +def fetch_metadata(attribute): |
| 18 | + url = f"http://metadata.google.internal/computeMetadata/v1/instance/attributes/{attribute}" |
| 19 | + headers = {"Metadata-Flavor": "Google"} |
| 20 | + try: |
| 21 | + response = requests.get(url, headers=headers, timeout=5) |
| 22 | + response.raise_for_status() |
| 23 | + return response.text |
| 24 | + except Exception as e: |
| 25 | + print(f"Failed to fetch metadata attribute '{attribute}': {e}") |
| 26 | + return "unknown" |
| 27 | + |
| 28 | +machine_type = fetch_metadata("MACHINE_TYPE") |
| 29 | +gcsfuse_version = fetch_metadata("GCSFUSE_VERSION") |
| 30 | + |
15 | 31 | # Load the results file |
16 | 32 | with open(args.result_file) as f: |
17 | 33 | try: |
|
36 | 52 | # Create table if it doesn't exist |
37 | 53 | schema = [ |
38 | 54 | bigquery.SchemaField("job_name", "STRING"), |
| 55 | + bigquery.SchemaField("gcsfuse_version", "STRING"), |
| 56 | + bigquery.SchemaField("machine_type", "STRING"), |
39 | 57 | bigquery.SchemaField("start_time", "TIMESTAMP"), |
40 | 58 | bigquery.SchemaField("file_size", "STRING"), |
41 | 59 | bigquery.SchemaField("block_size", "STRING"), |
42 | 60 | bigquery.SchemaField("nrfiles", "INTEGER"), |
43 | 61 | bigquery.SchemaField("read_bandwidth_MiBps", "FLOAT"), |
44 | 62 | bigquery.SchemaField("write_bandwidth_MiBps", "FLOAT"), |
45 | 63 | bigquery.SchemaField("IOPS", "FLOAT"), |
46 | | - bigquery.SchemaField("duration_seconds", "FLOAT"), |
| 64 | + bigquery.SchemaField("avg_latency_ms", "FLOAT"), |
47 | 65 | ] |
48 | 66 |
|
49 | 67 | try: |
|
59 | 77 | rows = [] |
60 | 78 | for job in data.get("jobs", []): |
61 | 79 | jobname = job.get("jobname") |
62 | | - # Correctly access job options using .get() for nested keys |
63 | 80 | job_options = job.get("job options", {}) |
64 | 81 |
|
65 | | - # Use get with a default value for each option and handle string conversion |
66 | | - file_size = job_options.get("filesize", data.get("global options",{}).get("filesize", "unknown")) |
67 | | - block_size = job_options.get("bs", data.get("global options",{}).get("bs", "unknown")) |
68 | | - |
69 | | - # Convert nrfiles to int, handle missing values and potential string values |
70 | | - nrfiles_str = job_options.get("nrfiles", data.get("global options",{}).get("nrfiles")) |
| 82 | + file_size = job_options.get("filesize", data.get("global options", {}).get("filesize", "unknown")) |
| 83 | + block_size = job_options.get("bs", data.get("global options", {}).get("bs", "unknown")) |
| 84 | + |
| 85 | + nrfiles_str = job_options.get("nrfiles", data.get("global options", {}).get("nrfiles")) |
71 | 86 | nrfiles = int(nrfiles_str) if nrfiles_str and isinstance(nrfiles_str, str) and nrfiles_str.isdigit() else 0 |
72 | 87 |
|
73 | | - read_bw = job.get("read", {}).get("bw_bytes", 0) / (1024 * 1024) |
74 | | - write_bw = job.get("write", {}).get("bw_bytes", 0) / (1024 * 1024) |
75 | | - iops = job.get("read", {}).get("iops", 0.0) + job.get("write", {}).get("iops", 0.0) |
| 88 | + read = job.get("read", {}) |
| 89 | + write = job.get("write", {}) |
| 90 | + |
| 91 | + read_bw = read.get("bw_bytes", 0) / (1024 * 1024) |
| 92 | + write_bw = write.get("bw_bytes", 0) / (1024 * 1024) |
| 93 | + iops = read.get("iops", 0.0) + write.get("iops", 0.0) |
| 94 | + |
| 95 | + read_lat_ns = read.get("lat_ns", {}).get("mean") |
| 96 | + write_lat_ns = write.get("lat_ns", {}).get("mean") |
| 97 | + |
| 98 | + if read_lat_ns is not None and write_lat_ns is not None: |
| 99 | + avg_latency_ms = ((read_lat_ns + write_lat_ns) / 2) / 1_000_000 |
| 100 | + elif read_lat_ns is not None: |
| 101 | + avg_latency_ms = read_lat_ns / 1_000_000 |
| 102 | + elif write_lat_ns is not None: |
| 103 | + avg_latency_ms = write_lat_ns / 1_000_000 |
| 104 | + else: |
| 105 | + avg_latency_ms = 0.0 |
76 | 106 |
|
77 | 107 | rows.append({ |
78 | 108 | "job_name": jobname, |
| 109 | + "gcsfuse_version": gcsfuse_version, |
| 110 | + "machine_type": machine_type, |
79 | 111 | "start_time": start_time, |
80 | 112 | "file_size": file_size, |
81 | 113 | "block_size": block_size, |
82 | 114 | "nrfiles": nrfiles, |
83 | 115 | "read_bandwidth_MiBps": read_bw, |
84 | 116 | "write_bandwidth_MiBps": write_bw, |
85 | 117 | "IOPS": iops, |
86 | | - "duration_seconds": job.get("job_runtime", 0) / 1000, |
| 118 | + "avg_latency_ms": avg_latency_ms, |
87 | 119 | }) |
88 | 120 |
|
89 | 121 | # Insert rows |
|
0 commit comments