-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathCW.py
More file actions
480 lines (391 loc) · 19.5 KB
/
Copy pathCW.py
File metadata and controls
480 lines (391 loc) · 19.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
import os
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import skrf as rf
from tqdm import tqdm
from Constants import *
def fft_to_dbm(fft_array, n_samples, impedance):
"""
Convert FFT array to power in dBm for plotting purposes.
Parameters:
fft_array (np.ndarray): Complex FFT array.
n_samples (int): Number of samples used in FFT.
impedance (float): System impedance in ohms.
Returns:
np.ndarray: Power spectrum in dBm.
"""
v_peak = (np.abs(fft_array) / n_samples) * 2
v_rms = v_peak / np.sqrt(2)
power_watts = (v_rms**2) / impedance
power_watts[power_watts < LOG_MINIMUM] = LOG_MINIMUM # avoid log(0)
return 10 * np.log10(power_watts) + DBM_REFERENCE_DB
def plot_fft_spectrum(ch1_fft, ch2_fft, fft_freq, ch1_real_freq, ch2_real_freq, freq, date, output_dir):
"""
Plot FFT spectrum for both channels with selected frequency points highlighted.
Parameters:
ch1_fft (np.ndarray): Complex FFT array for channel 1.
ch2_fft (np.ndarray): Complex FFT array for channel 2.
fft_freq (np.ndarray): Frequency array for FFT bins in Hz.
ch1_real_freq (float): Selected frequency for channel 1 in Hz.
ch2_real_freq (float): Selected frequency for channel 2 in Hz.
freq (float): Target frequency in GHz.
date (str): Date string for filename.
output_dir (str): Output directory path.
Returns:
str: Path to the saved plot file.
"""
ch1_dbm = fft_to_dbm(ch1_fft, SAMPLE_CUTOFF, R_0)
ch2_dbm = fft_to_dbm(ch2_fft, SAMPLE_CUTOFF, R_0)
plt.figure(figsize=(10, 5))
plt.plot(fft_freq, ch1_dbm, '-', label='Ch1 FFT', color=COLORS[0], linewidth=1)
plt.plot(fft_freq, ch2_dbm, '-', label='Ch2 FFT', color=COLORS[1], linewidth=1)
plt.scatter([ch1_real_freq], [ch1_dbm[np.argmin(np.abs(fft_freq - ch1_real_freq))]],
color=COLORS[0], s=100, label='Ch1 Selected Freq', zorder=5)
plt.scatter([ch2_real_freq], [ch2_dbm[np.argmin(np.abs(fft_freq - ch2_real_freq))]],
color=COLORS[1], s=100, label='Ch2 Selected Freq', zorder=5)
plt.title(f"FFT of Channels at {freq} GHz")
plt.xlabel("Frequency (Hz)")
plt.ylabel("Power (dBm)")
plt.legend()
plt.tight_layout()
fft_plot_path = os.path.join(output_dir, f"CW_FFT_{freq}GHz_{date}.png")
plt.savefig(fft_plot_path, bbox_inches='tight')
plt.close()
return fft_plot_path
def plot_power_results(power_df, date, output_dir):
"""
Plot corrected output power versus frequency.
Parameters:
power_df (pd.DataFrame): DataFrame containing power and frequency data.
date (str): Date string for filename.
output_dir (str): Output directory path.
Returns:
str: Path to the saved plot file.
"""
plt.figure(figsize=(10, 5))
plt.plot(power_df["ch1_real_freq"] / 1e9, power_df["ch1_power_out_dbm"],
'-o', label='Ch1 Power Out (Corrected)', color=COLORS[0], alpha=.7)
plt.plot(power_df["ch2_real_freq"] / 1e9, power_df["ch2_power_out_dbm"],
'-o', label='Ch2 Power Out (Corrected)', color=COLORS[1], alpha=.7)
plt.plot(power_df["Frequency"], power_df["Power(dBm)"], '-', label='Power In', color='red')
if "power_norminal" in power_df.columns:
plt.plot(power_df["Frequency"], power_df["power_norminal"],
'--', label='Nominal Power In', color='red', alpha=0.5)
plt.title("Corrected Output Power from CW Measurement")
plt.xlabel("Frequency (GHz)")
plt.ylabel("Power (dBm)")
plt.legend()
plt.tight_layout()
power_plot_path = os.path.join(output_dir, f"CW_Power_{date}.png")
plt.savefig(power_plot_path, bbox_inches='tight')
plt.close()
return power_plot_path
def plot_gain_phase_results(power_df, date, output_dir):
"""
Plot gain and phase difference versus frequency.
Parameters:
power_df (pd.DataFrame): DataFrame containing gain and phase data.
date (str): Date string for filename.
output_dir (str): Output directory path.
Returns:
str: Path to the saved plot file.
"""
plt.figure(figsize=(10, 5))
ax1 = plt.gca()
ax2 = ax1.twinx()
# plot gain
ax1.plot(power_df["ch1_real_freq"] / 1e9, power_df["ch1_gain"],
'-', color=COLORS[0], label='Ch1 Gain')
ax1.plot(power_df["ch2_real_freq"] / 1e9, power_df["ch2_gain"],
'-', color=COLORS[1], label='Ch2 Gain')
ax1.set_xlabel("Frequency (GHz)")
ax1.set_ylabel("Gain (dB)")
ax1.legend(loc="lower left")
# plot phase difference (convert to degrees)
phase_diff_deg = np.degrees(np.unwrap(power_df["phase_diff"].dropna()))
ax2.plot(power_df["ch1_real_freq"].dropna() / 1e9, phase_diff_deg,
'-', color=COLORS[3], label='Phase Difference')
ax2.set_ylabel("Phase Difference (degrees)")
ax2.legend(loc="upper right")
plt.title("Gain and Phase Difference vs Frequency")
plt.tight_layout()
gain_phase_plot_path = os.path.join(output_dir, f"CW_Gain_Phase_{date}.png")
plt.savefig(gain_phase_plot_path, bbox_inches='tight')
plt.close()
return gain_phase_plot_path
def process_CWdata(ch1_data, ch2_data, input_power_dbm, freq, **kwargs):
"""
Process time-domain data to calculate raw complex S-parameters.
This function takes time-domain voltage data for two channels and calculates
the raw, uncorrected complex S-parameters (e.g., S31, S46) based on the
known input power.
Parameters:
ch1_data (np.ndarray): Time-domain data for channel 1.
ch2_data (np.ndarray): Time-domain data for channel 2.
input_power_dbm (float): The input power in dBm.
freq (float): The target frequency in GHz.
**kwargs: Additional keyword arguments.
fft_freq (np.ndarray): Frequency bins for the FFT.
ch1s (np.ndarray): S-parameter compensation network for channel 1 (optional).
ch2s (np.ndarray): S-parameter compensation network for channel 2 (optional).
impedance (float): System impedance in ohms (default R_0).
n_samples (int): Number of FFT samples (default SAMPLE_CUTOFF).
Returns:
tuple: A tuple containing:
- s31_raw (complex): The raw complex S-parameter for channel 1.
- s46_raw (complex): The raw complex S-parameter for channel 2.
- ch1_real_freq (float): The actual frequency bin for channel 1 in Hz.
- ch2_real_freq (float): The actual frequency bin for channel 2 in Hz.
- ch1_fft (np.ndarray): The full complex FFT spectrum for channel 1.
- ch2_fft (np.ndarray): The full complex FFT spectrum for channel 2.
"""
# get kwargs
fft_freq = kwargs.get("fft_freq", np.fft.rfftfreq(SAMPLE_CUTOFF, d=1/(FS*1e9)))
n_samples = kwargs.get("n_samples", SAMPLE_CUTOFF)
impedance = kwargs.get("impedance", R_0)
ch1s = kwargs.get("ch1s", None)
ch2s = kwargs.get("ch2s", None)
# convert ADC counts to voltage
ch1_voltage = ch1_data * VOLT_PER_TICK
ch2_voltage = ch2_data * VOLT_PER_TICK
# perform FFT
ch1_fft = np.fft.rfft(ch1_voltage)
ch2_fft = np.fft.rfft(ch2_voltage)
# compensate with S-parameters if provided
if ch1s is not None and ch2s is not None:
ch1_fft /= ch1s[:, 1, 0] # Apply S21 correction for channel 1
ch2_fft /= ch2s[:, 1, 0] # Apply S21 correction for channel 2
# find the index closest to the target frequency
target_freq_hz = freq * 1e9
freq_idx_ch1 = np.argmin(np.abs(fft_freq - target_freq_hz))
freq_idx_ch2 = np.argmin(np.abs(fft_freq - target_freq_hz))
ch1_fft_peak = ch1_fft[freq_idx_ch1]
ch2_fft_peak = ch2_fft[freq_idx_ch2]
# --- Complex S-Parameter Calculation ---
# calculate the incident wave 'a' from input power
power_watts = 10**((input_power_dbm - DBM_REFERENCE_DB) / 10)
a_in = np.sqrt(power_watts) # assume phase is 0
# calculate the outgoing wave 'b' from the measured FFT voltage
# b = V_rms / sqrt(Z0), where V_rms = V_peak / sqrt(2)
# V_peak = (FFT_magnitude / N) * 2 (for single-sided spectrum)
def calculate_b_out(fft_peak_value):
v_peak_complex = (fft_peak_value / n_samples) * 2
v_rms_complex = v_peak_complex / np.sqrt(2)
return v_rms_complex / np.sqrt(impedance)
b_out_ch1 = calculate_b_out(ch1_fft_peak)
b_out_ch2 = calculate_b_out(ch2_fft_peak)
# S-parameter is the ratio of the outgoing to incoming wave
s31_raw = b_out_ch1 / a_in
s46_raw = b_out_ch2 / a_in
ch1_real_freq = fft_freq[freq_idx_ch1]
ch2_real_freq = fft_freq[freq_idx_ch2]
return s31_raw, s46_raw, ch1_real_freq, ch2_real_freq, ch1_fft, ch2_fft
def alphabeta_correction(power_df: pd.DataFrame, **kwargs) -> tuple[pd.DataFrame, bool]:
"""
Load S-parameter files and calculate alpha and beta corrections for the power dataframe.
Parameters:
power_df: The power dataframe containing frequency and power measurements.
**kwargs: Additional keyword arguments for file paths and processing options.
n_points: Number of points for frequency axis (default is DEFAULT_INTERP_POINTS).
file1, file2, file3, file4, file5, file6: Paths to S-parameter files.
Returns:
A tuple containing the updated power dataframe and a boolean indicating success.
"""
try:
# define a common frequency axis for all S-parameters
n_points = kwargs.get("n_points", DEFAULT_INTERP_POINTS)
freq_axis_ghz = np.linspace(1, 2, n_points, dtype=float)
common_freq = rf.Frequency.from_f(freq_axis_ghz, unit='GHz')
# 1 - S^SP_1A & S^SP_11
file1 = kwargs.get("file1", FILE1)
A1 = rf.Network(file1).interpolate(common_freq)
SSP1A = pd.Series(A1.s[:, 1, 0], index=freq_axis_ghz)
SSP11 = pd.Series(A1.s[:, 1, 1], index=freq_axis_ghz)
# 2 - S^SP_BB & S^SP_BA
file2 = kwargs.get("file2", FILE2)
ab_df = pd.read_csv(file2)
sspbb_real = np.interp(freq_axis_ghz, ab_df['Freq'], ab_df["S22_r"])
sspbb_imag = np.interp(freq_axis_ghz, ab_df['Freq'], ab_df["S22_i"])
SSPBB = pd.Series(sspbb_real + 1j * sspbb_imag, index=freq_axis_ghz)
sspba_real = np.interp(freq_axis_ghz, ab_df['Freq'], ab_df["S21_r"])
sspba_imag = np.interp(freq_axis_ghz, ab_df['Freq'], ab_df["S21_i"])
SSPBA = pd.Series(sspba_real + 1j * sspba_imag, index=freq_axis_ghz)
# 3 - G^PM
file3 = kwargs.get("file3", FILE3)
gpm_df = pd.read_csv(file3)
GPM = pd.Series(
np.interp(freq_axis_ghz, gpm_df['Freq'] / 1e9, gpm_df["S11_r"] + 1j * gpm_df["S11_i"]),
index=freq_axis_ghz
)
# 4 - G^NCin
file4 = kwargs.get("file4", FILE4)
GNCin = pd.Series(
rf.Network(file4).interpolate(common_freq).s[:, 0, 0], # S11
index=freq_axis_ghz
)
# 5 - S^SP_66 & S^SP_6A
file5 = kwargs.get("file5", FILE5)
A6 = rf.Network(file5).interpolate(common_freq)
SSP66 = pd.Series(A6.s[:, 1, 1], index=freq_axis_ghz) # S22
SSP6A = pd.Series(A6.s[:, 1, 0], index=freq_axis_ghz) # S21
# 6 - G^NCout
file6 = kwargs.get("file6", FILE6)
GNCout = pd.Series(
rf.Network(file6).interpolate(common_freq).s[:, 0, 0], # S11
index=freq_axis_ghz
)
# load all S-parameters into one dataframe
sparam_df = pd.DataFrame({
'SSP1A': SSP1A, 'SSP11': SSP11,
'SSPBB': SSPBB, 'SSPBA': SSPBA,
'GPM': GPM, 'GNCin': GNCin,
'SSP66': SSP66, 'SSP6A': SSP6A,
'GNCout': GNCout
})
# calculate alpha correction factor for channel 1
alpha_num = sparam_df['SSP1A'] * (1 - sparam_df['SSPBB'] * sparam_df['GPM'])
alpha_den = sparam_df['SSPBA'] * (1 - sparam_df['SSP11'] * sparam_df['GNCin'])
sparam_df['alpha'] = alpha_num / alpha_den
# calculate beta correction factor for channel 2
beta_num = sparam_df['SSP6A'] * (1 - sparam_df['SSPBB'] * sparam_df['GPM'])
beta_den = sparam_df['SSPBA'] * (1 - sparam_df['SSP66'] * sparam_df['GNCout'])
sparam_df['beta'] = beta_num / beta_den
# sort and merge alpha/beta corrections with power dataframe
power_df = power_df.sort_values('Frequency').reset_index(drop=True)
sparam_df = sparam_df.sort_index().reset_index().rename(columns={'index': 'Frequency'})
# merge alpha, beta, and GPM columns using nearest frequency matching
power_df = pd.merge_asof(
left=power_df,
right=sparam_df[['Frequency', 'alpha', 'beta', 'GPM']],
on='Frequency',
direction='nearest' # finds the closest s-param frequency
)
# ensure frequency column remains float (not complex)
power_df['Frequency'] = power_df['Frequency'].astype(float)
return power_df, True
except FileNotFoundError as e:
print(f"Alpha/Beta S-parameter files not found: {e}. Skipping Alpha/Beta correction.")
return power_df, False
def CW_main(date: str = CW_DATE, **kwargs) -> list[str]:
"""
Main function to process continuous wave (CW) measurement data and generate analysis results.
This function processes CW measurement data to calculate gain, apply S-parameter corrections,
and generate summary plots and CSV files with corrected complex S-parameters.
Parameters:
date (str): The date string used to identify the relevant CSV and data files.
Defaults to CW_DATE from Constants.
**kwargs: Additional keyword arguments.
filename (str): Template for the data file names. Defaults to CW_FILENAME.
graph_flag (int): Controls which frequency indices to graph.
0 = none, 1 = first and last, 2 = all. Defaults to 0.
file_kwargs (dict): Additional keyword arguments passed to alphabeta_correction.
Returns:
list[str]: A list of file paths for all generated output files.
Raises:
FileNotFoundError: If no CSV files matching the date pattern are found in the data directory.
"""
# ensure output directory exists
if not os.path.exists(CW_DIR):
os.makedirs(CW_DIR)
filepaths = []
# find the first CSV file ending with _{date}.csv
csv_files = [f for f in os.listdir(DATA_DIR) if f.endswith(f"_{date}.csv")]
if not csv_files:
raise FileNotFoundError(f"No CSV files found in {DATA_DIR} ending with _{date}.csv")
csv_name = csv_files[0]
print(f"Using CSV file: {csv_name}")
# read in power csv
csv_path = os.path.join(DATA_DIR, csv_name)
power_df = pd.read_csv(csv_path)
# load S-parameters for alpha/beta correction
filekwargs = kwargs.get("file_kwargs", {})
power_df, ab_success = alphabeta_correction(power_df, **filekwargs)
# initialize lists to store results
s31_raw_list, s46_raw_list = [], []
ch1_real_freq_list, ch2_real_freq_list = [], []
filename = kwargs.get("filename", CW_FILENAME)
graph_flag = kwargs.get("graph_flag", 0)
graph_indices = {0, len(power_df) - 1} if graph_flag == 1 else set(range(len(power_df))) if graph_flag == 2 else set()
fft_freq = np.fft.rfftfreq(SAMPLE_CUTOFF, d=1/(FS*1e9))
# process each frequency
for idx, row in tqdm(power_df.iterrows(), total=len(power_df), unit="frequencies", colour='#808080'):
freq = float(np.real(row["Frequency"])) # ensure frequency is real (not complex)
curr_filename = filename.format(freq=freq, FS=FS, date=date)
if not os.path.exists(os.path.join(DATA_DIR, curr_filename)):
print(f"File {curr_filename} does not exist in {DATA_DIR}. Skipping...")
# append placeholder values to maintain index alignment
s31_raw_list.append(np.nan)
s46_raw_list.append(np.nan)
ch1_real_freq_list.append(np.nan)
ch2_real_freq_list.append(np.nan)
continue
data_array = np.load(os.path.join(DATA_DIR, curr_filename))
s31_raw, s46_raw, f1, f2, ch1_fft, ch2_fft = process_CWdata(
ch1_data=data_array[0, :SAMPLE_CUTOFF],
ch2_data=data_array[1, :SAMPLE_CUTOFF],
input_power_dbm=row["Power(dBm)"],
freq=freq,
fft_freq=fft_freq
)
s31_raw_list.append(s31_raw)
s46_raw_list.append(s46_raw)
ch1_real_freq_list.append(f1)
ch2_real_freq_list.append(f2)
# graph frequency response if requested
if idx in graph_indices:
fft_plot_path = plot_fft_spectrum(
ch1_fft, ch2_fft, fft_freq, f1, f2, freq, date, CW_DIR
)
filepaths.append(fft_plot_path)
# add raw results to dataframe
power_df["s31_raw"] = s31_raw_list
power_df["s46_raw"] = s46_raw_list
power_df["ch1_real_freq"] = ch1_real_freq_list
power_df["ch2_real_freq"] = ch2_real_freq_list
# apply corrections if successful
if ab_success:
# ensure data types are correct for complex math
power_df['alpha'] = power_df['alpha'].astype(complex)
power_df['beta'] = power_df['beta'].astype(complex)
power_df['GPM'] = power_df['GPM'].astype(complex)
# calculate mismatch correction factor (scalar)
mismatch_factor = np.sqrt(1 - np.abs(power_df['GPM'])**2)
# apply alpha/beta correction using complex division
power_df["s31_corrected"] = (power_df["s31_raw"] / power_df["alpha"]) * mismatch_factor
power_df["s46_corrected"] = (power_df["s46_raw"] / power_df["beta"]) * mismatch_factor
else:
# if no correction, the corrected value is just the raw value
power_df["s31_corrected"] = power_df["s31_raw"]
power_df["s46_corrected"] = power_df["s46_raw"]
# calculate final gain and phase from the corrected complex S-parameters
power_df["ch1_gain"] = 20 * np.log10(np.abs(power_df["s31_corrected"]))
power_df["ch2_gain"] = 20 * np.log10(np.abs(power_df["s46_corrected"]))
power_df["ch1_phase"] = np.angle(power_df["s31_corrected"])
power_df["ch2_phase"] = np.angle(power_df["s46_corrected"])
# calculate phase difference from corrected values
cross_corrected = power_df["s31_corrected"] * np.conjugate(power_df["s46_corrected"])
power_df["phase_diff"] = np.angle(cross_corrected)
# print average gain
print(f"Ch1 Gain: mean = {power_df['ch1_gain'].mean():.2f} dB, median = {power_df['ch1_gain'].median():.2f} dB")
print(f"Ch2 Gain: mean = {power_df['ch2_gain'].mean():.2f} dB, median = {power_df['ch2_gain'].median():.2f} dB")
# calculate final output power for plotting
power_df['ch1_power_out_dbm'] = power_df['ch1_gain'] + power_df['Power(dBm)']
power_df['ch2_power_out_dbm'] = power_df['ch2_gain'] + power_df['Power(dBm)']
# generate plots
power_plot_path = plot_power_results(power_df, date, CW_DIR)
filepaths.append(power_plot_path)
gain_phase_plot_path = plot_gain_phase_results(power_df, date, CW_DIR)
filepaths.append(gain_phase_plot_path)
# create complex S-parameter results CSV
complex_gain_df = pd.DataFrame({
'Frequency': power_df['Frequency'], # Frequency in GHz
'S31': power_df['s31_corrected'],
'S46': power_df['s46_corrected'],
})
csv_output_path = os.path.join(CW_DIR, f"Processed_CW_{date}.csv")
complex_gain_df.to_csv(csv_output_path, index=False)
filepaths.append(csv_output_path)
return filepaths
if __name__ == "__main__":
CW_main()