# Do not edit this file; it was automatically generated.
import ctypes
import numpy
from nidaqmx._lib import lib_importer, ctypes_byte_str, c_bool32
from nidaqmx._task_modules.read_functions import _read_raw
from nidaqmx.errors import check_for_error, is_string_buffer_too_small
from nidaqmx._task_modules.channels.channel import Channel
from nidaqmx.utils import unflatten_channel_string
from nidaqmx.constants import (
AcquisitionType, LoggingMode, LoggingOperation, OverwriteMode,
READ_ALL_AVAILABLE, ReadRelativeTo, WaitMode)
class InStream(object):
"""
Exposes an input data stream on a DAQmx task.
The input data stream can be used to control reading behavior and can be
used in conjunction with reader classes to read samples from an
NI-DAQmx task.
"""
def __init__(self, task):
self._task = task
self._handle = task._handle
self._timeout = 10.0
super(InStream, self).__init__()
def __eq__(self, other):
if isinstance(other, self.__class__):
return (self._handle == other._handle and
self._timeout == other._timeout)
return False
def __hash__(self):
return hash((self._handle.value, self._timeout))
def __ne__(self, other):
return not self.__eq__(other)
def __repr__(self):
return 'InStream(task={0})'.format(self._task.name)
@property
def timeout(self):
"""
float: Specifies the amount of time in seconds to wait for
samples to become available. If the time elapses, the read
method returns an error and any samples read before the
timeout elapsed. The default timeout is 10 seconds. If you
set timeout to nidaqmx.WAIT_INFINITELY, the read method
waits indefinitely. If you set timeout to 0, the read method
tries once to read the requested samples and returns an error
if it is unable to.
"""
return self._timeout
@timeout.setter
def timeout(self, val):
self._timeout = val
@timeout.deleter
def timeout(self):
self._timeout = 10.0
@property
def accessory_insertion_or_removal_detected(self):
    """
    bool: Indicates whether any device in the task has detected an
        accessory being inserted or removed since the task started.
        Reading this property clears the accessory change status
        for all channels in the task. Read this property before
        reading **devs_with_inserted_or_removed_accessories**;
        otherwise, you will receive an error.
    """
    daqmx_fn = (lib_importer.windll.
                DAQmxGetReadAccessoryInsertionOrRemovalDetected)
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle, ctypes.POINTER(c_bool32)]

    detected = c_bool32()
    status = daqmx_fn(self._handle, ctypes.byref(detected))
    check_for_error(status)
    return detected.value
@property
def auto_start(self):
    """
    bool: Specifies whether DAQmx Read starts the task automatically
        when you have not started it explicitly with DAQmx Start.
        The default value is True. When DAQmx Read starts a finite
        acquisition task, it also stops the task after the last
        sample has been read.
    """
    daqmx_fn = lib_importer.windll.DAQmxGetReadAutoStart
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle, ctypes.POINTER(c_bool32)]

    enabled = c_bool32()
    status = daqmx_fn(self._handle, ctypes.byref(enabled))
    check_for_error(status)
    return enabled.value
@auto_start.setter
def auto_start(self, val):
    """
    Passes ``val`` to DAQmxSetReadAutoStart for this task's handle and
    hands the returned status to check_for_error.
    """
    daqmx_fn = lib_importer.windll.DAQmxSetReadAutoStart
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [lib_importer.task_handle, c_bool32]

    status = daqmx_fn(self._handle, val)
    check_for_error(status)
@auto_start.deleter
def auto_start(self):
    """
    Calls DAQmxResetReadAutoStart for this task's handle and hands the
    returned status to check_for_error.
    """
    daqmx_fn = lib_importer.windll.DAQmxResetReadAutoStart
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [lib_importer.task_handle]

    status = daqmx_fn(self._handle)
    check_for_error(status)
@property
def aux_power_error_chans(self):
    """
    List[str]: Indicates the names of the virtual channels in the
        task for which an auxiliary power supply error condition
        has been detected. Read the Aux Power Error Channels Exist
        property before reading this property; otherwise, you will
        receive an error.
    """
    daqmx_fn = lib_importer.windll.DAQmxGetAuxPowerErrorChans
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle, ctypes.c_char_p,
                    ctypes.c_uint]

    # The first pass uses a zero-length buffer to obtain the size; the
    # next pass retrieves the data with a buffer of that size.
    capacity = 0
    while True:
        buf = ctypes.create_string_buffer(capacity)
        status = daqmx_fn(self._handle, buf, capacity)
        if is_string_buffer_too_small(status):
            # Buffer size must have changed between calls; ask again.
            capacity = 0
            continue
        if capacity == 0 and status > 0:
            # Buffer size obtained; use it to retrieve the data.
            capacity = status
            continue
        break

    check_for_error(status)
    return unflatten_channel_string(buf.value.decode('ascii'))
@property
def aux_power_error_chans_exist(self):
    """
    bool: Indicates whether the device(s) detected an auxiliary
        power supply error condition for any channel in the task.
        Reading this property clears the error condition status for
        all channels in the task. Read this property before reading
        the Aux Power Error Channels property; otherwise, you will
        receive an error.
    """
    daqmx_fn = lib_importer.windll.DAQmxGetAuxPowerErrorChansExist
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle, ctypes.POINTER(c_bool32)]

    has_errors = c_bool32()
    status = daqmx_fn(self._handle, ctypes.byref(has_errors))
    check_for_error(status)
    return has_errors.value
@property
def avail_samp_per_chan(self):
    """
    int: Indicates how many samples are available to read per
        channel. The value is the same for every channel in the
        task.
    """
    daqmx_fn = lib_importer.windll.DAQmxGetReadAvailSampPerChan
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle,
                    ctypes.POINTER(ctypes.c_uint)]

    available = ctypes.c_uint()
    status = daqmx_fn(self._handle, ctypes.byref(available))
    check_for_error(status)
    return available.value
@property
def change_detect_overflowed(self):
    """
    bool: Indicates whether samples were missed because change
        detection events occurred faster than the device could
        handle them. Some devices detect overflows differently than
        others.
    """
    daqmx_fn = lib_importer.windll.DAQmxGetReadChangeDetectHasOverflowed
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle, ctypes.POINTER(c_bool32)]

    overflowed = c_bool32()
    status = daqmx_fn(self._handle, ctypes.byref(overflowed))
    check_for_error(status)
    return overflowed.value
@property
def channels_to_read(self):
    """
    :class:`nidaqmx._task_modules.channels.channel.Channel`:
        Specifies the subset of channels in the task from which to
        read.
    """
    daqmx_fn = lib_importer.windll.DAQmxGetReadChannelsToRead
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle, ctypes.c_char_p,
                    ctypes.c_uint]

    # The first pass uses a zero-length buffer to obtain the size; the
    # next pass retrieves the data with a buffer of that size.
    capacity = 0
    while True:
        buf = ctypes.create_string_buffer(capacity)
        status = daqmx_fn(self._handle, buf, capacity)
        if is_string_buffer_too_small(status):
            # Buffer size must have changed between calls; ask again.
            capacity = 0
            continue
        if capacity == 0 and status > 0:
            # Buffer size obtained; use it to retrieve the data.
            capacity = status
            continue
        break

    check_for_error(status)
    channel_names = buf.value.decode('ascii')
    return Channel._factory(self._handle, channel_names)
@channels_to_read.setter
def channels_to_read(self, val):
    """
    Passes ``val.name`` to DAQmxSetReadChannelsToRead for this task's
    handle and hands the returned status to check_for_error.
    """
    daqmx_fn = lib_importer.windll.DAQmxSetReadChannelsToRead
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle, ctypes_byte_str]

    # Only the name attribute of the given object is sent to the driver.
    channel_names = val.name
    status = daqmx_fn(self._handle, channel_names)
    check_for_error(status)
@channels_to_read.deleter
def channels_to_read(self):
    """
    Calls DAQmxResetReadChannelsToRead for this task's handle and hands
    the returned status to check_for_error.
    """
    daqmx_fn = lib_importer.windll.DAQmxResetReadChannelsToRead
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [lib_importer.task_handle]

    status = daqmx_fn(self._handle)
    check_for_error(status)
@property
def common_mode_range_error_chans(self):
    """
    List[str]: Indicates the names of the virtual channels in the
        task for which the device(s) detected a common mode range
        violation. Read **common_mode_range_error_chans_exist**
        before reading this property; otherwise, you will receive
        an error.
    """
    daqmx_fn = lib_importer.windll.DAQmxGetReadCommonModeRangeErrorChans
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle, ctypes.c_char_p,
                    ctypes.c_uint]

    # The first pass uses a zero-length buffer to obtain the size; the
    # next pass retrieves the data with a buffer of that size.
    capacity = 0
    while True:
        buf = ctypes.create_string_buffer(capacity)
        status = daqmx_fn(self._handle, buf, capacity)
        if is_string_buffer_too_small(status):
            # Buffer size must have changed between calls; ask again.
            capacity = 0
            continue
        if capacity == 0 and status > 0:
            # Buffer size obtained; use it to retrieve the data.
            capacity = status
            continue
        break

    check_for_error(status)
    return unflatten_channel_string(buf.value.decode('ascii'))
@property
def common_mode_range_error_chans_exist(self):
    """
    bool: Indicates whether the device(s) detected a common mode
        range violation for any virtual channel in the task. A
        common mode range violation occurs when the voltage of
        either the positive terminal or negative terminal to ground
        is out of range. Reading this property clears the common
        mode range violation status for all channels in the task.
        Read this property before reading
        **common_mode_range_error_chans**; otherwise, you will
        receive an error.
    """
    daqmx_fn = (lib_importer.windll.
                DAQmxGetReadCommonModeRangeErrorChansExist)
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle, ctypes.POINTER(c_bool32)]

    has_violation = c_bool32()
    status = daqmx_fn(self._handle, ctypes.byref(has_violation))
    check_for_error(status)
    return has_violation.value
@property
def curr_read_pos(self):
    """
    long: Indicates the current position in the buffer, in samples
        per channel.
    """
    daqmx_fn = lib_importer.windll.DAQmxGetReadCurrReadPos
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle,
                    ctypes.POINTER(ctypes.c_ulonglong)]

    position = ctypes.c_ulonglong()
    status = daqmx_fn(self._handle, ctypes.byref(position))
    check_for_error(status)
    return position.value
@property
def devs_with_inserted_or_removed_accessories(self):
    """
    List[str]: Indicates the names of the devices that detected an
        accessory being inserted or removed since the task started.
        Read **accessory_insertion_or_removal_detected** before
        reading this property; otherwise, you will receive an
        error.
    """
    daqmx_fn = (lib_importer.windll.
                DAQmxGetReadDevsWithInsertedOrRemovedAccessories)
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle, ctypes.c_char_p,
                    ctypes.c_uint]

    # The first pass uses a zero-length buffer to obtain the size; the
    # next pass retrieves the data with a buffer of that size.
    capacity = 0
    while True:
        buf = ctypes.create_string_buffer(capacity)
        status = daqmx_fn(self._handle, buf, capacity)
        if is_string_buffer_too_small(status):
            # Buffer size must have changed between calls; ask again.
            capacity = 0
            continue
        if capacity == 0 and status > 0:
            # Buffer size obtained; use it to retrieve the data.
            capacity = status
            continue
        break

    check_for_error(status)
    return unflatten_channel_string(buf.value.decode('ascii'))
@property
def di_num_booleans_per_chan(self):
    """
    int: Indicates how many booleans per channel NI-DAQmx returns in
        a sample for line-based reads. If a channel has fewer lines
        than this number, the extra booleans are False.
    """
    daqmx_fn = lib_importer.windll.DAQmxGetReadDigitalLinesBytesPerChan
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle,
                    ctypes.POINTER(ctypes.c_uint)]

    booleans_per_chan = ctypes.c_uint()
    status = daqmx_fn(self._handle, ctypes.byref(booleans_per_chan))
    check_for_error(status)
    return booleans_per_chan.value
@property
def excit_fault_chans(self):
    """
    List[str]: Indicates the names of the virtual channels in the
        task for which the device(s) detected an excitation fault
        condition. Read **excit_fault_chans_exist** before reading
        this property; otherwise, you will receive an error.
    """
    daqmx_fn = lib_importer.windll.DAQmxGetReadExcitFaultChans
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle, ctypes.c_char_p,
                    ctypes.c_uint]

    # The first pass uses a zero-length buffer to obtain the size; the
    # next pass retrieves the data with a buffer of that size.
    capacity = 0
    while True:
        buf = ctypes.create_string_buffer(capacity)
        status = daqmx_fn(self._handle, buf, capacity)
        if is_string_buffer_too_small(status):
            # Buffer size must have changed between calls; ask again.
            capacity = 0
            continue
        if capacity == 0 and status > 0:
            # Buffer size obtained; use it to retrieve the data.
            capacity = status
            continue
        break

    check_for_error(status)
    return unflatten_channel_string(buf.value.decode('ascii'))
@property
def excit_fault_chans_exist(self):
    """
    bool: Indicates whether the device(s) detected an excitation
        fault condition for any virtual channel in the task.
        Reading this property clears the excitation fault status
        for all channels in the task. Read this property before
        reading **excit_fault_chans**; otherwise, you will receive
        an error.
    """
    daqmx_fn = lib_importer.windll.DAQmxGetReadExcitFaultChansExist
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle, ctypes.POINTER(c_bool32)]

    has_fault = c_bool32()
    status = daqmx_fn(self._handle, ctypes.byref(has_fault))
    check_for_error(status)
    return has_fault.value
@property
def input_buf_size(self):
    """
    int: Specifies how many samples the input buffer can hold for
        each channel in the task. Zero indicates that no buffer is
        allocated. Use a buffer size of 0 to perform a
        hardware-timed operation without using a buffer. Setting
        this property overrides the automatic input buffer
        allocation that NI-DAQmx performs.
    """
    daqmx_fn = lib_importer.windll.DAQmxGetBufInputBufSize
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle,
                    ctypes.POINTER(ctypes.c_uint)]

    buffer_size = ctypes.c_uint()
    status = daqmx_fn(self._handle, ctypes.byref(buffer_size))
    check_for_error(status)
    return buffer_size.value
@input_buf_size.setter
def input_buf_size(self, val):
    """
    Passes ``val`` to DAQmxSetBufInputBufSize for this task's handle
    and hands the returned status to check_for_error.
    """
    daqmx_fn = lib_importer.windll.DAQmxSetBufInputBufSize
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle, ctypes.c_uint]

    status = daqmx_fn(self._handle, val)
    check_for_error(status)
@input_buf_size.deleter
def input_buf_size(self):
    """
    Calls DAQmxResetBufInputBufSize for this task's handle and hands
    the returned status to check_for_error.
    """
    daqmx_fn = lib_importer.windll.DAQmxResetBufInputBufSize
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [lib_importer.task_handle]

    status = daqmx_fn(self._handle)
    check_for_error(status)
@property
def input_limits_fault_chans(self):
    """
    List[str]: Indicates the virtual channels that have detected
        samples outside the upper or lower limits configured for
        each channel in the task. Read
        **input_limits_fault_chans_exist** before reading this
        property; otherwise, you will receive an error.
    """
    daqmx_fn = lib_importer.windll.DAQmxGetReadInputLimitsFaultChans
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle, ctypes.c_char_p,
                    ctypes.c_uint]

    # The first pass uses a zero-length buffer to obtain the size; the
    # next pass retrieves the data with a buffer of that size.
    capacity = 0
    while True:
        buf = ctypes.create_string_buffer(capacity)
        status = daqmx_fn(self._handle, buf, capacity)
        if is_string_buffer_too_small(status):
            # Buffer size must have changed between calls; ask again.
            capacity = 0
            continue
        if capacity == 0 and status > 0:
            # Buffer size obtained; use it to retrieve the data.
            capacity = status
            continue
        break

    check_for_error(status)
    return unflatten_channel_string(buf.value.decode('ascii'))
@property
def input_limits_fault_chans_exist(self):
    """
    bool: Indicates whether the device or devices detected a sample
        that was outside the upper or lower limits configured for
        each channel in the task. Reading this property clears the
        input limits fault channel status for all channels in the
        task. Read this property before reading
        **input_limits_fault_chans**; otherwise, you will receive
        an error. Note: Fault detection applies to both positive
        and negative inputs. For instance, if you specify a lower
        limit of 2 mA and an upper limit of 12 mA, NI-DAQmx detects
        a fault at 15 mA and -15 mA, but not at -6 mA because it is
        in the range of -12 mA to -2 mA.
    """
    daqmx_fn = lib_importer.windll.DAQmxGetReadInputLimitsFaultChansExist
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle, ctypes.POINTER(c_bool32)]

    has_fault = c_bool32()
    status = daqmx_fn(self._handle, ctypes.byref(has_fault))
    check_for_error(status)
    return has_fault.value
@property
def input_onbrd_buf_size(self):
    """
    int: Indicates the size of the device's onboard input buffer, in
        samples per channel.
    """
    daqmx_fn = lib_importer.windll.DAQmxGetBufInputOnbrdBufSize
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle,
                    ctypes.POINTER(ctypes.c_uint)]

    onboard_size = ctypes.c_uint()
    status = daqmx_fn(self._handle, ctypes.byref(onboard_size))
    check_for_error(status)
    return onboard_size.value
@property
def logging_file_path(self):
    """
    str: Specifies the path to the TDMS file to which you want to
        log data. If the file path is changed while the task is
        running, the change takes effect on the next sample
        interval (if Logging.SampsPerFile has been set) or when
        DAQmx Start New File is called. New file paths can be
        specified by ending with "\\" or "/". Files created after
        specifying a new file path retain the same name and
        numbering sequence.
    """
    daqmx_fn = lib_importer.windll.DAQmxGetLoggingFilePath
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle, ctypes.c_char_p,
                    ctypes.c_uint]

    # The first pass uses a zero-length buffer to obtain the size; the
    # next pass retrieves the data with a buffer of that size.
    capacity = 0
    while True:
        buf = ctypes.create_string_buffer(capacity)
        status = daqmx_fn(self._handle, buf, capacity)
        if is_string_buffer_too_small(status):
            # Buffer size must have changed between calls; ask again.
            capacity = 0
            continue
        if capacity == 0 and status > 0:
            # Buffer size obtained; use it to retrieve the data.
            capacity = status
            continue
        break

    check_for_error(status)
    return buf.value.decode('ascii')
@logging_file_path.setter
def logging_file_path(self, val):
    """
    Passes ``val`` to DAQmxSetLoggingFilePath for this task's handle
    and hands the returned status to check_for_error.
    """
    daqmx_fn = lib_importer.windll.DAQmxSetLoggingFilePath
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle, ctypes_byte_str]

    status = daqmx_fn(self._handle, val)
    check_for_error(status)
@logging_file_path.deleter
def logging_file_path(self):
    """
    Calls DAQmxResetLoggingFilePath for this task's handle and hands
    the returned status to check_for_error.
    """
    daqmx_fn = lib_importer.windll.DAQmxResetLoggingFilePath
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [lib_importer.task_handle]

    status = daqmx_fn(self._handle)
    check_for_error(status)
@property
def logging_file_preallocation_size(self):
    """
    long: Specifies a size, in samples, used to pre-allocate space
        on disk. Pre-allocation can improve file I/O performance,
        especially in situations where multiple files are being
        written to disk. For finite tasks, the default behavior is
        to pre-allocate the file based on the number of samples you
        configure the task to acquire.
    """
    daqmx_fn = lib_importer.windll.DAQmxGetLoggingFilePreallocationSize
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle,
                    ctypes.POINTER(ctypes.c_ulonglong)]

    prealloc_size = ctypes.c_ulonglong()
    status = daqmx_fn(self._handle, ctypes.byref(prealloc_size))
    check_for_error(status)
    return prealloc_size.value
@logging_file_preallocation_size.setter
def logging_file_preallocation_size(self, val):
    """
    Passes ``val`` to DAQmxSetLoggingFilePreallocationSize for this
    task's handle and hands the returned status to check_for_error.
    """
    daqmx_fn = lib_importer.windll.DAQmxSetLoggingFilePreallocationSize
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle, ctypes.c_ulonglong]

    status = daqmx_fn(self._handle, val)
    check_for_error(status)
@logging_file_preallocation_size.deleter
def logging_file_preallocation_size(self):
    """
    Calls DAQmxResetLoggingFilePreallocationSize for this task's handle
    and hands the returned status to check_for_error.
    """
    daqmx_fn = lib_importer.windll.DAQmxResetLoggingFilePreallocationSize
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [lib_importer.task_handle]

    status = daqmx_fn(self._handle)
    check_for_error(status)
@property
def logging_file_write_size(self):
    """
    int: Specifies the size, in samples, of the chunks in which data
        will be written to disk. The size must be evenly divisible
        by the volume sector size, in bytes.
    """
    daqmx_fn = lib_importer.windll.DAQmxGetLoggingFileWriteSize
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle,
                    ctypes.POINTER(ctypes.c_uint)]

    write_size = ctypes.c_uint()
    status = daqmx_fn(self._handle, ctypes.byref(write_size))
    check_for_error(status)
    return write_size.value
@logging_file_write_size.setter
def logging_file_write_size(self, val):
    """
    Passes ``val`` to DAQmxSetLoggingFileWriteSize for this task's
    handle and hands the returned status to check_for_error.
    """
    daqmx_fn = lib_importer.windll.DAQmxSetLoggingFileWriteSize
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle, ctypes.c_uint]

    status = daqmx_fn(self._handle, val)
    check_for_error(status)
@logging_file_write_size.deleter
def logging_file_write_size(self):
    """
    Calls DAQmxResetLoggingFileWriteSize for this task's handle and
    hands the returned status to check_for_error.
    """
    daqmx_fn = lib_importer.windll.DAQmxResetLoggingFileWriteSize
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [lib_importer.task_handle]

    status = daqmx_fn(self._handle)
    check_for_error(status)
@property
def logging_mode(self):
    """
    :class:`nidaqmx.constants.LoggingMode`: Specifies whether to
        enable logging and whether to allow reading data while
        logging. Log mode allows for the best performance, but you
        cannot read data while logging in that mode. To read data
        while logging, specify Log and Read mode.
    """
    daqmx_fn = lib_importer.windll.DAQmxGetLoggingMode
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle, ctypes.POINTER(ctypes.c_int)]

    raw_mode = ctypes.c_int()
    status = daqmx_fn(self._handle, ctypes.byref(raw_mode))
    check_for_error(status)
    # Convert the integer written by the driver into the enum member.
    return LoggingMode(raw_mode.value)
@logging_mode.setter
def logging_mode(self, val):
    """
    Passes ``val.value`` to DAQmxSetLoggingMode for this task's handle
    and hands the returned status to check_for_error.
    """
    daqmx_fn = lib_importer.windll.DAQmxSetLoggingMode
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle, ctypes.c_int]

    # The driver receives the member's underlying value, not the
    # object that was assigned.
    raw_mode = val.value
    status = daqmx_fn(self._handle, raw_mode)
    check_for_error(status)
@logging_mode.deleter
def logging_mode(self):
    """
    Calls DAQmxResetLoggingMode for this task's handle and hands the
    returned status to check_for_error.
    """
    daqmx_fn = lib_importer.windll.DAQmxResetLoggingMode
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [lib_importer.task_handle]

    status = daqmx_fn(self._handle)
    check_for_error(status)
@property
def logging_pause(self):
    """
    bool: Specifies whether logging is paused while a task is
        executing. If **logging_mode** is set to Log and Read mode,
        this value is taken into consideration on the next call to
        DAQmx Read, where data is written to disk. If
        **logging_mode** is set to Log Only mode, this value is
        taken into consideration the next time that data is written
        to disk. A new TDMS group is written when logging is
        resumed from a paused state.
    """
    daqmx_fn = lib_importer.windll.DAQmxGetLoggingPause
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle, ctypes.POINTER(c_bool32)]

    paused = c_bool32()
    status = daqmx_fn(self._handle, ctypes.byref(paused))
    check_for_error(status)
    return paused.value
@logging_pause.setter
def logging_pause(self, val):
    """
    Passes ``val`` to DAQmxSetLoggingPause for this task's handle and
    hands the returned status to check_for_error.
    """
    daqmx_fn = lib_importer.windll.DAQmxSetLoggingPause
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [lib_importer.task_handle, c_bool32]

    status = daqmx_fn(self._handle, val)
    check_for_error(status)
@logging_pause.deleter
def logging_pause(self):
    """
    Calls DAQmxResetLoggingPause for this task's handle and hands the
    returned status to check_for_error.
    """
    daqmx_fn = lib_importer.windll.DAQmxResetLoggingPause
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [lib_importer.task_handle]

    status = daqmx_fn(self._handle)
    check_for_error(status)
@property
def logging_samps_per_file(self):
    """
    long: Specifies how many samples to write to each file. When
        the file reaches the number of samples specified, a new
        file is created with the naming convention of
        <filename>_####.tdms, where #### starts at 0001 and
        increments automatically with each new file. For example,
        if the file specified is C:\\data.tdms, the next file name
        used is C:\\data_0001.tdms. To disable file spanning
        behavior, set this attribute to 0. If
        **logging_file_path** is changed while this attribute is
        set, the new file path takes effect on the next file
        created.
    """
    daqmx_fn = lib_importer.windll.DAQmxGetLoggingSampsPerFile
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle,
                    ctypes.POINTER(ctypes.c_ulonglong)]

    samps_per_file = ctypes.c_ulonglong()
    status = daqmx_fn(self._handle, ctypes.byref(samps_per_file))
    check_for_error(status)
    return samps_per_file.value
@logging_samps_per_file.setter
def logging_samps_per_file(self, val):
    """
    Passes ``val`` to DAQmxSetLoggingSampsPerFile for this task's
    handle and hands the returned status to check_for_error.
    """
    daqmx_fn = lib_importer.windll.DAQmxSetLoggingSampsPerFile
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle, ctypes.c_ulonglong]

    status = daqmx_fn(self._handle, val)
    check_for_error(status)
@logging_samps_per_file.deleter
def logging_samps_per_file(self):
    """
    Calls DAQmxResetLoggingSampsPerFile for this task's handle and
    hands the returned status to check_for_error.
    """
    daqmx_fn = lib_importer.windll.DAQmxResetLoggingSampsPerFile
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [lib_importer.task_handle]

    status = daqmx_fn(self._handle)
    check_for_error(status)
@property
def logging_tdms_group_name(self):
    """
    str: Specifies the name of the group to create within the TDMS
        file for data from this task. If you append data to an
        existing file and the specified group already exists,
        NI-DAQmx appends a number symbol and a number to the group
        name, incrementing that number until it finds a group name
        that does not exist. For example, if you specify a group
        name of Voltage Task, and that group already exists,
        NI-DAQmx assigns the group name Voltage Task #1, then
        Voltage Task #2.
    """
    daqmx_fn = lib_importer.windll.DAQmxGetLoggingTDMSGroupName
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle, ctypes.c_char_p,
                    ctypes.c_uint]

    # The first pass uses a zero-length buffer to obtain the size; the
    # next pass retrieves the data with a buffer of that size.
    capacity = 0
    while True:
        buf = ctypes.create_string_buffer(capacity)
        status = daqmx_fn(self._handle, buf, capacity)
        if is_string_buffer_too_small(status):
            # Buffer size must have changed between calls; ask again.
            capacity = 0
            continue
        if capacity == 0 and status > 0:
            # Buffer size obtained; use it to retrieve the data.
            capacity = status
            continue
        break

    check_for_error(status)
    return buf.value.decode('ascii')
@logging_tdms_group_name.setter
def logging_tdms_group_name(self, val):
    """
    Passes ``val`` to DAQmxSetLoggingTDMSGroupName for this task's
    handle and hands the returned status to check_for_error.
    """
    daqmx_fn = lib_importer.windll.DAQmxSetLoggingTDMSGroupName
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle, ctypes_byte_str]

    status = daqmx_fn(self._handle, val)
    check_for_error(status)
@logging_tdms_group_name.deleter
def logging_tdms_group_name(self):
    """
    Calls DAQmxResetLoggingTDMSGroupName for this task's handle and
    hands the returned status to check_for_error.
    """
    daqmx_fn = lib_importer.windll.DAQmxResetLoggingTDMSGroupName
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [lib_importer.task_handle]

    status = daqmx_fn(self._handle)
    check_for_error(status)
@property
def logging_tdms_operation(self):
    """
    :class:`nidaqmx.constants.LoggingOperation`: Specifies how the
        TDMS file is opened.
    """
    daqmx_fn = lib_importer.windll.DAQmxGetLoggingTDMSOperation
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle, ctypes.POINTER(ctypes.c_int)]

    raw_operation = ctypes.c_int()
    status = daqmx_fn(self._handle, ctypes.byref(raw_operation))
    check_for_error(status)
    # Convert the integer written by the driver into the enum member.
    return LoggingOperation(raw_operation.value)
@logging_tdms_operation.setter
def logging_tdms_operation(self, val):
    """
    Passes ``val.value`` to DAQmxSetLoggingTDMSOperation for this
    task's handle and hands the returned status to check_for_error.
    """
    daqmx_fn = lib_importer.windll.DAQmxSetLoggingTDMSOperation
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle, ctypes.c_int]

    # The driver receives the member's underlying value, not the
    # object that was assigned.
    raw_operation = val.value
    status = daqmx_fn(self._handle, raw_operation)
    check_for_error(status)
@logging_tdms_operation.deleter
def logging_tdms_operation(self):
    """
    Calls DAQmxResetLoggingTDMSOperation for this task's handle and
    hands the returned status to check_for_error.
    """
    daqmx_fn = lib_importer.windll.DAQmxResetLoggingTDMSOperation
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [lib_importer.task_handle]

    status = daqmx_fn(self._handle)
    check_for_error(status)
@property
def num_chans(self):
    """
    int: Indicates how many channels DAQmx Read reads from the task.
        This is either the number of channels in the task or the
        number of channels you specify with **channels_to_read**.
    """
    daqmx_fn = lib_importer.windll.DAQmxGetReadNumChans
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle,
                    ctypes.POINTER(ctypes.c_uint)]

    channel_count = ctypes.c_uint()
    status = daqmx_fn(self._handle, ctypes.byref(channel_count))
    check_for_error(status)
    return channel_count.value
@property
def offset(self):
    """
    int: Specifies the offset, in samples per channel, at which a
        read operation begins. The offset is relative to the
        location you specify with **relative_to**.
    """
    daqmx_fn = lib_importer.windll.DAQmxGetReadOffset
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle, ctypes.POINTER(ctypes.c_int)]

    read_offset = ctypes.c_int()
    status = daqmx_fn(self._handle, ctypes.byref(read_offset))
    check_for_error(status)
    return read_offset.value
@offset.setter
def offset(self, val):
    """
    Passes ``val`` to DAQmxSetReadOffset for this task's handle and
    hands the returned status to check_for_error.
    """
    daqmx_fn = lib_importer.windll.DAQmxSetReadOffset
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle, ctypes.c_int]

    status = daqmx_fn(self._handle, val)
    check_for_error(status)
@offset.deleter
def offset(self):
    """
    Calls DAQmxResetReadOffset for this task's handle and hands the
    returned status to check_for_error.
    """
    daqmx_fn = lib_importer.windll.DAQmxResetReadOffset
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [lib_importer.task_handle]

    status = daqmx_fn(self._handle)
    check_for_error(status)
@property
def open_chans(self):
    """
    List[str]: Indicates the names of any open virtual channels.
        Read **open_chans_exist** before reading this property;
        otherwise you will receive an error.
    """
    daqmx_fn = lib_importer.windll.DAQmxGetReadOpenChans
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle, ctypes.c_char_p,
                    ctypes.c_uint]

    # The first pass uses a zero-length buffer to obtain the size; the
    # next pass retrieves the data with a buffer of that size.
    capacity = 0
    while True:
        buf = ctypes.create_string_buffer(capacity)
        status = daqmx_fn(self._handle, buf, capacity)
        if is_string_buffer_too_small(status):
            # Buffer size must have changed between calls; ask again.
            capacity = 0
            continue
        if capacity == 0 and status > 0:
            # Buffer size obtained; use it to retrieve the data.
            capacity = status
            continue
        break

    check_for_error(status)
    return unflatten_channel_string(buf.value.decode('ascii'))
@property
def open_chans_details(self):
    """
    List[str]: Indicates the details of any open virtual channels.
        Read **open_chans_exist** before reading this property;
        otherwise you will receive an error.
    """
    daqmx_fn = lib_importer.windll.DAQmxGetReadOpenChansDetails
    if daqmx_fn.argtypes is None:
        with daqmx_fn.arglock:
            # Test again while holding arglock before assigning.
            if daqmx_fn.argtypes is None:
                daqmx_fn.argtypes = [
                    lib_importer.task_handle, ctypes.c_char_p,
                    ctypes.c_uint]

    # The first pass uses a zero-length buffer to obtain the size; the
    # next pass retrieves the data with a buffer of that size.
    capacity = 0
    while True:
        buf = ctypes.create_string_buffer(capacity)
        status = daqmx_fn(self._handle, buf, capacity)
        if is_string_buffer_too_small(status):
            # Buffer size must have changed between calls; ask again.
            capacity = 0
            continue
        if capacity == 0 and status > 0:
            # Buffer size obtained; use it to retrieve the data.
            capacity = status
            continue
        break

    check_for_error(status)
    return unflatten_channel_string(buf.value.decode('ascii'))
@property
def open_chans_exist(self):
    """
    bool: Indicates whether the device or devices detected an open
        channel condition on any virtual channel of the task.
        Reading this property clears the open channel status of
        every channel in the task. It must be read before
        **open_chans**; otherwise an error is returned.
    """
    native = lib_importer.windll.DAQmxGetReadOpenChansExist
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [
                    lib_importer.task_handle, ctypes.POINTER(c_bool32)]

    flag = c_bool32()
    check_for_error(native(self._handle, ctypes.byref(flag)))
    return flag.value
@property
def open_current_loop_chans(self):
    """
    List[str]: Names of the virtual channels in the task for which
        the device(s) detected an open current loop. Read
        **open_current_loop_chans_exist** before this property;
        otherwise an error is returned.
    """
    native = lib_importer.windll.DAQmxGetReadOpenCurrentLoopChans
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [
                    lib_importer.task_handle, ctypes.c_char_p,
                    ctypes.c_uint]

    capacity = 0
    while True:
        # A zero-capacity call is used to learn the required size.
        buf = ctypes.create_string_buffer(capacity)
        status = native(self._handle, buf, capacity)
        if is_string_buffer_too_small(status):
            # The string changed size between calls; measure again.
            capacity = 0
            continue
        if capacity == 0 and status > 0:
            # Sizing call succeeded; retry with a buffer that large.
            capacity = status
            continue
        check_for_error(status)
        return unflatten_channel_string(buf.value.decode('ascii'))
@property
def open_current_loop_chans_exist(self):
    """
    bool: Indicates whether the device(s) detected an open current
        loop on any virtual channel of the task. Reading this
        property clears the open current loop status of every
        channel in the task. It must be read before
        **open_current_loop_chans**; otherwise an error is
        returned.
    """
    native = lib_importer.windll.DAQmxGetReadOpenCurrentLoopChansExist
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [
                    lib_importer.task_handle, ctypes.POINTER(c_bool32)]

    flag = c_bool32()
    check_for_error(native(self._handle, ctypes.byref(flag)))
    return flag.value
@property
def open_thrmcpl_chans(self):
    """
    List[str]: Names of the virtual channels in the task for which
        the device(s) detected an open thermocouple. Read
        **open_thrmcpl_chans_exist** before this property;
        otherwise an error is returned.
    """
    native = lib_importer.windll.DAQmxGetReadOpenThrmcplChans
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [
                    lib_importer.task_handle, ctypes.c_char_p,
                    ctypes.c_uint]

    capacity = 0
    while True:
        # A zero-capacity call is used to learn the required size.
        buf = ctypes.create_string_buffer(capacity)
        status = native(self._handle, buf, capacity)
        if is_string_buffer_too_small(status):
            # The string changed size between calls; measure again.
            capacity = 0
            continue
        if capacity == 0 and status > 0:
            # Sizing call succeeded; retry with a buffer that large.
            capacity = status
            continue
        check_for_error(status)
        return unflatten_channel_string(buf.value.decode('ascii'))
@property
def open_thrmcpl_chans_exist(self):
    """
    bool: Indicates whether the device(s) detected an open
        thermocouple connected to any virtual channel of the task.
        Reading this property clears the open thermocouple status
        of every channel in the task. It must be read before
        **open_thrmcpl_chans**; otherwise an error is returned.
    """
    native = lib_importer.windll.DAQmxGetReadOpenThrmcplChansExist
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [
                    lib_importer.task_handle, ctypes.POINTER(c_bool32)]

    flag = c_bool32()
    check_for_error(native(self._handle, ctypes.byref(flag)))
    return flag.value
@property
def over_write(self):
    """
    :class:`nidaqmx.constants.OverwriteMode`: Specifies whether
        samples in the buffer that have not been read yet are
        overwritten.
    """
    native = lib_importer.windll.DAQmxGetReadOverWrite
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [
                    lib_importer.task_handle, ctypes.POINTER(ctypes.c_int)]

    raw_mode = ctypes.c_int()
    check_for_error(native(self._handle, ctypes.byref(raw_mode)))
    # Wrap the integer written by the driver in the enum type.
    return OverwriteMode(raw_mode.value)
@over_write.setter
def over_write(self, val):
    # The native call takes a plain int, so the ``value`` attribute of
    # the supplied object is what gets passed on.
    raw_mode = val.value
    native = lib_importer.windll.DAQmxSetReadOverWrite
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [lib_importer.task_handle, ctypes.c_int]

    check_for_error(native(self._handle, raw_mode))
@over_write.deleter
def over_write(self):
    # Deleting the attribute invokes DAQmxResetReadOverWrite on the task
    # (presumably restoring the driver default -- confirm in NI docs).
    native = lib_importer.windll.DAQmxResetReadOverWrite
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [lib_importer.task_handle]

    check_for_error(native(self._handle))
@property
def overcurrent_chans(self):
    """
    List[str]: Names of the virtual channels in the task for which
        the device(s) detected an overcurrent condition. Read
        **overcurrent_chans_exist** before this property; otherwise
        an error is returned. On some devices the task must be
        restarted for all overcurrent channels to recover.
    """
    native = lib_importer.windll.DAQmxGetReadOvercurrentChans
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [
                    lib_importer.task_handle, ctypes.c_char_p,
                    ctypes.c_uint]

    capacity = 0
    while True:
        # A zero-capacity call is used to learn the required size.
        buf = ctypes.create_string_buffer(capacity)
        status = native(self._handle, buf, capacity)
        if is_string_buffer_too_small(status):
            # The string changed size between calls; measure again.
            capacity = 0
            continue
        if capacity == 0 and status > 0:
            # Sizing call succeeded; retry with a buffer that large.
            capacity = status
            continue
        check_for_error(status)
        return unflatten_channel_string(buf.value.decode('ascii'))
@property
def overcurrent_chans_exist(self):
    """
    bool: Indicates whether the device(s) detected an overcurrent
        condition on any virtual channel of the task. Reading this
        property clears the overcurrent status of every channel in
        the task. It must be read before **overcurrent_chans**;
        otherwise an error is returned.
    """
    native = lib_importer.windll.DAQmxGetReadOvercurrentChansExist
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [
                    lib_importer.task_handle, ctypes.POINTER(c_bool32)]

    flag = c_bool32()
    check_for_error(native(self._handle, ctypes.byref(flag)))
    return flag.value
@property
def overloaded_chans(self):
    """
    List[str]: Names of the overloaded virtual channels in the
        task. Read **overloaded_chans_exist** before this property;
        otherwise an error is returned.
    """
    native = lib_importer.windll.DAQmxGetReadOverloadedChans
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [
                    lib_importer.task_handle, ctypes.c_char_p,
                    ctypes.c_uint]

    capacity = 0
    while True:
        # A zero-capacity call is used to learn the required size.
        buf = ctypes.create_string_buffer(capacity)
        status = native(self._handle, buf, capacity)
        if is_string_buffer_too_small(status):
            # The string changed size between calls; measure again.
            capacity = 0
            continue
        if capacity == 0 and status > 0:
            # Sizing call succeeded; retry with a buffer that large.
            capacity = status
            continue
        check_for_error(status)
        return unflatten_channel_string(buf.value.decode('ascii'))
@property
def overloaded_chans_exist(self):
    """
    bool: Indicates whether the device(s) detected an overload on
        any virtual channel of the task. Reading this property
        clears the overload status of every channel in the task.
        It must be read before **overloaded_chans**; otherwise an
        error is returned.
    """
    native = lib_importer.windll.DAQmxGetReadOverloadedChansExist
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [
                    lib_importer.task_handle, ctypes.POINTER(c_bool32)]

    flag = c_bool32()
    check_for_error(native(self._handle, ctypes.byref(flag)))
    return flag.value
@property
def overtemperature_chans(self):
    """
    List[str]: Names of the overtemperature virtual channels. Read
        **overtemperature_chans_exist** before this property;
        otherwise an error is returned.
    """
    native = lib_importer.windll.DAQmxGetReadOvertemperatureChans
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [
                    lib_importer.task_handle, ctypes.c_char_p,
                    ctypes.c_uint]

    capacity = 0
    while True:
        # A zero-capacity call is used to learn the required size.
        buf = ctypes.create_string_buffer(capacity)
        status = native(self._handle, buf, capacity)
        if is_string_buffer_too_small(status):
            # The string changed size between calls; measure again.
            capacity = 0
            continue
        if capacity == 0 and status > 0:
            # Sizing call succeeded; retry with a buffer that large.
            capacity = status
            continue
        check_for_error(status)
        return unflatten_channel_string(buf.value.decode('ascii'))
@property
def overtemperature_chans_exist(self):
    """
    bool: Indicates whether the device(s) detected an
        overtemperature condition on any virtual channel of the
        task. Reading this property clears the overtemperature
        status of every channel in the task. It must be read before
        **overtemperature_chans**; otherwise an error is returned.
    """
    native = lib_importer.windll.DAQmxGetReadOvertemperatureChansExist
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [
                    lib_importer.task_handle, ctypes.POINTER(c_bool32)]

    flag = c_bool32()
    check_for_error(native(self._handle, ctypes.byref(flag)))
    return flag.value
@property
def pll_unlocked_chans(self):
    """
    List[str]: Indicates the channels whose PLLs unlocked.
    """
    native = lib_importer.windll.DAQmxGetReadPLLUnlockedChans
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [
                    lib_importer.task_handle, ctypes.c_char_p,
                    ctypes.c_uint]

    capacity = 0
    while True:
        # A zero-capacity call is used to learn the required size.
        buf = ctypes.create_string_buffer(capacity)
        status = native(self._handle, buf, capacity)
        if is_string_buffer_too_small(status):
            # The string changed size between calls; measure again.
            capacity = 0
            continue
        if capacity == 0 and status > 0:
            # Sizing call succeeded; retry with a buffer that large.
            capacity = status
            continue
        check_for_error(status)
        return unflatten_channel_string(buf.value.decode('ascii'))
@property
def pll_unlocked_chans_exist(self):
    """
    bool: Indicates whether the PLL is currently locked, or whether
        it became unlocked during the previous acquisition. Devices
        may report PLL Unlock either during or after an
        acquisition.
    """
    native = lib_importer.windll.DAQmxGetReadPLLUnlockedChansExist
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [
                    lib_importer.task_handle, ctypes.POINTER(c_bool32)]

    flag = c_bool32()
    check_for_error(native(self._handle, ctypes.byref(flag)))
    return flag.value
@property
def power_supply_fault_chans(self):
    """
    List[str]: Indicates the virtual channels that detected a power
        supply fault. Read **power_supply_fault_chans_exist**
        before this property; otherwise an error is returned.
    """
    native = lib_importer.windll.DAQmxGetReadPowerSupplyFaultChans
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [
                    lib_importer.task_handle, ctypes.c_char_p,
                    ctypes.c_uint]

    capacity = 0
    while True:
        # A zero-capacity call is used to learn the required size.
        buf = ctypes.create_string_buffer(capacity)
        status = native(self._handle, buf, capacity)
        if is_string_buffer_too_small(status):
            # The string changed size between calls; measure again.
            capacity = 0
            continue
        if capacity == 0 and status > 0:
            # Sizing call succeeded; retry with a buffer that large.
            capacity = status
            continue
        check_for_error(status)
        return unflatten_channel_string(buf.value.decode('ascii'))
@property
def power_supply_fault_chans_exist(self):
    """
    bool: Indicates whether the device or devices detected a power
        supply fault condition on any virtual channel of the task.
        Reading this property clears the power supply fault status
        of every channel in the task. It must be read before
        **power_supply_fault_chans**; otherwise an error is
        returned.
    """
    native = lib_importer.windll.DAQmxGetReadPowerSupplyFaultChansExist
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [
                    lib_importer.task_handle, ctypes.POINTER(c_bool32)]

    flag = c_bool32()
    check_for_error(native(self._handle, ctypes.byref(flag)))
    return flag.value
@property
def raw_data_width(self):
    """
    int: Size, in bytes, of one raw sample from the task.
    """
    native = lib_importer.windll.DAQmxGetReadRawDataWidth
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [
                    lib_importer.task_handle,
                    ctypes.POINTER(ctypes.c_uint)]

    width = ctypes.c_uint()
    check_for_error(native(self._handle, ctypes.byref(width)))
    return width.value
@property
def read_all_avail_samp(self):
    """
    bool: Specifies whether subsequent reads return every sample
        currently available in the buffer or wait until the buffer
        is full. NI-DAQmx applies this setting to finite
        acquisitions only, and only when the number of samples to
        read is -1. A continuous acquisition read with -1 always
        returns all samples currently available in the buffer.
    """
    native = lib_importer.windll.DAQmxGetReadReadAllAvailSamp
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [
                    lib_importer.task_handle, ctypes.POINTER(c_bool32)]

    flag = c_bool32()
    check_for_error(native(self._handle, ctypes.byref(flag)))
    return flag.value
@read_all_avail_samp.setter
def read_all_avail_samp(self, val):
    # ``val`` is passed to DAQmxSetReadReadAllAvailSamp as given; the
    # declared c_bool32 parameter type handles the conversion.
    native = lib_importer.windll.DAQmxSetReadReadAllAvailSamp
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [lib_importer.task_handle, c_bool32]

    check_for_error(native(self._handle, val))
@read_all_avail_samp.deleter
def read_all_avail_samp(self):
    # Deleting the attribute invokes DAQmxResetReadReadAllAvailSamp on
    # the task (presumably restoring the driver default -- confirm).
    native = lib_importer.windll.DAQmxResetReadReadAllAvailSamp
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [lib_importer.task_handle]

    check_for_error(native(self._handle))
@property
def relative_to(self):
    """
    :class:`nidaqmx.constants.ReadRelativeTo`: Specifies the point
        in the buffer at which a read operation begins. When an
        offset is also given through **offset**, the read begins at
        that offset relative to the point selected here. The
        default is **ReadRelativeTo.CURRENT_READ_POSITION**, unless
        a Reference Trigger is configured for the task, in which
        case the default is
        **ReadRelativeTo.FIRST_PRETRIGGER_SAMPLE**.
    """
    native = lib_importer.windll.DAQmxGetReadRelativeTo
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [
                    lib_importer.task_handle, ctypes.POINTER(ctypes.c_int)]

    raw_position = ctypes.c_int()
    check_for_error(native(self._handle, ctypes.byref(raw_position)))
    # Wrap the integer written by the driver in the enum type.
    return ReadRelativeTo(raw_position.value)
@relative_to.setter
def relative_to(self, val):
    # The native call takes a plain int, so the ``value`` attribute of
    # the supplied object is what gets passed on.
    raw_position = val.value
    native = lib_importer.windll.DAQmxSetReadRelativeTo
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [lib_importer.task_handle, ctypes.c_int]

    check_for_error(native(self._handle, raw_position))
@relative_to.deleter
def relative_to(self):
    # Deleting the attribute invokes DAQmxResetReadRelativeTo on the
    # task (presumably restoring the driver default -- confirm).
    native = lib_importer.windll.DAQmxResetReadRelativeTo
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [lib_importer.task_handle]

    check_for_error(native(self._handle))
@property
def remote_sense_error_chans(self):
    """
    List[str]: Names of the virtual channels in the task for which
        a remote sense connection error condition was detected.
        Read **remote_sense_error_chans_exist** before this
        property; otherwise an error is returned.
    """
    native = lib_importer.windll.DAQmxGetRemoteSenseErrorChans
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [
                    lib_importer.task_handle, ctypes.c_char_p,
                    ctypes.c_uint]

    capacity = 0
    while True:
        # A zero-capacity call is used to learn the required size.
        buf = ctypes.create_string_buffer(capacity)
        status = native(self._handle, buf, capacity)
        if is_string_buffer_too_small(status):
            # The string changed size between calls; measure again.
            capacity = 0
            continue
        if capacity == 0 and status > 0:
            # Sizing call succeeded; retry with a buffer that large.
            capacity = status
            continue
        check_for_error(status)
        return unflatten_channel_string(buf.value.decode('ascii'))
@property
def remote_sense_error_chans_exist(self):
    """
    bool: Indicates whether the device(s) detected an error
        condition of the remote sense connection on any channel of
        the task. The error condition is cleared by disabling the
        output and resolving the hardware connection issue. This
        property must be read before
        **remote_sense_error_chans**; otherwise an error is
        returned.
    """
    native = lib_importer.windll.DAQmxGetRemoteSenseErrorChansExist
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [
                    lib_importer.task_handle, ctypes.POINTER(c_bool32)]

    flag = c_bool32()
    check_for_error(native(self._handle, ctypes.byref(flag)))
    return flag.value
@property
def reverse_voltage_error_chans(self):
    """
    List[str]: Names of the virtual channels in the task for which
        a reverse voltage error condition was detected. Read
        **reverse_voltage_error_chans_exist** before this property;
        otherwise an error is returned.
    """
    native = lib_importer.windll.DAQmxGetReverseVoltageErrorChans
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [
                    lib_importer.task_handle, ctypes.c_char_p,
                    ctypes.c_uint]

    capacity = 0
    while True:
        # A zero-capacity call is used to learn the required size.
        buf = ctypes.create_string_buffer(capacity)
        status = native(self._handle, buf, capacity)
        if is_string_buffer_too_small(status):
            # The string changed size between calls; measure again.
            capacity = 0
            continue
        if capacity == 0 and status > 0:
            # Sizing call succeeded; retry with a buffer that large.
            capacity = status
            continue
        check_for_error(status)
        return unflatten_channel_string(buf.value.decode('ascii'))
@property
def reverse_voltage_error_chans_exist(self):
    """
    bool: Indicates whether the device(s) detected a reverse
        voltage error on any channel of the task. A reverse voltage
        error occurs if the local voltage is equal to the negative
        saturated voltage. Reading this property clears the error
        condition status of every channel in the task. It must be
        read before **reverse_voltage_error_chans**; otherwise an
        error is returned.
    """
    native = lib_importer.windll.DAQmxGetReverseVoltageErrorChansExist
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [
                    lib_importer.task_handle, ctypes.POINTER(c_bool32)]

    flag = c_bool32()
    check_for_error(native(self._handle, ctypes.byref(flag)))
    return flag.value
@property
def sleep_time(self):
    """
    float: Specifies, in seconds, how long to sleep after checking
        for available samples when **wait_mode** is
        **WaitMode.SLEEP**.
    """
    native = lib_importer.windll.DAQmxGetReadSleepTime
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [
                    lib_importer.task_handle,
                    ctypes.POINTER(ctypes.c_double)]

    seconds = ctypes.c_double()
    check_for_error(native(self._handle, ctypes.byref(seconds)))
    return seconds.value
@sleep_time.setter
def sleep_time(self, val):
    # ``val`` is passed to DAQmxSetReadSleepTime as a C double.
    native = lib_importer.windll.DAQmxSetReadSleepTime
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [
                    lib_importer.task_handle, ctypes.c_double]

    check_for_error(native(self._handle, val))
@sleep_time.deleter
def sleep_time(self):
    # Deleting the attribute invokes DAQmxResetReadSleepTime on the
    # task (presumably restoring the driver default -- confirm).
    native = lib_importer.windll.DAQmxResetReadSleepTime
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [lib_importer.task_handle]

    check_for_error(native(self._handle))
@property
def sync_unlocked_chans(self):
    """
    List[str]: Indicates the channels that belong to devices in an
        unlocked target.
    """
    native = lib_importer.windll.DAQmxGetReadSyncUnlockedChans
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [
                    lib_importer.task_handle, ctypes.c_char_p,
                    ctypes.c_uint]

    capacity = 0
    while True:
        # A zero-capacity call is used to learn the required size.
        buf = ctypes.create_string_buffer(capacity)
        status = native(self._handle, buf, capacity)
        if is_string_buffer_too_small(status):
            # The string changed size between calls; measure again.
            capacity = 0
            continue
        if capacity == 0 and status > 0:
            # Sizing call succeeded; retry with a buffer that large.
            capacity = status
            continue
        check_for_error(status)
        return unflatten_channel_string(buf.value.decode('ascii'))
@property
def sync_unlocked_chans_exist(self):
    """
    bool: Indicates whether the target is currently locked to the
        grand master. Devices may report PLL Unlock either during
        or after an acquisition.
    """
    native = lib_importer.windll.DAQmxGetReadSyncUnlockedChansExist
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [
                    lib_importer.task_handle, ctypes.POINTER(c_bool32)]

    flag = c_bool32()
    check_for_error(native(self._handle, ctypes.byref(flag)))
    return flag.value
@property
def total_samp_per_chan_acquired(self):
    """
    int: Indicates the total number of samples acquired by each
        channel. NI-DAQmx returns a single value because it is the
        same for all channels. For retriggered acquisitions the
        value is cumulative across all retriggered acquisitions.
    """
    native = lib_importer.windll.DAQmxGetReadTotalSampPerChanAcquired
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [
                    lib_importer.task_handle,
                    ctypes.POINTER(ctypes.c_ulonglong)]

    # Read through an unsigned 64-bit out-parameter.
    total = ctypes.c_ulonglong()
    check_for_error(native(self._handle, ctypes.byref(total)))
    return total.value
@property
def wait_mode(self):
    """
    :class:`nidaqmx.constants.WaitMode`: Specifies how DAQmx Read
        waits for samples to become available.
    """
    native = lib_importer.windll.DAQmxGetReadWaitMode
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [
                    lib_importer.task_handle, ctypes.POINTER(ctypes.c_int)]

    raw_mode = ctypes.c_int()
    check_for_error(native(self._handle, ctypes.byref(raw_mode)))
    # Wrap the integer written by the driver in the enum type.
    return WaitMode(raw_mode.value)
@wait_mode.setter
def wait_mode(self, val):
    # The native call takes a plain int, so the ``value`` attribute of
    # the supplied object is what gets passed on.
    raw_mode = val.value
    native = lib_importer.windll.DAQmxSetReadWaitMode
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [lib_importer.task_handle, ctypes.c_int]

    check_for_error(native(self._handle, raw_mode))
@wait_mode.deleter
def wait_mode(self):
    # Deleting the attribute invokes DAQmxResetReadWaitMode on the task
    # (presumably restoring the driver default -- confirm).
    native = lib_importer.windll.DAQmxResetReadWaitMode
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [lib_importer.task_handle]

    check_for_error(native(self._handle))
def _calculate_num_samps_per_chan(self, num_samps_per_chan):
    # Turns a requested per-channel sample count into a concrete one.
    # Any value other than -1 is returned untouched.
    if num_samps_per_chan != -1:
        return num_samps_per_chan

    # -1 on a finite acquisition that is not set to read all available
    # samples means the configured per-channel sample quantity.
    timing = self._task.timing
    is_finite = timing.samp_quant_samp_mode == AcquisitionType.FINITE
    if is_finite and not self.read_all_avail_samp:
        return timing.samp_quant_samp_per_chan

    # Otherwise -1 means whatever is available per channel right now.
    return self.avail_samp_per_chan
def read(self, number_of_samples_per_channel=READ_ALL_AVAILABLE):
    """
    Reads raw samples from the task or virtual channels you specify.

    Raw samples constitute the internal representation of samples in a
    device, read directly from the device or buffer without scaling or
    reordering. The native format of a device can be an 8-, 16-, or
    32-bit integer, signed or unsigned.

    NI-DAQmx does not separate raw data into channels. It returns data
    in an interleaved or non-interleaved 1D array, depending on the
    raw ordering of the device. Refer to your device documentation for
    more information.

    This method determines a NumPy array of appropriate size and data
    type to create and return based on your device specifications.

    Use the "timeout" property on the stream to specify the amount of
    time in seconds to wait for samples to become available. If the
    time elapses, the method returns an error and any samples read
    before the timeout elapsed. The default timeout is 10 seconds.
    If you set timeout to nidaqmx.WAIT_INFINITELY, the method waits
    indefinitely. If you set timeout to 0, the method tries once to
    read the requested samples and returns an error if it is unable
    to.

    Args:
        number_of_samples_per_channel (int): Specifies the number of
            samples to read.

            If you set this input to nidaqmx.READ_ALL_AVAILABLE,
            NI-DAQmx determines how many samples to read based on if
            the task acquires samples continuously or acquires a
            finite number of samples.

            If the task acquires samples continuously and you set
            this input to nidaqmx.READ_ALL_AVAILABLE, this method
            reads all the samples currently available in the buffer.

            If the task acquires a finite number of samples and you
            set this input to nidaqmx.READ_ALL_AVAILABLE, the method
            waits for the task to acquire all requested samples,
            then reads those samples. If you set the
            "read_all_avail_samp" property to TRUE, the method reads
            the samples currently available in the buffer and does
            not wait for the task to acquire all requested samples.
    Returns:
        numpy.ndarray:

        The samples requested in the form of a 1D NumPy array. This
        method determines a NumPy array of appropriate size and data
        type to create and return based on your device specifications.
        When fewer samples than requested were read, only the filled
        leading part of the array is returned.
    """
    channels_to_read = self.channels_to_read
    number_of_channels = len(channels_to_read.channel_names)

    samp_size_in_bits = channels_to_read.ai_raw_samp_size
    has_negative_range = channels_to_read.ai_rng_low < 0

    # Pick the integer dtype from the raw sample width; a negative low
    # range limit selects the signed variant. Any width other than 32
    # or 16 falls back to 8-bit.
    if samp_size_in_bits == 32:
        dtype = numpy.int32 if has_negative_range else numpy.uint32
    elif samp_size_in_bits == 16:
        dtype = numpy.int16 if has_negative_range else numpy.uint16
    else:
        dtype = numpy.int8 if has_negative_range else numpy.uint8

    num_samps_per_chan = self._calculate_num_samps_per_chan(
        number_of_samples_per_channel)

    number_of_samples = number_of_channels * num_samps_per_chan

    numpy_array = numpy.zeros(number_of_samples, dtype=dtype)

    samps_per_chan_read, _ = _read_raw(
        self._handle, numpy_array, num_samps_per_chan,
        self.timeout)

    # The count reported by the raw read is per channel (see the
    # DAQmxReadRaw ``sampsRead`` output), while the array holds
    # ``number_of_channels`` values per scan. Scale it before comparing
    # or slicing, otherwise every multi-channel read is truncated.
    valid_samples = samps_per_chan_read * number_of_channels
    if valid_samples != number_of_samples:
        return numpy_array[:valid_samples]
    return numpy_array
def readall(self):
    """
    Reads all available raw samples from the task or virtual channels
    you specify.

    Equivalent to calling ``read`` with nidaqmx.READ_ALL_AVAILABLE.

    NI-DAQmx decides how many samples to read from the kind of
    acquisition. For a continuous acquisition, every sample currently
    in the buffer is read. For a finite acquisition, the method waits
    until the task has acquired all requested samples and then reads
    them; if the "read_all_avail_samp" property is TRUE it instead
    reads what is currently in the buffer without waiting.

    Raw samples are the device's internal representation, read
    directly from the device or buffer without scaling or reordering.
    The native format of a device can be an 8-, 16-, or 32-bit
    integer, signed or unsigned.

    NI-DAQmx does not separate raw data into channels. The data comes
    back as an interleaved or non-interleaved 1D array, depending on
    the raw ordering of the device. Refer to your device documentation
    for more information.

    The "timeout" property on the stream sets how many seconds to wait
    for samples to become available. If the time elapses, the method
    returns an error and any samples read before the timeout elapsed.
    The default timeout is 10 seconds. With nidaqmx.WAIT_INFINITELY
    the method waits indefinitely; with 0 it tries once to read the
    requested samples and returns an error if it is unable to.

    Returns:
        numpy.ndarray:

        The samples requested in the form of a 1D NumPy array, whose
        size and data type are chosen from your device specifications.
    """
    sample_count = READ_ALL_AVAILABLE
    return self.read(number_of_samples_per_channel=sample_count)
def readinto(self, numpy_array):
    """
    Reads raw samples from the task or virtual channels you specify
    into numpy_array.

    The object numpy_array should be a pre-allocated, writable 1D
    numpy array.

    The number of samples per channel to read is determined using
    the following equation:

    number_of_samples_per_channel = math.floor(
        numpy_array_size_in_bytes / (
            number_of_channels_to_read * raw_sample_size_in_bytes))

    Raw samples constitute the internal representation of samples in a
    device, read directly from the device or buffer without scaling or
    reordering. The native format of a device can be an 8-, 16-, or
    32-bit integer, signed or unsigned.

    If you use a different integer size than the native format of the
    device, one integer can contain multiple samples or one sample can
    stretch across multiple integers. For example, if you use 32-bit
    integers, but the device uses 8-bit samples, one integer contains
    up to four samples. If you use 8-bit integers, but the device uses
    16-bit samples, a sample might require two integers. This behavior
    varies from device to device. Refer to your device documentation
    for more information.

    NI-DAQmx does not separate raw data into channels. It returns data
    in an interleaved or non-interleaved 1D array, depending on the
    raw ordering of the device. Refer to your device documentation for
    more information.

    Use the "timeout" property on the stream to specify the amount of
    time in seconds to wait for samples to become available. If the
    time elapses, the method returns an error and any samples read
    before the timeout elapsed. The default timeout is 10 seconds.
    If you set timeout to -1, the method waits indefinitely. If you
    set timeout to 0, the method tries once to read the requested
    samples and returns an error if it is unable to.

    Args:
        numpy_array: Specifies the 1D NumPy array object into which
            the samples requested are read.
    Returns:
        int: Indicates the number of samples read, as reported by the
        raw read.
    """
    channels_to_read = self.channels_to_read
    number_of_channels = len(channels_to_read.channel_names)

    # Whole samples per channel that fit in the array. The arithmetic
    # is done in bits with floor division so the result is an exact
    # int: the earlier ``divmod(nbytes, channels * bits / 8)`` form
    # yields a float under Python 3 true division, and that float was
    # then handed to the raw read as the sample count.
    bits_per_scan = number_of_channels * channels_to_read.ai_raw_samp_size
    number_of_samples_per_channel = (
        (numpy_array.nbytes * 8) // bits_per_scan)

    samples_read, _ = _read_raw(
        self._handle, numpy_array, number_of_samples_per_channel,
        self.timeout)

    return samples_read
def start_new_file(self, file_path):
    """
    Starts a new TDMS file the next time data is written to disk.

    Args:
        file_path (str): Path of the TDMS file that should receive
            the logged data.
    """
    native = lib_importer.windll.DAQmxStartNewFile
    if native.argtypes is None:
        with native.arglock:
            # Prototype is assigned once; re-checked under arglock.
            if native.argtypes is None:
                native.argtypes = [
                    lib_importer.task_handle, ctypes_byte_str]

    check_for_error(native(self._handle, file_path))