National Instruments Long Monitoring Python

I'm writing some acquisition scripts with the nidaqmx module developed by NI. I use two NI PXI 44-98 acquisition cards (32 channels).

We would like to run a monitoring experiment over long periods (10 hours) with a sampling rate of 200 kHz.

For the moment, I'm struggling with the intrinsic limits of Python and the logic of the nidaqmx module.

I have so far coded a continuous acquisition routine with a limited number of sensors.

import nidaqmx
from nidaqmx.constants import AcquisitionType
import matplotlib.pyplot as plt
import numpy as np

sample_time = 600  # units = seconds
s_freq = 200000
num_samples = sample_time*s_freq
dt = 1/s_freq
print('go acquisition !')
with nidaqmx.Task() as task:
    task.ai_channels.add_ai_accel_chan("PXI1Slot2_3/ai1:3",
                                       sensitivity=10000.0,
                                       max_val=1,
                                       min_val=-1)
    task.timing.cfg_samp_clk_timing(s_freq,
                                   sample_mode = AcquisitionType.CONTINUOUS)
    data = task.read(number_of_samples_per_channel=num_samples, timeout = nidaqmx.constants.WAIT_INFINITELY)
print('I do it right !')

But with this very simple routine, I can't record for more than 10 min: Python runs out of memory, which makes sense to me since the whole recording is held in RAM.
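A rough back-of-the-envelope estimate makes the limit concrete. The sketch below assumes 8 bytes per sample (float64) and ignores the extra overhead of the nested Python lists that task.read returns, so the real footprint is higher. The helper name raw_bytes is only for illustration.

```python
def raw_bytes(n_channels, rate_hz, seconds, bytes_per_sample=8):
    """Size of the raw sample data, assuming float64 (8 bytes per sample)."""
    return n_channels * rate_hz * seconds * bytes_per_sample

# The routine above: 3 channels (ai1:3) at 200 kHz for 600 s
print(raw_bytes(3, 200_000, 600) / 1e9, "GB")           # 2.88 GB

# The target experiment: 32 channels at 200 kHz for 10 hours
print(raw_bytes(32, 200_000, 10 * 3600) / 1e12, "TB")   # 1.8432 TB
```

So even the 10-minute, 3-channel test needs several GB in one piece, and the full experiment cannot be held in memory at all: it has to be streamed to disk in blocks.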

I checked the buffer logic on the NI website, but I didn't clearly understand how to implement it here.

I can't see how to fit into this little routine a write to disk every X MB of data recorded by the task, while the acquisition keeps running and the "data" variable is emptied to avoid an overflow. I also didn't find an answer on Stack Overflow that fits my case.
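One way to picture the "write every X samples" idea is a loop that reads a small fixed-size block, appends it to a file, and then forgets it. The sketch below is hardware-free: fake_read is a hypothetical stand-in that returns random data, and the file name, block size, and raw float64 layout are assumptions chosen for the example.

```python
import numpy as np

def record_in_chunks(read_chunk, n_chunks, path):
    """Append each block to a raw binary file so memory use stays at one block."""
    total = 0
    with open(path, "wb") as f:
        for _ in range(n_chunks):
            block = np.asarray(read_chunk(), dtype=np.float64)  # (channels, samples)
            # tobytes() emits C order, so block.T is stored as rows of
            # (one sample, all channels) and appended blocks stay contiguous
            f.write(block.T.tobytes())
            total += block.shape[1]
    return total

# Hypothetical stand-in for the DAQ read: 3 channels, 1000 samples per call
rng = np.random.default_rng(0)

def fake_read(n_channels=3, chunk_len=1000):
    return rng.standard_normal((n_channels, chunk_len))

n = record_in_chunks(fake_read, n_chunks=5, path="acc_raw.bin")

# Read back lazily without loading the whole file into RAM
recorded = np.memmap("acc_raw.bin", dtype=np.float64, mode="r").reshape(n, 3)
print(recorded.shape)  # (5000, 3)
```

With a real task, read_chunk could wrap something like task.read(number_of_samples_per_channel=chunk_len), provided the loop empties the driver buffer faster than the card fills it.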

If you have already encountered this problem and have a solution, I'm interested.

Thanks for reading.



Solution 1:[1]

I finally overcame my problems. Here is a solution for continuously monitoring a large array of sensors at a high sampling rate.

import matplotlib.pyplot as plt
import numpy as np
import nidaqmx
from nidaqmx.stream_readers import AnalogMultiChannelReader
from nidaqmx import constants
import threading
from datetime import datetime
import os

# Parameters
sampling_freq_in = 200000  # in Hz
buffer_in_size = 800000
bufsize_callback = 200000
buffer_in_size_cfg = round(buffer_in_size) * 10  # in CONTINUOUS mode samps_per_chan sets the buffer size; 10x gives headroom
chans_in = 32  # number of chan
refresh_rate_plot = 100000  # in Hz
crop = 0  # number of seconds to drop at acquisition start before saving

# Initialize data placeholders
buffer_in = np.zeros((chans_in, buffer_in_size))
data = np.zeros(
    (chans_in, 1))  # will contain a first column with zeros but that's fine
print(data.size)


# Definitions of basic functions
def ask_user():
    global running
    input("Press ENTER/RETURN to stop acquisition and coil drivers.")
    running = False


def cfg_read_task(acquisition):
    # ai_gain and ai_max are per-channel properties in nidaqmx; the input range
    # is set through min_val/max_val on the channels below.
    acquisition.ai_channels.add_ai_accel_chan("Dev3/ai0:15",
                                              sensitivity=1000.0,
                                              max_val=1,
                                              min_val=-1)  # Cards1
    acquisition.ai_channels.add_ai_accel_chan("Dev4/ai0:15",
                                              sensitivity=1000.0,
                                              max_val=1,
                                              min_val=-1)  # Cards2
    acquisition.timing.cfg_samp_clk_timing(rate=sampling_freq_in,
                                           sample_mode=constants.AcquisitionType.CONTINUOUS,
                                           samps_per_chan=buffer_in_size_cfg)


def reading_task_callback(task_idx, event_type, num_samples, callback_data):
    global data
    global buffer_in
    if running:
        path = r'D:\Experience\caca\Acc/'
        os.makedirs(path, exist_ok=True)  # create the output folder if needed
        # fresh buffer sized to this callback's sample count
        buffer_in = np.zeros((chans_in, num_samples))
        stream_in.read_many_sample(buffer_in, num_samples,
                                   timeout=constants.WAIT_INFINITELY)
        data = np.append(data, buffer_in,
                         axis=1)  # appends buffered data to total variable data
        filename = path + 'Acc_' + str(
            datetime.now().strftime("%m%d%h%H%M%S%f"))
        extension = '.npy'
        np.save(filename + extension, data)
        # f=np.fft.rfftfreq(data[4][1::].size,d=1/100000)
        # P=abs(np.fft.rfft(data[4][1::]))
        # plt.plot(f[f>200] ,P[f>200],'k')
        # plt.xlim(100,10000)
        # plt.plot(data[0][1::],'k--')
        data = np.zeros((chans_in, 1))
    return 0


# Configure and setup the tasks
task_in = nidaqmx.Task()
cfg_read_task(task_in)
stream_in = AnalogMultiChannelReader(task_in.in_stream)
task_in.register_every_n_samples_acquired_into_buffer_event(bufsize_callback,
                                                            reading_task_callback)

# Start threading to prompt user to stop
thread_user = threading.Thread(target=ask_user)
thread_user.start()

# Main loop
running = True
time_start = datetime.now()
task_in.start()

# Plot a visual feedback for the user's mental health

# f, (ax1, ax2, ax3) = plt.subplots(3, 1, sharex='all', sharey='none')
while running:  # make this adapt to number of channels automatically
    thread_user.join(timeout=0.1)  # wait without busy-looping; the callback keeps saving data
    # ax1.clear()
    # ax2.clear()
    # ax3.clear()
    # ax1.plot(data[0, -sampling_freq_in * 5:].T,'r')  # 5 seconds rolling window
    # ax2.plot(data[1, -sampling_freq_in * 5:].T,'k')
    # ax3.plot(data[2, -sampling_freq_in * 5:].T,'b')
# Label and axis formatting
# ax3.set_xlabel('time [s]')
# ax1.set_ylabel('m/s**2')
# ax2.set_ylabel('m/s**2')
# ax3.set_ylabel('m/s**2')
# xticks = np.arange(0, data[0, -sampling_freq_in * 5:].size, sampling_freq_in)
# xticklabels = np.arange(0, xticks.size, 1)
# ax3.set_xticks(xticks)
# ax3.set_xticklabels(xticklabels)
# plt.pause(1/refresh_rate_plot)  # required for dynamic plot to work (if too low, nulling performance bad)
# Close task to clear connection once done
task_in.close()
duration = datetime.now() - time_start
print(duration, 'Pan t es mort')
experience_file = [duration.total_seconds(), sampling_freq_in]  # duration in s, rate in Hz (plain floats, no pickling)
np.save('info.npy', experience_file)

# savefile:


# Some messages at the end
# print("\n")
# print("OPM acquisition ended.\n")
# print("Acquisition duration: {}.".format(duration))
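Since the callback writes one .npy file per block, the recording has to be stitched back together afterwards. A minimal sketch of that step follows, assuming the file names sort in acquisition order and that each file starts with the placeholder column of zeros created by the callback. The helper load_chunks and the demo files are hypothetical, and for a full 10-hour run you would only load a subset of files at a time.

```python
import glob
import os
import tempfile

import numpy as np

def load_chunks(folder, pattern="Acc_*.npy", max_files=None):
    """Concatenate per-callback files along the sample axis."""
    files = sorted(glob.glob(os.path.join(folder, pattern)))[:max_files]
    # Each file holds (channels, 1 + samples); drop the leading zero column
    return np.concatenate([np.load(f)[:, 1:] for f in files], axis=1)

# Demo with three small fake files (2 channels, 4 samples each)
with tempfile.TemporaryDirectory() as tmp:
    for i in range(3):
        chunk = np.hstack([np.zeros((2, 1)), np.full((2, 4), float(i))])
        np.save(os.path.join(tmp, f"Acc_{i:04d}.npy"), chunk)
    signal = load_chunks(tmp)

print(signal.shape)  # (2, 12)
```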

Solution 2:[2]

I don't know whether you solved your problem or stopped trying; either way, here is a basic solution. I am using an Ethernet-controlled NI 9234 card and storing my data with csv.writer:

# This works for a NI9234 card
import nidaqmx
from nidaqmx.constants import AcquisitionType
from nidaqmx import stream_readers
import numpy as np
import csv
import time
from datetime import datetime

sample_rate = 2048                              # Usually sample rate goes 2048 Hz, 2560 Hz, 3200 Hz, 5120 Hz, 6400 Hz (at least for a NI9234 card) 
samples_to_acq = 2048                           # At least for vibration matters, Samples to acq go from 2048, 4096, 8192
wait_time = samples_to_acq/sample_rate          # Data Acquisition Time in s, very important for the NI task function, and since we have 2048 on both sides, time is 1 s
cont_mode = AcquisitionType.CONTINUOUS              # There is also FINITE for sporadic measurements
iterations = 10
    
with nidaqmx.Task() as task:
    now = datetime.now()
    military = now.strftime('%H:%M:%S')     # Just recording the time like 20:32:54 instead of 8:32:54 pm
    first_header = ['Some title in here']
    second_header = [f'Capture time: {military}']

    # Create accelerometer channel and configure sample clock. current_excit_val=0.002 enables IEPE, velocity measured in ips and accel in m/s2,
    # both prox and tachometer can be used with a simple voltage channel.
    # For more info about channels: https://nidaqmx-python.readthedocs.io/en/latest/ai_channel_collection.html

    # Two accelerometer channels
    task.ai_channels.add_ai_accel_chan(physical_channel="cDAQ9181-1E3866DMod1/ai0", sensitivity=100, current_excit_val=0.002)
    task.ai_channels.add_ai_accel_chan(physical_channel="cDAQ9181-1E3866DMod1/ai1", sensitivity=100, current_excit_val=0.002)

    # Two voltage channels
    task.ai_channels.add_ai_voltage_chan(physical_channel="cDAQ9181-1E3866DMod1/ai2")
    task.ai_channels.add_ai_voltage_chan(physical_channel="cDAQ9181-1E3866DMod1/ai3")

    total_wait_time = wait_time * iterations          # We will only take 10 measurements, 10 s for this example
    samples_to_acq_new = samples_to_acq * iterations  # Also multiply by 10 to keep the same ratio

    # Sets source of sample clock, its rate, and number of samples to acquire = buffer size
    task.timing.cfg_samp_clk_timing(sample_rate, sample_mode=cont_mode, samps_per_chan=samples_to_acq_new)

    start = time.time()
    print('Starting task...')         # Just to keep a control in your console

    # Saving the 4 channels to a csv file. This file is overwritten every time the program is executed.
    # It should appear in the same folder where your program is located.
    with open('data_10times.csv', 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(first_header)
        writer.writerow(second_header)

        # adding some blank rows in between
        writer.writerow('')
        writer.writerow('')

        # Your x axis (s): starts at 0 and advances by 1/sample_rate per sample
        x = np.linspace(0, total_wait_time, samples_to_acq_new, endpoint=False)

        data = np.ndarray((4, samples_to_acq_new), dtype=np.float64)  # One row per channel: 4 rows of 20480 samples
        # It shouldn't take that long for this example; check the timeout for other exercises
        stream_readers.AnalogMultiChannelReader(task.in_stream).read_many_sample(data, samples_to_acq_new, timeout=14)

        for value in range(len(x)):
            writer.writerow([x[value], data[0][value], data[1][value], data[2][value], data[3][value]])

elapsed_time = (time.time() - start)
print(f'done in {elapsed_time}')

so... 5 months later, but I hope it might help.

Solution 3:[3]

I just wrote an answer to a related question that asks how to store the acquired samples in an HDF5 file. My example code loops over the internal buffer of the National Instruments DAQ and appends each batch of samples to an HDF5 file.

The example is well suited to long acquisitions and leaves room for some online post-processing and preview tasks. HDF5 files are also preferable to text files, because such long recordings tend to grow large: file handling and data access are a nuisance with text files of 20 GB and more.
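The append pattern can be sketched roughly as follows. This is not the code from the linked answer, only a minimal illustration that assumes the h5py package is installed; the dataset name, file name, block sizes, and the constant-valued blocks standing in for DAQ reads are all made up for the example.

```python
import h5py
import numpy as np

def append_block(dset, block):
    """Grow the dataset along the sample axis and write the new block at the end."""
    start = dset.shape[1]
    stop = start + block.shape[1]
    dset.resize(stop, axis=1)
    dset[:, start:stop] = block

n_channels, chunk_len = 4, 1000  # illustrative values

with h5py.File("acquisition_demo.h5", "w") as f:
    # maxshape=None on the sample axis makes the dataset resizable
    dset = f.create_dataset("acc",
                            shape=(n_channels, 0),
                            maxshape=(n_channels, None),
                            chunks=(n_channels, chunk_len),
                            dtype="float64")
    for i in range(10):
        # Stand-in for one read from the DAQ buffer
        block = np.full((n_channels, chunk_len), float(i))
        append_block(dset, block)
    total_shape = dset.shape

print(total_shape)  # (4, 10000)
```

Only one block is in memory at a time, and the file can later be opened and sliced without reading the whole recording.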

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 marc_s
Solution 2 Eduardo
Solution 3