Skip to content

Commit 81b1bee

Browse files
zhindesZach Hindes
andauthored
add an nidaqmx example with an fft (#210)
* add an nidaqmx example with an fft * oops * black * code review * lint * back to cast * cleanup * words --------- Co-authored-by: Zach Hindes <zach.hindes@emerson.com>
1 parent 4e5e622 commit 81b1bee

3 files changed

Lines changed: 354 additions & 0 deletions

File tree

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# NI-DAQmx Analog Input with FFT Example
2+
3+
### Overview
4+
5+
This is a nipanel example that acquires a continuous amount of data using the DAQ device's internal clock and graphs the acquired data in both the time domain and frequency domain in an interactive Streamlit panel.
6+
7+
### Features
8+
9+
- NI-DAQmx Python configuration and acquisition
10+
- Displays data in an interactive chart using ECharts
11+
- Updates automatically as new data is acquired
12+
13+
### Prerequisites
14+
15+
- Python 3.10 or later
16+
- InstrumentStudio 2026 Q1 or later
17+
- A physical or simulated device, refer to the [NI-DAQmx Python README (Getting Started section)](https://github.com/ni/nidaqmx-python/blob/master/README.rst#getting-started)
18+
19+
### Usage
20+
21+
```pwsh
22+
poetry install --with examples
23+
poetry run python examples\nidaqmx\nidaqmx_analog_input_fft\nidaqmx_analog_input_fft.py
24+
```
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
"""Example of analog input voltage acquisition with a Fast Fourier Transform.
2+
3+
This example demonstrates how to acquire a continuous amount of data using the
4+
DAQ device's internal clock. The settings are configured from the Streamlit
5+
panel, and the acquired data is displayed on a graph. An FFT is computed from
6+
the acquired data and displayed on a separate graph. Refer to the panel script
7+
for details: nidaqmx_analog_input_fft_panel.py
8+
"""
9+
10+
import time
11+
from pathlib import Path
12+
from typing import cast
13+
14+
import nidaqmx
15+
import nidaqmx.system
16+
import numpy as np
17+
from nidaqmx.constants import (
18+
AcquisitionType,
19+
TerminalConfiguration,
20+
UsageTypeAI,
21+
)
22+
from nidaqmx.errors import DaqError
23+
from nitypes.waveform import AnalogWaveform
24+
25+
import nipanel
26+
27+
panel_script_path = Path(__file__).with_name("nidaqmx_analog_input_fft_panel.py")
28+
panel = nipanel.create_streamlit_panel(panel_script_path)
29+
panel.set_value("is_running", False)
30+
31+
system = nidaqmx.system.System.local()
32+
33+
available_voltage_channels = []
34+
for dev in system.devices:
35+
for chan in dev.ai_physical_chans:
36+
if UsageTypeAI.VOLTAGE in chan.ai_meas_types:
37+
available_voltage_channels.append(chan.name)
38+
panel.set_value("available_voltage_channels", available_voltage_channels)
39+
40+
print(f"Panel URL: {panel.panel_url}")
41+
print(f"Waiting for the 'Run' button to be pressed...")
42+
print(f"(Press Ctrl + C to quit)")
43+
44+
try:
45+
while True:
46+
while not panel.get_value("is_running", False):
47+
time.sleep(0.1)
48+
49+
print(f"Running...")
50+
try:
51+
# How to use nidaqmx: https://nidaqmx-python.readthedocs.io/en/stable/
52+
panel.set_value("daq_error", "")
53+
with nidaqmx.Task() as task:
54+
task.ai_channels.add_ai_voltage_chan(
55+
physical_channel=panel.get_value("voltage_channel", ""),
56+
min_val=panel.get_value("voltage_min_value", -5.0),
57+
max_val=panel.get_value("voltage_max_value", 5.0),
58+
terminal_config=panel.get_value(
59+
"terminal_configuration", TerminalConfiguration.DEFAULT
60+
),
61+
)
62+
task.timing.cfg_samp_clk_timing(
63+
rate=panel.get_value("sample_rate_input", 1000.0),
64+
sample_mode=AcquisitionType.CONTINUOUS,
65+
samps_per_chan=panel.get_value("samples_per_channel", 100),
66+
)
67+
panel.set_value("sample_rate", task.timing.samp_clk_rate)
68+
try:
69+
print(f"Starting data acquisition...")
70+
samples_per_channel = panel.get_value("samples_per_channel", 100)
71+
task.start()
72+
73+
while panel.get_value("is_running", False):
74+
waveform = cast(
75+
AnalogWaveform[np.float64],
76+
task.read_waveform(number_of_samples_per_channel=samples_per_channel),
77+
)
78+
panel.set_value("voltage_waveform", waveform)
79+
80+
# calculate the Discrete Fourier Transform
81+
fft = np.fft.fft(waveform.scaled_data)
82+
dft_sample_freqs = np.fft.fftfreq(
83+
waveform.sample_count, d=waveform.timing.sample_interval.total_seconds()
84+
)
85+
86+
# convert to decibels
87+
magnitudes = np.abs(fft)
88+
normalized_magnitudes = magnitudes / np.max(magnitudes)
89+
dbs = 20 * np.log10(normalized_magnitudes)
90+
91+
# only graph up to Nyquist
92+
num_freqs_to_graph = samples_per_channel // 2
93+
panel.set_value("fft_freqs", dft_sample_freqs[0:num_freqs_to_graph])
94+
panel.set_value("fft_mags", dbs[0:num_freqs_to_graph])
95+
except KeyboardInterrupt:
96+
raise
97+
finally:
98+
print(f"Stopping data acquisition...")
99+
task.stop()
100+
panel.set_value("is_running", False)
101+
print(f"Stopped")
102+
103+
except DaqError as e:
104+
daq_error = str(e)
105+
panel.set_value("daq_error", daq_error)
106+
panel.set_value("is_running", False)
107+
print(f"ERRORED")
108+
109+
except KeyboardInterrupt:
110+
pass
Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
"""Streamlit visualization script to display data acquired by nidaqmx_continuous_analog_input.py."""
2+
3+
from typing import cast
4+
5+
import hightime as ht
6+
import streamlit as st
7+
from nidaqmx.constants import (
8+
TerminalConfiguration,
9+
)
10+
from nitypes.waveform import AnalogWaveform
11+
from streamlit_echarts import st_echarts
12+
13+
import nipanel
14+
from nipanel.controls import enum_selectbox
15+
16+
st.set_page_config(
17+
page_title="NI-DAQmx - Analog Input - Voltage with FFT", page_icon="📈", layout="wide"
18+
)
19+
20+
st.markdown(
21+
"""
22+
<style>
23+
div[data-baseweb="select"] {
24+
max-width: 250px !important;
25+
}
26+
div.stNumberInput {
27+
max-width: 250px !important;
28+
}
29+
div.stTextInput {
30+
max-width: 250px !important;
31+
}
32+
</style>
33+
""",
34+
unsafe_allow_html=True,
35+
)
36+
37+
38+
def _click_start() -> None:
39+
panel.set_value("is_running", True)
40+
41+
42+
def _click_stop() -> None:
43+
panel.set_value("is_running", False)
44+
45+
46+
panel = nipanel.get_streamlit_panel_accessor()
47+
48+
st.header("NI-DAQmx - Analog Input - Voltage with FFT")
49+
if panel.get_value("is_running", False):
50+
st.button(r"⏹️ Stop", key="stop_button", on_click=_click_stop)
51+
else:
52+
st.button(r"▶️ Run", key="run_button", on_click=_click_start)
53+
54+
sample_rate = panel.get_value("sample_rate", 0.0)
55+
56+
# Create two-column layout for the entire interface
57+
left_column, right_column = st.columns([1, 1])
58+
59+
# Left column - Channel tabs and Timing Settings
60+
with left_column:
61+
# Channel Settings tabs
62+
with st.container(border=True):
63+
st.header("Channel Settings")
64+
voltage_tab = st.tabs(["Voltage"])[0]
65+
66+
voltage_tab.header("Voltage")
67+
with voltage_tab:
68+
channel_left_column, channel_right_column = st.columns(2)
69+
with channel_left_column:
70+
st.selectbox(
71+
options=panel.get_value("available_voltage_channels", [""]),
72+
index=0,
73+
label="Voltage Channel",
74+
disabled=panel.get_value("is_running", False),
75+
key="voltage_channel",
76+
)
77+
st.number_input(
78+
"Min Value",
79+
value=-5.0,
80+
step=0.1,
81+
disabled=panel.get_value("is_running", False),
82+
key="voltage_min_value",
83+
)
84+
st.number_input(
85+
"Max Value",
86+
value=5.0,
87+
step=0.1,
88+
disabled=panel.get_value("is_running", False),
89+
key="voltage_max_value",
90+
)
91+
with channel_right_column:
92+
enum_selectbox(
93+
panel,
94+
label="Terminal Configuration",
95+
value=TerminalConfiguration.DEFAULT,
96+
disabled=panel.get_value("is_running", False),
97+
key="terminal_configuration",
98+
)
99+
100+
# Timing Settings section in left column
101+
with st.container(border=True):
102+
st.header("Timing Settings")
103+
timing_left_column, timing_right_column = st.columns(2)
104+
with timing_left_column:
105+
st.selectbox(
106+
options=["OnboardClock"],
107+
label="Sample Clock Source",
108+
disabled=True,
109+
)
110+
st.number_input(
111+
"Sample Rate",
112+
value=1000.0,
113+
step=100.0,
114+
min_value=1.0,
115+
disabled=panel.get_value("is_running", False),
116+
key="sample_rate_input",
117+
)
118+
with timing_right_column:
119+
st.number_input(
120+
"Samples to Read",
121+
value=1000,
122+
step=100,
123+
min_value=10,
124+
disabled=panel.get_value("is_running", False),
125+
key="samples_per_channel",
126+
)
127+
st.text_input(
128+
label="Actual Sample Rate",
129+
value=str(sample_rate) if sample_rate else "",
130+
disabled=True,
131+
)
132+
133+
# Right column - Graph
134+
with right_column:
135+
if panel.get_value("daq_error", "") != "":
136+
st.error(
137+
f"There was an error running the script. Fix the issue and click Run again. \n\n {panel.get_value('daq_error', '')}"
138+
)
139+
else:
140+
with st.container(border=True):
141+
# Voltage Data Graph section
142+
st.header("Acquired Data")
143+
144+
voltage_waveform = panel.get_value("voltage_waveform", AnalogWaveform())
145+
if voltage_waveform.sample_count == 0:
146+
time_labels = ["00:00:00.000"]
147+
else:
148+
timestamps = cast(
149+
list[ht.datetime],
150+
list(voltage_waveform.timing.get_timestamps(0, voltage_waveform.sample_count)),
151+
)
152+
time_labels = [
153+
f"{ts.hour:02d}:{ts.minute:02d}:{ts.second:02d}.{ts.microsecond//1000:03d}"
154+
for ts in timestamps
155+
]
156+
157+
voltage_graph = {
158+
"animation": False,
159+
"tooltip": {"trigger": "axis"},
160+
"legend": {"data": [voltage_waveform.units]},
161+
"xAxis": {
162+
"type": "category",
163+
"data": time_labels,
164+
"name": "Time",
165+
"nameLocation": "center",
166+
"nameGap": 40,
167+
},
168+
"yAxis": {
169+
"type": "value",
170+
"name": "Measurement",
171+
"nameRotate": 90,
172+
"nameLocation": "center",
173+
"nameGap": 40,
174+
},
175+
"series": [
176+
{
177+
"name": voltage_waveform.units,
178+
"type": "line",
179+
"data": list(voltage_waveform.scaled_data),
180+
"emphasis": {"focus": "series"},
181+
"smooth": True,
182+
"seriesLayoutBy": "row",
183+
},
184+
],
185+
}
186+
st_echarts(options=voltage_graph, height="300px", key="voltage_graph")
187+
188+
frequencies = panel.get_value("fft_freqs", [0.0])
189+
magnitudes = panel.get_value("fft_mags", [0.0])
190+
fft_data = [{"value": [x, y]} for x, y in zip(frequencies, magnitudes)]
191+
192+
fft_graph = {
193+
"animation": False,
194+
"tooltip": {"trigger": "axis"},
195+
"legend": {"data": ["Magnitude"]},
196+
"xAxis": {
197+
"type": "value",
198+
"name": "Frequency",
199+
"nameLocation": "center",
200+
"nameGap": 40,
201+
},
202+
"yAxis": {
203+
"type": "value",
204+
"name": "Magnitude",
205+
"nameRotate": 90,
206+
"nameLocation": "center",
207+
"nameGap": 40,
208+
},
209+
"series": [
210+
{
211+
"name": "Magnitude",
212+
"type": "line",
213+
"data": fft_data,
214+
"emphasis": {"focus": "series"},
215+
"smooth": True,
216+
"seriesLayoutBy": "row",
217+
},
218+
],
219+
}
220+
st_echarts(options=fft_graph, height="300px", key="fft_graph")

0 commit comments

Comments
 (0)