Source code for nidaqmx.task

import threading
import warnings
from enum import Enum

import numpy

from nidaqmx import utils
from nidaqmx._task_modules.channels.channel import Channel
from nidaqmx._task_modules.export_signals import ExportSignals
from nidaqmx._task_modules.in_stream import InStream
from nidaqmx._task_modules.timing import Timing
from nidaqmx._task_modules.triggers import Triggers
from nidaqmx._task_modules.out_stream import OutStream
from nidaqmx._task_modules.ai_channel_collection import (
    AIChannelCollection)
from nidaqmx._task_modules.ao_channel_collection import (
    AOChannelCollection)
from nidaqmx._task_modules.ci_channel_collection import (
    CIChannelCollection)
from nidaqmx._task_modules.co_channel_collection import (
    COChannelCollection)
from nidaqmx._task_modules.di_channel_collection import (
    DIChannelCollection)
from nidaqmx._task_modules.do_channel_collection import (
    DOChannelCollection)
from nidaqmx.constants import (
    AcquisitionType, ChannelType, FillMode, UsageTypeAI, UsageTypeCI, EveryNSamplesEventType,
    READ_ALL_AVAILABLE, UsageTypeCO, _Save)
from nidaqmx.error_codes import DAQmxErrors
from nidaqmx.errors import (
    DaqError, DaqResourceWarning)
from nidaqmx.system.device import _DeviceAlternateConstructor
from nidaqmx.types import CtrFreq, CtrTick, CtrTime, PowerMeasurement
from nidaqmx.utils import unflatten_channel_string, flatten_channel_string

__all__ = ['Task']


class UnsetNumSamplesSentinel:
    pass


class UnsetAutoStartSentinel:
    pass


NUM_SAMPLES_UNSET = UnsetNumSamplesSentinel()
AUTO_START_UNSET = UnsetAutoStartSentinel()

del UnsetNumSamplesSentinel
del UnsetAutoStartSentinel


[docs]class Task: """ Represents a DAQmx Task. """
[docs] def __init__(self, new_task_name='', *, grpc_options=None): """ Creates a DAQmx task. Args: new_task_name (Optional[str]): Specifies the name to assign to the task. If you use this method in a loop and specify a name for the task, you must use the DAQmx Clear Task method within the loop after you are finished with the task. Otherwise, NI-DAQmx attempts to create multiple tasks with the same name, which results in an error. grpc_options (Optional[:class:`~nidaqmx.GrpcSessionOptions`]): Specifies the gRPC session options. """ # Initialize the fields that __del__ accesses so it doesn't crash when __init__ raises an exception. self._handle = None self._close_on_exit = False self._saved_name = new_task_name # _initialize sets this to the name assigned by DAQmx. self._grpc_options = grpc_options self._event_handlers = {} if grpc_options and not ( grpc_options.session_name == "" or grpc_options.session_name == new_task_name ): raise DaqError( f'Unsupported session name: "{grpc_options.session_name}". If a session name is specified, it must match the task name.', DAQmxErrors.UNKNOWN, task_name=new_task_name) self._interpreter = utils._select_interpreter(grpc_options) self._handle, self._close_on_exit = self._interpreter.create_task(new_task_name) self._initialize(self._handle, self._interpreter)
def __del__(self): if self._handle is not None and self._close_on_exit and self._grpc_options is None: warnings.warn( 'Task of name "{}" was not explicitly closed before it was ' 'destructed. Resources on the task device may still be ' 'reserved.'.format(self._saved_name), DaqResourceWarning ) elif ( self._grpc_options is not None and self._event_handlers ): warnings.warn( 'Task of name "{}" was not explicitly closed before it was ' 'destructed. Event handlers may still be active.'.format(self._saved_name), DaqResourceWarning ) def __enter__(self): return self
[docs] def __eq__(self, other): if isinstance(other, self.__class__): return self._handle == other._handle return False
def __exit__(self, type, value, traceback): if self._close_on_exit: self.close()
[docs] def __hash__(self): return self._interpreter.hash_task_handle(self._handle)
[docs] def __ne__(self, other): return not self.__eq__(other)
[docs] def __repr__(self): return f'Task(name={self._saved_name})'
@property def name(self): """ str: Indicates the name of the task. """ val = self._interpreter.get_task_attribute_string(self._handle, 0x1276) return val @property def channels(self): """ :class:`nidaqmx._task_modules.channels.channel.Channel`: Specifies a channel object that represents the entire list of virtual channels in this task. """ return Channel._factory( self._handle, flatten_channel_string(self.channel_names), self._interpreter) @property def channel_names(self): """ List[str]: Indicates the names of all virtual channels in the task. """ val = self._interpreter.get_task_attribute_string(self._handle, 0x1273) return unflatten_channel_string(val) @property def number_of_channels(self): """ int: Indicates the number of virtual channels in the task. """ val = self._interpreter.get_task_attribute_uint32(self._handle, 0x2181) return val @property def devices(self): """ List[:class:`nidaqmx.system.device.Device`]: Indicates a list of Device objects representing all the devices in the task. """ val = self._interpreter.get_task_attribute_string(self._handle, 0x230e) return [_DeviceAlternateConstructor(v, self._interpreter) for v in unflatten_channel_string(val)] @property def number_of_devices(self): """ int: Indicates the number of devices in the task. """ val = self._interpreter.get_task_attribute_uint32(self._handle, 0x29ba) return val @property def ai_channels(self): """ :class:`nidaqmx._task_modules.ai_channel_collection.AIChannelCollection`: Gets the collection of analog input channels for this task. """ return self._ai_channels @property def ao_channels(self): """ :class:`nidaqmx._task_modules.ao_channel_collection.AOChannelCollection`: Gets the collection of analog output channels for this task. """ return self._ao_channels @property def ci_channels(self): """ :class:`nidaqmx._task_modules.ci_channel_collection.CIChannelCollection`: Gets the collection of counter input channels for this task. """ return self._ci_channels @property def co_channels(self): """ :class:`nidaqmx._task_modules.co_channel_collection.COChannelCollection`: Gets the collection of counter output channels for this task. """ return self._co_channels @property def di_channels(self): """ :class:`nidaqmx._task_modules.di_channel_collection.DIChannelCollection`: Gets the collection of digital input channels for this task. """ return self._di_channels @property def do_channels(self): """ :class:`nidaqmx._task_modules.do_channel_collection.DOChannelCollection`: Gets the collection of digital output channels for this task. """ return self._do_channels @property def export_signals(self): """ :class:`nidaqmx._task_modules.export_signals.ExportSignals`: Gets the exported signal configurations for the task. """ return self._export_signals @property def in_stream(self): """ :class:`nidaqmx._task_modules.in_stream.InStream`: Gets the read configurations for the task. """ return self._in_stream @property def out_stream(self): """ :class:`nidaqmx._task_modules.out_stream.OutStream`: Gets the write configurations for the task. """ return self._out_stream @property def timing(self): """ :class:`nidaqmx._task_modules.timing.Timing`: Gets the timing configurations for the task. """ return self._timing @property def triggers(self): """ :class:`nidaqmx._task_modules.triggers.Triggers`: Gets the trigger configurations for the task. """ return self._triggers def _initialize(self, task_handle, interpreter): """ Instantiates and populates various attributes used by this task. Args: task_handle (TaskHandle): Specifies the handle for this task. """ # Saved name is used in self.close() to throw graceful error on # double closes. self._saved_name = self.name self._ai_channels = AIChannelCollection(task_handle, interpreter) self._ao_channels = AOChannelCollection(task_handle, interpreter) self._ci_channels = CIChannelCollection(task_handle, interpreter) self._co_channels = COChannelCollection(task_handle, interpreter) self._di_channels = DIChannelCollection(task_handle, interpreter) self._do_channels = DOChannelCollection(task_handle, interpreter) self._export_signals = ExportSignals(task_handle, interpreter) self._in_stream = InStream(self, interpreter) self._timing = Timing(task_handle, interpreter) self._triggers = Triggers(task_handle, interpreter) self._out_stream = OutStream(self, interpreter) self._event_handler_lock = threading.Lock() def _calculate_num_samps_per_chan(self, num_samps_per_chan): """ Calculates the actual number of samples per channel to read. This method is necessary because the number of samples per channel can be set to NUM_SAMPLES_UNSET or -1, where each value entails a different method of calculating the actual number of samples per channel to read. Args: num_samps_per_chan (int): Specifies the number of samples per channel. """ if num_samps_per_chan is NUM_SAMPLES_UNSET: return 1 elif num_samps_per_chan == READ_ALL_AVAILABLE: acq_type = self.timing.samp_quant_samp_mode if (acq_type == AcquisitionType.FINITE and not self.in_stream.read_all_avail_samp): return self.timing.samp_quant_samp_per_chan else: return self.in_stream.avail_samp_per_chan else: return num_samps_per_chan
[docs] def add_global_channels(self, global_channels): """ Adds global virtual channels from MAX to the given task. Args: global_channels (List[nidaqmx.system.storage.persisted_channel.PersistedChannel]): Specifies the channels to add to the task. These channels must be valid channels available from MAX. If you pass an invalid channel, NI-DAQmx returns an error. This value is ignored if it is empty. """ channels = flatten_channel_string([g._name for g in global_channels]) self._interpreter.add_global_chans_to_task(self._handle, channels)
[docs] def close(self): """ Clears the task. Before clearing, this method aborts the task, if necessary, and releases any resources the task reserved. You cannot use a task after you clear it unless you recreate the task. If you create a DAQmx Task object within a loop, use this method within the loop after you are finished with the task to avoid allocating unnecessary memory. """ if self._handle is None: warnings.warn( 'Attempted to close NI-DAQmx task of name "{}" but task was ' 'already closed.'.format(self._saved_name), DaqResourceWarning) return first_exception = None try: self._interpreter.clear_task(self._handle) except Exception as ex: first_exception = first_exception or ex self._handle = None with self._event_handler_lock: event_handlers = self._event_handlers self._event_handlers = {} for event_handler in event_handlers.values(): try: event_handler.close() except Exception as ex: first_exception = first_exception or ex if first_exception: raise first_exception
[docs] def control(self, action): """ Alters the state of a task according to the action you specify. Args: action (nidaqmx.constants.TaskMode): Specifies how to alter the task state. """ self._interpreter.task_control(self._handle, action.value)
[docs] def is_task_done(self): """ Queries the status of the task and indicates if it completed execution. Use this function to ensure that the specified operation is complete before you stop the task. Returns: bool: Indicates if the measurement or generation completed. """ is_task_done = self._interpreter.is_task_done(self._handle) return is_task_done
[docs] def read(self, number_of_samples_per_channel=NUM_SAMPLES_UNSET, timeout=10.0): """ Reads samples from the task or virtual channels you specify. This read method is dynamic, and is capable of inferring an appropriate return type based on these factors: - The channel type of the task. - The number of channels to read. - The number of samples per channel. The data type of the samples returned is independently determined by the channel type of the task. For digital input measurements, the data type of the samples returned is determined by the line grouping format of the digital lines. If the line grouping format is set to "one channel for all lines", the data type of the samples returned is int. If the line grouping format is set to "one channel per line", the data type of the samples returned is boolean. If you do not set the number of samples per channel, this method assumes one sample was requested. This method then returns either a scalar (1 channel to read) or a list (N channels to read). If you set the number of samples per channel to ANY value (even 1), this method assumes multiple samples were requested. This method then returns either a list (1 channel to read) or a list of lists (N channels to read). Args: number_of_samples_per_channel (Optional[int]): Specifies the number of samples to read. If this input is not set, assumes samples to read is 1. Conversely, if this input is set, assumes there are multiple samples to read. If you set this input to nidaqmx.constants. READ_ALL_AVAILABLE, NI-DAQmx determines how many samples to read based on if the task acquires samples continuously or acquires a finite number of samples. If the task acquires samples continuously and you set this input to nidaqmx.constants.READ_ALL_AVAILABLE, this method reads all the samples currently available in the buffer. If the task acquires a finite number of samples and you set this input to nidaqmx.constants.READ_ALL_AVAILABLE, the method waits for the task to acquire all requested samples, then reads those samples. If you set the "read_all_avail_samp" property to True, the method reads the samples currently available in the buffer and does not wait for the task to acquire all requested samples. timeout (Optional[float]): Specifies the amount of time in seconds to wait for samples to become available. If the time elapses, the method returns an error and any samples read before the timeout elapsed. The default timeout is 10 seconds. If you set timeout to nidaqmx.constants.WAIT_INFINITELY, the method waits indefinitely. If you set timeout to 0, the method tries once to read the requested samples and returns an error if it is unable to. Returns: dynamic: The samples requested in the form of a scalar, a list, or a list of lists. See method docstring for more info. NI-DAQmx scales the data to the units of the measurement, including any custom scaling you apply to the channels. Use a DAQmx Create Channel method to specify these units. Example: >>> task = Task() >>> task.ai_channels.add_ai_voltage_chan('Dev1/ai0:3') >>> data = task.read() >>> type(data) <type 'list'> >>> type(data[0]) <type 'float'> """ channels_to_read = self.in_stream.channels_to_read number_of_channels = len(channels_to_read.channel_names) read_chan_type = channels_to_read.chan_type num_samples_not_set = (number_of_samples_per_channel is NUM_SAMPLES_UNSET) number_of_samples_per_channel = self._calculate_num_samps_per_chan( number_of_samples_per_channel) # Determine the array shape and size to create if number_of_channels > 1: if not num_samples_not_set: array_shape = (number_of_channels, number_of_samples_per_channel) else: array_shape = number_of_channels else: array_shape = number_of_samples_per_channel if read_chan_type == ChannelType.ANALOG_INPUT: has_power_chan = False for chan in channels_to_read: meas_type = chan.ai_meas_type has_power_chan = meas_type == UsageTypeAI.POWER if has_power_chan: break if has_power_chan: voltages = numpy.zeros(array_shape, dtype=numpy.float64) currents = numpy.zeros(array_shape, dtype=numpy.float64) _, _, samples_read = self._interpreter.read_power_f64( self._handle, number_of_samples_per_channel, timeout, FillMode.GROUP_BY_CHANNEL.value, voltages, currents) if number_of_channels > 1: if number_of_samples_per_channel == 1: # n channel, 1 sample data = [] for v, i in zip(voltages, currents): data.append(PowerMeasurement(voltage=v, current=i)) return data else: # n channel, n samples data = [] for channel_index in range(number_of_channels): channel_data = [] channel_voltages = voltages[channel_index] channel_currents = currents[channel_index] for v, i in zip(channel_voltages, channel_currents): channel_data.append(PowerMeasurement(voltage=v, current=i)) data.append(channel_data) return data else: if number_of_samples_per_channel == 1: # 1 channel, 1 sample return PowerMeasurement(voltage=voltages[0], current=currents[0]) else: # 1 channel, n samples data = [] for v, i in zip(voltages, currents): data.append(PowerMeasurement(voltage=v, current=i)) return data[:samples_read] else: data = numpy.zeros(array_shape, dtype=numpy.float64) _, samples_read = self._interpreter.read_analog_f64( self._handle, number_of_samples_per_channel, timeout, FillMode.GROUP_BY_CHANNEL.value, data) elif (read_chan_type == ChannelType.DIGITAL_INPUT or read_chan_type == ChannelType.DIGITAL_OUTPUT): if self.in_stream.di_num_booleans_per_chan == 1: data = numpy.zeros(array_shape, dtype=bool) _, samples_read, _ = self._interpreter.read_digital_lines( self._handle, number_of_samples_per_channel, timeout, FillMode.GROUP_BY_CHANNEL.value, data) else: data = numpy.zeros(array_shape, dtype=numpy.uint32) _, samples_read = self._interpreter.read_digital_u32( self._handle, number_of_samples_per_channel, timeout, FillMode.GROUP_BY_CHANNEL.value, data) elif read_chan_type == ChannelType.COUNTER_INPUT: meas_type = channels_to_read.ci_meas_type if meas_type == UsageTypeCI.PULSE_FREQ: frequencies = numpy.zeros(array_shape, dtype=numpy.float64) duty_cycles = numpy.zeros(array_shape, dtype=numpy.float64) _, _, samples_read = self._interpreter.read_ctr_freq( self._handle, number_of_samples_per_channel, timeout, FillMode.GROUP_BY_CHANNEL.value, frequencies, duty_cycles) data = [] for f, d in zip(frequencies, duty_cycles): data.append(CtrFreq(freq=f, duty_cycle=d)) elif meas_type == UsageTypeCI.PULSE_TIME: high_times = numpy.zeros(array_shape, dtype=numpy.float64) low_times = numpy.zeros(array_shape, dtype=numpy.float64) _, _, samples_read = self._interpreter.read_ctr_time( self._handle, number_of_samples_per_channel, timeout, FillMode.GROUP_BY_CHANNEL.value, high_times, low_times) data = [] for h, l in zip(high_times, low_times): data.append(CtrTime(high_time=h, low_time=l)) elif meas_type == UsageTypeCI.PULSE_TICKS: high_ticks = numpy.zeros(array_shape, dtype=numpy.uint32) low_ticks = numpy.zeros(array_shape, dtype=numpy.uint32) _, _ , samples_read = self._interpreter.read_ctr_ticks( self._handle, number_of_samples_per_channel, timeout, FillMode.GROUP_BY_CHANNEL.value, high_ticks, low_ticks) data = [] for h, l in zip(high_ticks, low_ticks): data.append(CtrTick(high_tick=h, low_tick=l)) elif meas_type == UsageTypeCI.COUNT_EDGES: data = numpy.zeros(array_shape, dtype=numpy.uint32) _, samples_read = self._interpreter.read_counter_u32_ex( self._handle, number_of_samples_per_channel, timeout, FillMode.GROUP_BY_CHANNEL.value, data) else: data = numpy.zeros(array_shape, dtype=numpy.float64) _, samples_read = self._interpreter.read_counter_f64_ex( self._handle, number_of_samples_per_channel, timeout, FillMode.GROUP_BY_CHANNEL.value, data) else: raise DaqError( 'Read failed, because there are no channels in this task from ' 'which data can be read.', DAQmxErrors.READ_NO_INPUT_CHANS_IN_TASK, task_name=self.name) if (read_chan_type == ChannelType.COUNTER_INPUT and (meas_type == UsageTypeCI.PULSE_FREQ or meas_type == UsageTypeCI.PULSE_TICKS or meas_type == UsageTypeCI.PULSE_TIME)): if num_samples_not_set and array_shape == 1: return data[0] # Counter pulse measurements should not have N channel versions. if samples_read != number_of_samples_per_channel: return data[:samples_read] return data if num_samples_not_set and array_shape == 1: return data.tolist()[0] if samples_read != number_of_samples_per_channel: if number_of_channels > 1: return data[:,:samples_read].tolist() else: return data[:samples_read].tolist() return data.tolist()
[docs] def register_done_event(self, callback_method): """ Registers a callback function to receive an event when a task stops due to an error or when a finite acquisition task or finite generation task completes execution. A Done event does not occur when a task is stopped explicitly, such as by calling DAQmx Stop Task. Args: callback_method (function): Specifies the function that you want DAQmx to call when the event occurs. The function you pass in this parameter must have the following prototype: >>> def callback(task_handle, status, callback_data): >>> return 0 Upon entry to the callback, the task_handle parameter contains the handle to the task on which the event occurred. The status parameter contains the status of the task when the event occurred. If the status value is negative, it indicates an error. If the status value is zero, it indicates no error. If the status value is positive, it indicates a warning. The callbackData parameter contains the value you passed in the callbackData parameter of this function. Passing None for this parameter unregisters the event callback function. """ if callback_method is not None: # If the event is already registered, the interpreter should raise DaqError with code # DAQmxErrors.DONE_EVENT_ALREADY_REGISTERED. event_handler = self._interpreter.register_done_event(self._handle, 0, callback_method, None) with self._event_handler_lock: assert _TaskEventType.DONE not in self._event_handlers, "Event already registered." self._event_handlers[_TaskEventType.DONE] = event_handler else: self._interpreter.unregister_done_event(self._handle) with self._event_handler_lock: event_handler = self._event_handlers.pop(_TaskEventType.DONE, None) if event_handler is not None: event_handler.close() # may raise an exception
[docs] def register_every_n_samples_acquired_into_buffer_event( self, sample_interval, callback_method): """ Registers a callback function to receive an event when the specified number of samples is written from the device to the buffer. This function only works with devices that support buffered tasks. When you stop a task explicitly any pending events are discarded. For example, if you call DAQmx Stop Task then you do not receive any pending events. Args: sample_interval (int): Specifies the number of samples after which each event should occur. callback_method (function): Specifies the function that you want DAQmx to call when the event occurs. The function you pass in this parameter must have the following prototype: >>> def callback(task_handle, every_n_samples_event_type, >>> number_of_samples, callback_data): >>> return 0 Upon entry to the callback, the task_handle parameter contains the handle to the task on which the event occurred. The every_n_samples_event_type parameter contains the EveryNSamplesEventType.ACQUIRED_INTO_BUFFER value. The number_of_samples parameter contains the value you passed in the sample_interval parameter of this function. The callback_data parameter contains the value you passed in the callback_data parameter of this function. Passing None for this parameter unregisters the event callback function. """ if callback_method is not None: # If the event is already registered, the interpreter should raise DaqError with code # DAQmxErrors.EVERY_N_SAMPS_ACQ_INTO_BUFFER_EVENT_ALREADY_REGISTERED. event_handler = self._interpreter.register_every_n_samples_event( self._handle, EveryNSamplesEventType.ACQUIRED_INTO_BUFFER.value, sample_interval, 0, callback_method, None) with self._event_handler_lock: assert _TaskEventType.EVERY_N_SAMPLES_ACQUIRED_INTO_BUFFER not in self._event_handlers, "Event already registered." self._event_handlers[_TaskEventType.EVERY_N_SAMPLES_ACQUIRED_INTO_BUFFER] = event_handler else: self._interpreter.unregister_every_n_samples_event( self._handle, EveryNSamplesEventType.ACQUIRED_INTO_BUFFER.value) with self._event_handler_lock: event_handler = self._event_handlers.pop(_TaskEventType.EVERY_N_SAMPLES_ACQUIRED_INTO_BUFFER, None) if event_handler is not None: event_handler.close() # may raise an exception
[docs] def register_every_n_samples_transferred_from_buffer_event( self, sample_interval, callback_method): """ Registers a callback function to receive an event when the specified number of samples is written from the buffer to the device. This function only works with devices that support buffered tasks. When you stop a task explicitly any pending events are discarded. For example, if you call DAQmx Stop Task then you do not receive any pending events. Args: sample_interval (int): Specifies the number of samples after which each event should occur. callback_method (function): Specifies the function that you want DAQmx to call when the event occurs. The function you pass in this parameter must have the following prototype: >>> def callback(task_handle, every_n_samples_event_type, >>> number_of_samples, callback_data): >>> return 0 Upon entry to the callback, the task_handle parameter contains the handle to the task on which the event occurred. The every_n_samples_event_type parameter contains the EveryNSamplesEventType.TRANSFERRED_FROM_BUFFER value. The number_of_samples parameter contains the value you passed in the sample_interval parameter of this function. The callback_data parameter contains the value you passed in the callback_data parameter of this function. Passing None for this parameter unregisters the event callback function. """ if callback_method is not None: # If the event is already registered, the interpreter should raise DaqError with code # DAQmxErrors.EVERY_N_SAMPS_TRANSFERRED_FROM_BUFFER_EVENT_ALREADY_REGISTERED. event_handler = self._interpreter.register_every_n_samples_event( self._handle, EveryNSamplesEventType.TRANSFERRED_FROM_BUFFER.value, sample_interval, 0, callback_method, None) with self._event_handler_lock: assert _TaskEventType.EVERY_N_SAMPLES_TRANSFERRED_FROM_BUFFER not in self._event_handlers, "Event already registered." self._event_handlers[_TaskEventType.EVERY_N_SAMPLES_TRANSFERRED_FROM_BUFFER] = event_handler else: self._interpreter.unregister_every_n_samples_event( self._handle, EveryNSamplesEventType.TRANSFERRED_FROM_BUFFER.value) with self._event_handler_lock: event_handler = self._event_handlers.pop(_TaskEventType.EVERY_N_SAMPLES_TRANSFERRED_FROM_BUFFER, None) if event_handler is not None: event_handler.close() # may raise an exception
[docs] def register_signal_event(self, signal_type, callback_method): """ Registers a callback function to receive an event when the specified hardware event occurs. When you stop a task explicitly any pending events are discarded. For example, if you call DAQmx Stop Task then you do not receive any pending events. Args: signal_type (nidaqmx.constants.Signal): Specifies the type of signal for which you want to receive results. callback_method (function): Specifies the function that you want DAQmx to call when the event occurs. The function you pass in this parameter must have the following prototype: >>> def callback(task_handle, signal_type, callback_data): >>> return 0 Upon entry to the callback, the task_handle parameter contains the handle to the task on which the event occurred. The signal_type parameter contains the integer value you passed in the signal_type parameter of this function. The callback_data parameter contains the value you passed in the callback_data parameter of this function. Passing None for this parameter unregisters the event callback function. """ if callback_method is not None: # If the event is already registered, the interpreter should raise DaqError with code # DAQmxErrors.SIGNAL_EVENT_ALREADY_REGISTERED. event_handler = self._interpreter.register_signal_event( self._handle, signal_type.value, 0, callback_method, None) with self._event_handler_lock: assert _TaskEventType.SIGNAL not in self._event_handlers, "Event already registered." self._event_handlers[_TaskEventType.SIGNAL] = event_handler else: self._interpreter.unregister_signal_event(self._handle, signal_type.value) with self._event_handler_lock: event_handler = self._event_handlers.pop(_TaskEventType.SIGNAL, None) if event_handler is not None: event_handler.close() # may raise an exception
[docs] def save(self, save_as="", author="", overwrite_existing_task=False, allow_interactive_editing=True, allow_interactive_deletion=True): """ Saves this task and any local channels it contains to MAX. This function does not save global channels. Use the DAQmx Save Global Channel function to save global channels. Args: save_as (Optional[str]): Is the name to save the task, global channel, or custom scale as. If you do not specify a value for this input, NI-DAQmx uses the name currently assigned to the task, global channel, or custom scale. author (Optional[str]): Is a name to store with the task, global channel, or custom scale. overwrite_existing_task (Optional[bool]): Specifies whether to overwrite a task of the same name if one is already saved in MAX. If this input is False and a task of the same name is already saved in MAX, this function returns an error. allow_interactive_editing (Optional[bool]): Specifies whether to allow the task, global channel, or custom scale to be edited in the DAQ Assistant. If allow_interactive_editing is True, the DAQ Assistant must support all task or global channel settings. allow_interactive_deletion (Optional[bool]): Specifies whether to allow the task, global channel, or custom scale to be deleted through MAX. """ options = 0 if overwrite_existing_task: options |= _Save.OVERWRITE.value if allow_interactive_editing: options |= _Save.ALLOW_INTERACTIVE_EDITING.value if allow_interactive_deletion: options |= _Save.ALLOW_INTERACTIVE_DELETION.value self._interpreter.save_task(self._handle, save_as, author, options)
[docs] def start(self): """ Transitions the task to the running state to begin the measurement or generation. Using this method is required for some applications and is optional for others. If you do not use this method, a measurement task starts automatically when the DAQmx Read method runs. The autostart input of the DAQmx Write method determines if a generation task starts automatically when the DAQmx Write method runs. If you do not use the DAQmx Start Task method and the DAQmx Stop Task method when you use the DAQmx Read method or the DAQmx Write method multiple times, such as in a loop, the task starts and stops repeatedly. Starting and stopping a task repeatedly reduces the performance of the application. """ self._interpreter.start_task(self._handle)
[docs] def stop(self): """ Stops the task and returns it to the state the task was in before the DAQmx Start Task method ran or the DAQmx Write method ran with the autostart input set to TRUE. If you do not use the DAQmx Start Task method and the DAQmx Stop Task method when you use the DAQmx Read method or the DAQmx Write method multiple times, such as in a loop, the task starts and stops repeatedly. Starting and stopping a task repeatedly reduces the performance of the application. """ self._interpreter.stop_task(self._handle)
[docs] def wait_until_done(self, timeout=10.0): """ Waits for the measurement or generation to complete. Use this method to ensure that the specified operation is complete before you stop the task. Args: timeout (Optional[float]): Specifies the maximum amount of time in seconds to wait for the measurement or generation to complete. This method returns an error if the time elapses. The default is 10. If you set timeout (sec) to nidaqmx.WAIT_INFINITELY, the method waits indefinitely. If you set timeout (sec) to 0, the method checks once and returns an error if the measurement or generation is not done. """ self._interpreter.wait_until_task_done(self._handle, timeout)
def _raise_invalid_num_lines_error( self, num_lines_expected, num_lines_in_data): raise DaqError( 'Specified read or write operation failed, because the number ' 'of lines in the data for a channel does not match the number ' 'of lines in the channel.\n\n' 'If you are using boolean data, make sure the array dimension ' 'for lines in the data matches the number of lines in the ' 'channel.\n\n' 'Number of Lines Per Channel in Task: {}\n' 'Number of Lines Per Channel in Data: {}' .format(num_lines_expected, num_lines_in_data), DAQmxErrors.NUM_LINES_MISMATCH_IN_READ_OR_WRITE, task_name=self.name) def _raise_invalid_write_num_chans_error( self, number_of_channels, number_of_channels_in_data): raise DaqError( 'Write cannot be performed, because the number of channels in the ' 'data does not match the number of channels in the task.\n\n' 'When writing, supply data for all channels in the task. ' 'Alternatively, modify the task to contain the same number of ' 'channels as the data written.\n\n' 'Number of Channels in Task: {}\n' 'Number of Channels in Data: {}' .format(number_of_channels, number_of_channels_in_data), DAQmxErrors.WRITE_NUM_CHANS_MISMATCH, task_name=self.name)
[docs] def write(self, data, auto_start=AUTO_START_UNSET, timeout=10.0): """ Writes samples to the task or virtual channels you specify. This write method is dynamic, and is capable of accepting the samples to write in the various forms for most operations: - Scalar: Single sample for 1 channel. - List/1D numpy.ndarray: Multiple samples for 1 channel or 1 sample for multiple channels. - List of lists/2D numpy.ndarray: Multiple samples for multiple channels. The data type of the samples passed in must be appropriate for the channel type of the task. For counter output pulse operations, this write method only accepts samples in these forms: - Scalar CtrFreq, CtrTime, CtrTick (from nidaqmx.types): Single sample for 1 channel. - List of CtrFreq, CtrTime, CtrTick (from nidaqmx.types): Multiple samples for 1 channel or 1 sample for multiple channels. If the task uses on-demand timing, this method returns only after the device generates all samples. On-demand is the default timing type if you do not use the timing property on the task to configure a sample timing type. If the task uses any timing type other than on-demand, this method returns immediately and does not wait for the device to generate all samples. Your application must determine if the task is done to ensure that the device generated all samples. Args: data (dynamic): Contains the samples to write to the task. The data you write must be in the units of the generation, including any custom scales. Use the DAQmx Create Channel methods to specify these units. auto_start (Optional[bool]): Specifies if this method automatically starts the task if you did not explicitly start it with the DAQmx Start Task method. The default value of this parameter depends on whether you specify one sample or many samples to write to each channel. If one sample per channel was specified, the default value is True. If multiple samples per channel were specified, the default value is False. timeout (Optional[float]): Specifies the amount of time in seconds to wait for the method to write all samples. NI-DAQmx performs a timeout check only if the method must wait before it writes data. This method returns an error if the time elapses. The default timeout is 10 seconds. If you set timeout to nidaqmx.constants.WAIT_INFINITELY, the method waits indefinitely. If you set timeout to 0, the method tries once to write the submitted samples. If the method could not write all the submitted samples, it returns an error and the number of samples successfully written. Returns: int: Specifies the actual number of samples this method successfully wrote. """ channels_to_write = self.channels number_of_channels = len(channels_to_write.channel_names) write_chan_type = channels_to_write.chan_type element = None if number_of_channels == 1: if isinstance(data, list): if isinstance(data[0], list): self._raise_invalid_write_num_chans_error( number_of_channels, len(data)) number_of_samples_per_channel = len(data) element = data[0] elif isinstance(data, numpy.ndarray): if len(data.shape) == 2: self._raise_invalid_write_num_chans_error( number_of_channels, data.shape[0]) number_of_samples_per_channel = len(data) element = data[0] else: number_of_samples_per_channel = 1 element = data else: if isinstance(data, list): if len(data) != number_of_channels: self._raise_invalid_write_num_chans_error( number_of_channels, len(data)) if isinstance(data[0], list): number_of_samples_per_channel = len(data[0]) element = data[0][0] else: number_of_samples_per_channel = 1 element = data[0] elif isinstance(data, numpy.ndarray): if data.shape[0] != number_of_channels: self._raise_invalid_write_num_chans_error( number_of_channels, data.shape[0]) if len(data.shape) == 2: number_of_samples_per_channel = data.shape[1] element = data[0][0] else: number_of_samples_per_channel = 1 element = data[0] else: self._raise_invalid_write_num_chans_error( number_of_channels, 1) if auto_start is AUTO_START_UNSET: if number_of_samples_per_channel > 1: auto_start = False else: auto_start = True if write_chan_type == ChannelType.ANALOG_OUTPUT: data = numpy.asarray(data, dtype=numpy.float64) return self._interpreter.write_analog_f64( self._handle, number_of_samples_per_channel, auto_start, timeout, FillMode.GROUP_BY_CHANNEL.value, data) elif write_chan_type == ChannelType.DIGITAL_OUTPUT: if self.out_stream.do_num_booleans_per_chan == 1: if (not isinstance(element, bool) and not isinstance(element, numpy.bool_)): raise DaqError( 'Write failed, because this write method only accepts ' 'boolean samples when there is one digital line per ' 'channel in a task.\n\n' 'Requested sample type: {}'.format(type(element)), DAQmxErrors.UNKNOWN, task_name=self.name) data = numpy.asarray(data, dtype=bool) return self._interpreter.write_digital_lines( self._handle, number_of_samples_per_channel, auto_start, timeout, FillMode.GROUP_BY_CHANNEL.value, data) else: if (not isinstance(element, int) and not isinstance(element, numpy.uint32)): raise DaqError( 'Write failed, because this write method only accepts ' 'unsigned 32-bit integer samples when there are ' 'multiple digital lines per channel in a task.\n\n' 'Requested sample type: {}'.format(type(element)), DAQmxErrors.UNKNOWN, task_name=self.name) data = numpy.asarray(data, dtype=numpy.uint32) return self._interpreter.write_digital_u32( self._handle, number_of_samples_per_channel, auto_start, timeout, FillMode.GROUP_BY_CHANNEL.value, data) elif write_chan_type == ChannelType.COUNTER_OUTPUT: output_type = channels_to_write.co_output_type if number_of_samples_per_channel == 1: data = [data] if output_type == UsageTypeCO.PULSE_FREQUENCY: frequencies = [] duty_cycles = [] for sample in data: frequencies.append(sample.freq) duty_cycles.append(sample.duty_cycle) frequencies = numpy.asarray(frequencies, dtype=numpy.float64) duty_cycles = numpy.asarray(duty_cycles, dtype=numpy.float64) return self._interpreter.write_ctr_freq( self._handle, number_of_samples_per_channel, auto_start, timeout, FillMode.GROUP_BY_CHANNEL.value, frequencies, duty_cycles) elif output_type == UsageTypeCO.PULSE_TIME: high_times = [] low_times = [] for sample in data: high_times.append(sample.high_time) low_times.append(sample.low_time) high_times = numpy.asarray(high_times, dtype=numpy.float64) low_times = numpy.asarray(low_times, dtype=numpy.float64) return self._interpreter.write_ctr_time( self._handle, number_of_samples_per_channel, auto_start, timeout, FillMode.GROUP_BY_CHANNEL.value, high_times, low_times) elif output_type == UsageTypeCO.PULSE_TICKS: high_ticks = [] low_ticks = [] for sample in data: high_ticks.append(sample.high_tick) low_ticks.append(sample.low_tick) high_ticks = numpy.asarray(high_ticks, dtype=numpy.uint32) low_ticks = numpy.asarray(low_ticks, dtype=numpy.uint32) return self._interpreter.write_ctr_ticks( self._handle, number_of_samples_per_channel, auto_start, timeout, FillMode.GROUP_BY_CHANNEL.value, high_ticks, low_ticks) else: raise DaqError( 'Write failed, because there are no output channels in this ' 'task to which data can be written.', DAQmxErrors.WRITE_NO_OUTPUT_CHANS_IN_TASK, task_name=self.name)
class _TaskAlternateConstructor(Task): """ Provide an alternate constructor for the Task object. This is a private API used to instantiate a Task with an existing task handle and interpreter. """ # Setting __slots__ avoids TypeError: __class__ assignment: 'Base' object layout differs from 'Derived'. __slots__ = () def __init__(self, task_handle, interpreter, close_on_exit): """ Args: task_handle: Specifies the task handle from which to create a Task object. interpreter: Specifies the interpreter instance. close_on_exit: Specifies whether the task's context manager closes the task. """ # Initialize the fields that __del__ accesses so it doesn't crash when _initialize raises an exception. self._handle = task_handle self._close_on_exit = close_on_exit self._saved_name = "" # _initialize sets this to the name assigned by DAQmx. self._grpc_options = getattr(interpreter, "_grpc_options", None) self._event_handlers = {} self._interpreter = interpreter self._initialize(self._handle, self._interpreter) # Use meta-programming to change the type of this object to Task, # so the user isn't confused when doing introspection. self.__class__ = Task class _TaskEventType(Enum): """Internal enum for task event bookkeeping.""" DONE = 1 EVERY_N_SAMPLES_ACQUIRED_INTO_BUFFER = 2 EVERY_N_SAMPLES_TRANSFERRED_FROM_BUFFER = 3 SIGNAL = 4