National Instruments Long Monitoring in Python
I'm building some scripting routines with the nidaqmx module developed by NI. I use two NI PXI-4498 acquisition cards (32 channels in total).
We would like to run a monitoring experiment over long periods (10 hours) with a sampling rate of 200 kHz.
For the moment, I'm struggling with the intrinsic limits of Python and the logic of the nidaqmx module.
So far I have coded a continuous acquisition routine with a limited number of sensors.
import nidaqmx
from nidaqmx.constants import AcquisitionType
import matplotlib.pyplot as plt
import numpy as np
sample_time = 600 # units = seconds
s_freq = 200000
num_samples = sample_time*s_freq
dt = 1/s_freq
print('go acquisition !')
with nidaqmx.Task() as task:
    task.ai_channels.add_ai_accel_chan("PXI1Slot2_3/ai1:3",
                                       sensitivity=10000.0,
                                       max_val=1,
                                       min_val=-1)
    task.timing.cfg_samp_clk_timing(s_freq,
                                    sample_mode=AcquisitionType.CONTINUOUS)
    # one blocking read of the whole record
    data = task.read(number_of_samples_per_channel=num_samples,
                     timeout=nidaqmx.constants.WAIT_INFINITELY)
    print('I do it right !')
But with this routine, simple as it is, I can't record for more than about 10 minutes: holding the whole recording in memory exhausts Python, which is entirely logical to me.
I checked the buffer logic on the NI website, but I didn't clearly understand how to implement it here...
I can't see how to fit into this little routine a write to disk every X MB of data acquired by the task, while the acquisition keeps running and the `data` array is emptied to avoid an overflow, and I didn't find a suitable answer for my case on Stack Overflow.
If you have already encountered this problem and have a solution, I'm interested.
Thanks for reading
Solution 1:[1]
I finally overcame my problem; here is a solution for a large array of sensors, sampled at a high rate, to be continuously monitored.
import matplotlib.pyplot as plt
import numpy as np
import nidaqmx
from nidaqmx.stream_readers import AnalogMultiChannelReader
from nidaqmx import constants
import threading
from datetime import datetime
import os
# Parameters
sampling_freq_in = 200000 # in Hz
buffer_in_size = 800000
bufsize_callback = 200000
buffer_in_size_cfg = round(buffer_in_size) * 10 # clock configuration * 10 ?
chans_in = 32 # number of chan
refresh_rate_plot = 100000 # in Hz
crop = 0 # number of seconds to drop at acquisition start before saving
# Initialize data placeholders
buffer_in = np.zeros((chans_in, buffer_in_size))
data = np.zeros((chans_in, 1))  # will contain a first column with zeros but that's fine
print(data.size)
# Definitions of basic functions
def ask_user():
    global running
    input("Press ENTER/RETURN to stop acquisition and coil drivers.")
    running = False
def cfg_read_task(acquisition):
    acquisition.ai_channels.ai_gain = 100
    acquisition.ai_channels.ai_max = 100
    acquisition.ai_channels.add_ai_accel_chan("Dev3/ai0:15",
                                              sensitivity=1000.0,
                                              max_val=1,
                                              min_val=-1)  # Card 1
    acquisition.ai_channels.add_ai_accel_chan("Dev4/ai0:15",
                                              sensitivity=1000.0,
                                              max_val=1,
                                              min_val=-1)  # Card 2
    acquisition.timing.cfg_samp_clk_timing(rate=sampling_freq_in,
                                           sample_mode=constants.AcquisitionType.CONTINUOUS,
                                           samps_per_chan=buffer_in_size_cfg)
def reading_task_callback(task_idx, event_type, num_samples, callback_data):
    global data
    global buffer_in
    if running:
        path = r'D:\Experience\caca\Acc/'
        isExist = os.path.exists(path)
        if not isExist:
            os.makedirs(path)
        buffer_in = np.zeros((chans_in, num_samples))  # double definition ???
        stream_in.read_many_sample(buffer_in, num_samples,
                                   timeout=constants.WAIT_INFINITELY)
        data = np.append(data, buffer_in,
                         axis=1)  # appends buffered data to total variable data
        filename = path + 'Acc_' + str(datetime.now().strftime("%m%d_%H%M%S%f"))
        extension = '.npy'
        np.save(filename + extension, data)
        # f = np.fft.rfftfreq(data[4][1::].size, d=1/100000)
        # P = abs(np.fft.rfft(data[4][1::]))
        # plt.plot(f[f > 200], P[f > 200], 'k')
        # plt.xlim(100, 10000)
        # plt.plot(data[0][1::], 'k--')
        data = np.zeros((chans_in, 1))
    return 0
# Configure and setup the tasks
task_in = nidaqmx.Task()
cfg_read_task(task_in)
stream_in = AnalogMultiChannelReader(task_in.in_stream)
task_in.register_every_n_samples_acquired_into_buffer_event(bufsize_callback,
                                                            reading_task_callback)
# Start threading to prompt user to stop
thread_user = threading.Thread(target=ask_user)
thread_user.start()
# Main loop
running = True
time_start = datetime.now()
task_in.start()
# Plot a visual feedback for the user's mental health
# f, (ax1, ax2, ax3) = plt.subplots(3, 1, sharex='all', sharey='none')
while running:  # make this adapt to number of channels automatically
    a = 0
    # ax1.clear()
    # ax2.clear()
    # ax3.clear()
    # ax1.plot(data[0, -sampling_freq_in * 5:].T, 'r')  # 5-second rolling window
    # ax2.plot(data[1, -sampling_freq_in * 5:].T, 'k')
    # ax3.plot(data[2, -sampling_freq_in * 5:].T, 'b')
    # Label and axis formatting
    # ax3.set_xlabel('time [s]')
    # ax1.set_ylabel('m/s**2')
    # ax2.set_ylabel('m/s**2')
    # ax3.set_ylabel('m/s**2')
    # xticks = np.arange(0, data[0, -sampling_freq_in * 5:].size, sampling_freq_in)
    # xticklabels = np.arange(0, xticks.size, 1)
    # ax3.set_xticks(xticks)
    # ax3.set_xticklabels(xticklabels)
    # plt.pause(1/refresh_rate_plot)  # required for the dynamic plot to work (if too low, plotting hurts performance)
# Close task to clear connection once done
task_in.close()
duration = datetime.now() - time_start
print(duration, 'Pan t es mort')
experience_file = [duration, sampling_freq_in]
np.save('info.npy', experience_file)
# savefile:
# Some messages at the end
# print("\n")
# print("OPM acquisition ended.\n")
# print("Acquisition duration: {}.".format(duration))
Solution 2:[2]
I don't know if you solved your problem or stopped trying; either way, here is a basic solution. I am using an Ethernet-controlled NI 9234 card, and I store my data with csv.writer:
# This works for a NI9234 card
import nidaqmx
from nidaqmx.constants import AcquisitionType
from nidaqmx import stream_readers
import numpy as np
import csv
import time
from datetime import datetime
sample_rate = 2048 # Typical sample rates are 2048 Hz, 2560 Hz, 3200 Hz, 5120 Hz or 6400 Hz (at least for an NI 9234 card)
samples_to_acq = 2048 # For vibration work, samples to acquire usually go 2048, 4096, 8192
wait_time = samples_to_acq/sample_rate # Acquisition time per read in s; with 2048 samples at 2048 Hz this is 1 s, which matters for the NI timing functions
cont_mode = AcquisitionType.CONTINUOUS # There is also FINITE for sporadic measurements
iterations = 10
with nidaqmx.Task() as task:
    now = datetime.now()
    military = now.strftime('%H:%M:%S')  # Record the time as 20:32:54 instead of 8:32:54 pm
    first_header = ['Some title in here']
    second_header = [f'T. Captura: {military}']
    # Create accelerometer channels and configure the sample clock. current_excit_val=0.002 enables IEPE;
    # velocity is measured in ips and acceleration in m/s2. Both prox probes and tachometers can be read
    # with a simple voltage channel.
    # For more info about channels: https://nidaqmx-python.readthedocs.io/en/latest/ai_channel_collection.html
    # Two accelerometer channels
    task.ai_channels.add_ai_accel_chan(physical_channel="cDAQ9181-1E3866DMod1/ai0", sensitivity=100, current_excit_val=0.002)
    task.ai_channels.add_ai_accel_chan(physical_channel="cDAQ9181-1E3866DMod1/ai1", sensitivity=100, current_excit_val=0.002)
    # Two voltage channels
    task.ai_channels.add_ai_voltage_chan(physical_channel="cDAQ9181-1E3866DMod1/ai2")
    task.ai_channels.add_ai_voltage_chan(physical_channel="cDAQ9181-1E3866DMod1/ai3")
    total_wait_time = wait_time * iterations  # We will only take 10 measurements, 10 s for this example
    samples_to_acq_new = samples_to_acq * iterations  # Also multiply by 10 to keep the same ratio
    # Set the source of the sample clock, its rate, and the number of samples to acquire = buffer size
    task.timing.cfg_samp_clk_timing(sample_rate, sample_mode=cont_mode, samps_per_chan=samples_to_acq_new)
    start = time.time()
    print('Starting task...')  # Just to keep an eye on progress in your console
    # Save the 4 channels to a csv file. This file is overwritten every time the program is executed.
    # It should appear in the same folder where your program is located.
    with open('data_10times.csv', 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(first_header)
        writer.writerow(second_header)
        # add some blank rows in between
        writer.writerow('')
        writer.writerow('')
        x = np.linspace(0, total_wait_time, samples_to_acq_new)  # Your time axis (s): starts at 0, ends at total_wait_time, divided evenly by the number of samples you'll capture
        data = np.ndarray((4, samples_to_acq_new), dtype=np.float64)  # Creates an array: 4 rows (one per channel), each with 20480 samples
        nidaqmx.stream_readers.AnalogMultiChannelReader(task.in_stream).read_many_sample(data, samples_to_acq_new, timeout=14)  # it shouldn't take that long for this example; check the timeout for other exercises
        for value in range(len(x)):
            writer.writerow([x[value], data[0][value], data[1][value], data[2][value], data[3][value]])
    elapsed_time = time.time() - start
    print(f'done in {elapsed_time}')
so... 5 months later, but I hope it might help.
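For acquisitions much longer than the 10 s above, the same pattern can be kept memory-bounded by reading fixed-size chunks in a loop and appending each chunk to the csv as it arrives. A rough sketch of that idea, reusing the device name from the example but with purely illustrative chunk counts (not tested on hardware):
import csv
import numpy as np
import nidaqmx
from nidaqmx.constants import AcquisitionType
from nidaqmx.stream_readers import AnalogMultiChannelReader

sample_rate = 2048
chunk_size = 2048      # samples per channel read on each pass (1 s of data)
n_chunks = 600         # 600 x 1 s = 10 minutes; increase for longer runs
n_channels = 4

with nidaqmx.Task() as task:
    # Four voltage channels just to keep the sketch short; swap in the accel channels as above.
    task.ai_channels.add_ai_voltage_chan("cDAQ9181-1E3866DMod1/ai0:3")
    task.timing.cfg_samp_clk_timing(sample_rate,
                                    sample_mode=AcquisitionType.CONTINUOUS,
                                    samps_per_chan=chunk_size * 10)  # generous driver buffer
    reader = AnalogMultiChannelReader(task.in_stream)
    buf = np.zeros((n_channels, chunk_size), dtype=np.float64)

    with open('data_long.csv', 'w', newline='') as f:
        writer = csv.writer(f)
        task.start()
        for chunk in range(n_chunks):
            # Blocks until one chunk is available, then appends it row by row.
            reader.read_many_sample(buf, chunk_size, timeout=10.0)
            t0 = chunk * chunk_size / sample_rate
            for i in range(chunk_size):
                writer.writerow([t0 + i / sample_rate] + buf[:, i].tolist())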
Solution 3:[3]
I just wrote an answer to a related question, which asks for a way to store the acquired samples into a HDF5 file. I wrote an example code that basically loops over the internal buffer of the National Instruments DAQ and consequently adds these samples into a HDF5 file.
The example is well suited for long acquisitions and gives room for some online post-processing and preview tasks. Also the use of HDF5 files is superior to text files as such long recordings tend to grow large. File handling and data access is a nuisance with 20Gb and larger text files.
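For reference, a minimal sketch of that idea using h5py and nidaqmx's every-N-samples callback; the device name, rates, chunk size and file name below are placeholder assumptions, not taken from the linked answer:
import h5py
import numpy as np
import nidaqmx
from nidaqmx.constants import AcquisitionType
from nidaqmx.stream_readers import AnalogMultiChannelReader

sample_rate = 200000
chunk_size = 200000          # one callback per second of data
n_channels = 3

task = nidaqmx.Task()
task.ai_channels.add_ai_voltage_chan("Dev1/ai0:2")  # placeholder device/channels
task.timing.cfg_samp_clk_timing(sample_rate,
                                sample_mode=AcquisitionType.CONTINUOUS,
                                samps_per_chan=chunk_size * 10)
reader = AnalogMultiChannelReader(task.in_stream)
buf = np.zeros((n_channels, chunk_size), dtype=np.float64)

h5 = h5py.File('acquisition.h5', 'w')
dset = h5.create_dataset('data', shape=(n_channels, 0),
                         maxshape=(n_channels, None),
                         chunks=(n_channels, chunk_size), dtype='f8')

def on_chunk(task_handle, event_type, num_samples, callback_data):
    # Pull one chunk out of the DAQ buffer and append it to the growing HDF5 dataset.
    reader.read_many_sample(buf, num_samples, timeout=10.0)
    old = dset.shape[1]
    dset.resize(old + num_samples, axis=1)
    dset[:, old:] = buf[:, :num_samples]
    h5.flush()
    return 0

task.register_every_n_samples_acquired_into_buffer_event(chunk_size, on_chunk)
task.start()
input("Acquiring... press ENTER to stop.")
task.stop()
task.close()
h5.close()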
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source
---|---
Solution 1 | marc_s
Solution 2 | Eduardo
Solution 3 |