# Do not edit this file; it was automatically generated.
from nidaqmx.utils import unflatten_channel_string
from nidaqmx.constants import (
RegenerationMode, ResolutionType, WaitMode, WriteRelativeTo)
[docs]
class OutStream:
"""
Exposes an output data stream on a DAQmx task.
The output data stream be used to control writing behavior and can be
used in conjunction with writer classes to write samples to an
NI-DAQmx task.
"""
__slots__ = ('_task', '_handle', '_interpreter', '_auto_start', '_timeout')
[docs]
def __init__(self, task, interpreter):
self._task = task
self._handle = task._handle
self._interpreter = interpreter
self._auto_start = False
self._timeout = 10.0
super().__init__()
[docs]
def __eq__(self, other):
if isinstance(other, self.__class__):
return (self._handle == other._handle and
self._auto_start == other._auto_start and
self._timeout == other._timeout)
return False
[docs]
def __hash__(self):
return self._interpreter.hash_task_handle(self._handle) ^ hash((self._auto_start, self._timeout))
[docs]
def __ne__(self, other):
return not self.__eq__(other)
[docs]
def __repr__(self):
return f'OutStream(task={self._task.name})'
@property
def auto_start(self):
"""
bool: Specifies if the "write" method automatically starts the
stream's owning task if you did not explicitly start it
with the DAQmx Start Task method.
"""
return self._auto_start
@auto_start.setter
def auto_start(self, val):
self._auto_start = val
@auto_start.deleter
def auto_start(self):
self._auto_start = False
@property
def timeout(self):
"""
float: Specifies the amount of time in seconds to wait for
the write method to write all samples. NI-DAQmx performs a
timeout check only if the write method must wait before it
writes data. The write method returns an error if the time
elapses. The default timeout is 10 seconds. If you set
"timeout" to nidaqmx.WAIT_INFINITELY, the write method
waits indefinitely. If you set timeout to 0, the write
method tries once to write the submitted samples. If the
write method could not write all the submitted samples, it
returns an error and the number of samples successfully
written in the number of samples written per channel
output.
"""
return self._timeout
@timeout.setter
def timeout(self, val):
self._timeout = val
@timeout.deleter
def timeout(self):
self._timeout = 10.0
@property
def accessory_insertion_or_removal_detected(self):
"""
bool: Indicates if any devices in the task detected the
insertion or removal of an accessory since the task started.
Reading this property clears the accessory change status for
all channels in the task. You must read this property before
you read **devs_with_inserted_or_removed_accessories**.
Otherwise, you will receive an error.
"""
val = self._interpreter.get_write_attribute_bool(self._handle, 0x3053)
return val
@property
def curr_write_pos(self):
"""
int: Indicates the position in the buffer of the next sample to
generate. This value is identical for all channels in the
task.
"""
val = self._interpreter.get_write_attribute_uint64(self._handle, 0x1458)
return val
@property
def devs_with_inserted_or_removed_accessories(self):
"""
List[str]: Indicates the names of any devices that detected the
insertion or removal of an accessory since the task started.
You must read **accessory_insertion_or_removal_detected**
before you read this property. Otherwise, you will receive
an error.
"""
buffer_size = self.get_channels_buffer_size()
val = self._interpreter.get_write_attribute_string(self._handle, 0x3054, buffer_size)
return unflatten_channel_string(val)
@property
def do_num_booleans_per_chan(self):
"""
int: Indicates the number of Boolean values expected per channel
in a sample for line-based writes. This property is
determined by the channel in the task with the most digital
lines. If a channel has fewer lines than this number, NI-
DAQmx ignores the extra Boolean values.
"""
val = self._interpreter.get_write_attribute_uint32(self._handle, 0x217f)
return val
@property
def external_overvoltage_chans(self):
"""
List[str]: Indicates a list of names of any virtual channels in
the task for which an External Overvoltage condition has
been detected. You must read External OvervoltageChansExist
before you read this property. Otherwise, you will receive
an error.
"""
buffer_size = self.get_channels_buffer_size()
val = self._interpreter.get_write_attribute_string(self._handle, 0x30bc, buffer_size)
return unflatten_channel_string(val)
@property
def external_overvoltage_chans_exist(self):
"""
bool: Indicates if the device(s) detected an External
Overvoltage condition for any channel in the task. Reading
this property clears the External Overvoltage status for all
channels in the task. You must read this property before you
read External OvervoltageChans. Otherwise, you will receive
an error.
"""
val = self._interpreter.get_write_attribute_bool(self._handle, 0x30bb)
return val
@property
def num_chans(self):
"""
int: Indicates the number of channels that DAQmx Write writes to
the task. This value is the number of channels in the task.
"""
val = self._interpreter.get_write_attribute_uint32(self._handle, 0x217e)
return val
@property
def offset(self):
"""
int: Specifies in samples per channel an offset at which a write
operation begins. This offset is relative to the location
you specify with **relative_to**.
"""
val = self._interpreter.get_write_attribute_int32(self._handle, 0x190d)
return val
@offset.setter
def offset(self, val):
self._interpreter.set_write_attribute_int32(self._handle, 0x190d, val)
@offset.deleter
def offset(self):
self._interpreter.reset_write_attribute(self._handle, 0x190d)
@property
def open_current_loop_chans(self):
"""
List[str]: Indicates a list of names of any virtual channels in
the task for which the device(s) detected an open current
loop. You must read **open_current_loop_chans_exist** before
you read this property. Otherwise, you will receive an
error.
"""
buffer_size = self.get_channels_buffer_size()
val = self._interpreter.get_write_attribute_string(self._handle, 0x29eb, buffer_size)
return unflatten_channel_string(val)
@property
def open_current_loop_chans_exist(self):
"""
bool: Indicates if the device(s) detected an open current loop
for any channel in the task. Reading this property clears
the open current loop status for all channels in the task.
You must read this property before you read
**open_current_loop_chans**. Otherwise, you will receive an
error.
"""
val = self._interpreter.get_write_attribute_bool(self._handle, 0x29ea)
return val
@property
def output_buf_size(self):
"""
int: Specifies the number of samples the output buffer can hold
for each channel in the task. Zero indicates to allocate no
buffer. Use a buffer size of 0 to perform a hardware-timed
operation without using a buffer. Setting this property
overrides the automatic output buffer allocation that NI-
DAQmx performs.
"""
val = self._interpreter.get_buffer_attribute_uint32(self._handle, 0x186d)
return val
@output_buf_size.setter
def output_buf_size(self, val):
self._interpreter.set_buffer_attribute_uint32(self._handle, 0x186d, val)
@output_buf_size.deleter
def output_buf_size(self):
self._interpreter.reset_buffer_attribute(self._handle, 0x186d)
@property
def output_onbrd_buf_size(self):
"""
int: Specifies in samples per channel the size of the onboard
output buffer of the device.
"""
val = self._interpreter.get_buffer_attribute_uint32(self._handle, 0x230b)
return val
@output_onbrd_buf_size.setter
def output_onbrd_buf_size(self, val):
self._interpreter.set_buffer_attribute_uint32(self._handle, 0x230b, val)
@output_onbrd_buf_size.deleter
def output_onbrd_buf_size(self):
self._interpreter.reset_buffer_attribute(self._handle, 0x230b)
@property
def overcurrent_chans(self):
"""
List[str]: Indicates a list of names of any virtual channels in
the task for which an overcurrent condition has been
detected. You must read **overcurrent_chans_exist** before
you read this property. Otherwise, you will receive an
error.
"""
buffer_size = self.get_channels_buffer_size()
val = self._interpreter.get_write_attribute_string(self._handle, 0x29e9, buffer_size)
return unflatten_channel_string(val)
@property
def overcurrent_chans_exist(self):
"""
bool: Indicates if the device(s) detected an overcurrent
condition for any channel in the task. Reading this property
clears the overcurrent status for all channels in the task.
You must read this property before you read
**overcurrent_chans**. Otherwise, you will receive an error.
"""
val = self._interpreter.get_write_attribute_bool(self._handle, 0x29e8)
return val
@property
def overloaded_chans(self):
"""
List[str]: Indicates a list of names of any overloaded virtual
channels in the task. You must read
**overloaded_chans_exist** before you read this property.
Otherwise, you will receive an error.
"""
buffer_size = self.get_channels_buffer_size()
val = self._interpreter.get_write_attribute_string(self._handle, 0x3085, buffer_size)
return unflatten_channel_string(val)
@property
def overloaded_chans_exist(self):
"""
bool: Indicates if the device(s) detected an overload in any
virtual channel in the task. Reading this property clears
the overload status for all channels in the task. You must
read this property before you read **overloaded_chans**.
Otherwise, you will receive an error.
"""
val = self._interpreter.get_write_attribute_bool(self._handle, 0x3084)
return val
@property
def overtemperature_chans(self):
"""
List[str]: Indicates a list of names of any overtemperature
virtual channels. You must read
**overtemperature_chans_exist** before you read this
property. Otherwise, you will receive an error. The list of
names may be empty if the device cannot determine the source
of the overtemperature.
"""
buffer_size = self.get_channels_buffer_size()
val = self._interpreter.get_write_attribute_string(self._handle, 0x3083, buffer_size)
return unflatten_channel_string(val)
@property
def overtemperature_chans_exist(self):
"""
bool: Indicates if the device(s) detected an overtemperature
condition in any virtual channel in the task. Reading this
property clears the overtemperature status for all channels
in the task. You must read this property before you read
**overtemperature_chans**. Otherwise, you will receive an
error.
"""
val = self._interpreter.get_write_attribute_bool(self._handle, 0x2a84)
return val
@property
def power_supply_fault_chans(self):
"""
List[str]: Indicates a list of names of any virtual channels in
the task that have a power supply fault. You must read
**power_supply_fault_chans_exist** before you read this
property. Otherwise, you will receive an error.
"""
buffer_size = self.get_channels_buffer_size()
val = self._interpreter.get_write_attribute_string(self._handle, 0x29ed, buffer_size)
return unflatten_channel_string(val)
@property
def power_supply_fault_chans_exist(self):
"""
bool: Indicates if the device(s) detected a power supply fault
for any channel in the task. Reading this property clears
the power supply fault status for all channels in the task.
You must read this property before you read
**power_supply_fault_chans**. Otherwise, you will receive an
error.
"""
val = self._interpreter.get_write_attribute_bool(self._handle, 0x29ec)
return val
@property
def raw_data_width(self):
"""
int: Indicates in bytes the required size of a raw sample to
write to the task.
"""
val = self._interpreter.get_write_attribute_uint32(self._handle, 0x217d)
return val
@property
def regen_mode(self):
"""
:class:`nidaqmx.constants.RegenerationMode`: Specifies whether
to allow NI-DAQmx to generate the same data multiple times.
"""
val = self._interpreter.get_write_attribute_int32(self._handle, 0x1453)
return RegenerationMode(val)
@regen_mode.setter
def regen_mode(self, val):
val = val.value
self._interpreter.set_write_attribute_int32(self._handle, 0x1453, val)
@regen_mode.deleter
def regen_mode(self):
self._interpreter.reset_write_attribute(self._handle, 0x1453)
@property
def relative_to(self):
"""
:class:`nidaqmx.constants.WriteRelativeTo`: Specifies the point
in the buffer at which to write data. If you also specify an
offset with **offset**, the write operation begins at that
offset relative to this point you select with this property.
"""
val = self._interpreter.get_write_attribute_int32(self._handle, 0x190c)
return WriteRelativeTo(val)
@relative_to.setter
def relative_to(self, val):
val = val.value
self._interpreter.set_write_attribute_int32(self._handle, 0x190c, val)
@relative_to.deleter
def relative_to(self):
self._interpreter.reset_write_attribute(self._handle, 0x190c)
@property
def sleep_time(self):
"""
float: Specifies in seconds the amount of time to sleep after
checking for available buffer space if **wait_mode** is
**WaitMode2.SLEEP**.
"""
val = self._interpreter.get_write_attribute_double(self._handle, 0x22b2)
return val
@sleep_time.setter
def sleep_time(self, val):
self._interpreter.set_write_attribute_double(self._handle, 0x22b2, val)
@sleep_time.deleter
def sleep_time(self):
self._interpreter.reset_write_attribute(self._handle, 0x22b2)
@property
def space_avail(self):
"""
int: Indicates in samples per channel the amount of available
space in the buffer.
"""
val = self._interpreter.get_write_attribute_uint32(self._handle, 0x1460)
return val
@property
def sync_unlocked_chans(self):
"""
List[str]: Indicates the channels from devices in an unlocked
target.
"""
buffer_size = self.get_channels_buffer_size()
val = self._interpreter.get_write_attribute_string(self._handle, 0x3140, buffer_size)
return unflatten_channel_string(val)
@property
def sync_unlocked_chans_exist(self):
"""
bool: Indicates whether the target is currently locked to the
grand master. Devices may report PLL Unlock either during
acquisition or after acquisition.
"""
val = self._interpreter.get_write_attribute_bool(self._handle, 0x313f)
return val
@property
def total_samp_per_chan_generated(self):
"""
int: Indicates the total number of samples generated by each
channel in the task. This value is identical for all
channels in the task.
"""
val = self._interpreter.get_write_attribute_uint64(self._handle, 0x192b)
return val
@property
def wait_mode(self):
"""
:class:`nidaqmx.constants.WaitMode`: Specifies how DAQmx Write
waits for space to become available in the buffer.
"""
val = self._interpreter.get_write_attribute_int32(self._handle, 0x22b1)
return WaitMode(val)
@wait_mode.setter
def wait_mode(self, val):
val = val.value
self._interpreter.set_write_attribute_int32(self._handle, 0x22b1, val)
@wait_mode.deleter
def wait_mode(self):
self._interpreter.reset_write_attribute(self._handle, 0x22b1)
[docs]
def write(self, numpy_array):
"""
Writes raw samples to the task or virtual channels you specify.
The number of samples per channel to write is determined using the
following equation:
number_of_samples_per_channel = math.floor(
numpy_array_size_in_bytes / (
number_of_channels_to_write * raw_sample_size_in_bytes))
Raw samples constitute the internal representation of samples in a
device, read directly from the device or buffer without scaling or
reordering. The native format of a device can be an 8-, 16-, or 32-bit
integer, signed or unsigned.
If you use a different integer size than the native format of the
device, one integer can contain multiple samples or one sample can
stretch across multiple integers. For example, if you use 32-bit
integers, but the device uses 8-bit samples, one integer contains up to
four samples. If you use 8-bit integers, but the device uses 16-bit
samples, a sample might require two integers. This behavior varies from
device to device. Refer to your device documentation for more
information.
NI-DAQmx does not separate raw data into channels. It accepts data in
an interleaved or non-interleaved 1D array, depending on the raw
ordering of the device. Refer to your device documentation for more
information.
If the task uses on-demand timing, this method returns only after the
device generates all samples. On-demand is the default timing type if
you do not use the timing property on the task to configure a sample
timing type. If the task uses any timing type other than on-demand,
this method returns immediately and does not wait for the device to
generate all samples. Your application must determine if the task is
done to ensure that the device generated all samples.
Use the "auto_start" property on the stream to specify if this method
automatically starts the stream's owning task if you did not explicitly
start it with the DAQmx Start Task method.
Use the "timeout" property on the stream to specify the amount of
time in seconds to wait for the method to write all samples. NI-DAQmx
performs a timeout check only if the method must wait before it writes
data. This method returns an error if the time elapses. The default
timeout is 10 seconds. If you set timeout to nidaqmx.WAIT_INFINITELY,
the method waits indefinitely. If you set timeout to 0, the method
tries once to write the submitted samples. If the method could not
write all the submitted samples, it returns an error and the number of
samples successfully written.
Args:
numpy_array (numpy.ndarray): Specifies a 1D NumPy array that
contains the raw samples to write to the task.
Returns:
int:
Specifies the actual number of samples per channel successfully
written to the buffer.
"""
channels_to_write = self._task.channels
number_of_channels = len(channels_to_write.channel_names)
channels_to_write.ao_resolution_units = ResolutionType.BITS
number_of_samples_per_channel, _ = divmod(
numpy_array.nbytes, (
number_of_channels * int(channels_to_write.ao_resolution) // 8))
return self._interpreter.write_raw(
self._handle, number_of_samples_per_channel,
self.auto_start, self.timeout, numpy_array)
def get_channels_buffer_size(self):
channel_names = self._task.channel_names
total_size = sum(len(name) + 2 for name in channel_names) + 1
return total_size