import ctypes
import six
from collections.abc import Sequence
from nidaqmx._lib import lib_importer, ctypes_byte_str
from nidaqmx.errors import (
check_for_error, is_string_buffer_too_small, DaqError)
from nidaqmx.error_codes import DAQmxErrors
from nidaqmx.system.physical_channel import PhysicalChannel
from nidaqmx.utils import unflatten_channel_string, flatten_channel_string
[docs]class PhysicalChannelCollection(Sequence):
"""
Contains the collection of physical channels for a DAQmx device.
This class defines methods that implements a container object.
"""
def __init__(self, device_name):
self._name = device_name
def __contains__(self, item):
channel_names = self.channel_names
if isinstance(item, six.string_types):
items = unflatten_channel_string(item)
return all([i in channel_names for i in items])
elif isinstance(item, PhysicalChannel):
return item._name in channel_names
return False
def __eq__(self, other):
if isinstance(other, self.__class__):
return self._name == other._name
return False
def __getitem__(self, index):
"""
Indexes a subset of physical channels on this physical channel
collection.
Args:
index: The value of the index. The following index types
are supported:
- str: Name of the physical channel, without the
device name prefix, e.g. 'ai0'. You also can
specify a string that contains a list or range of
names to this input. If you have a list of names,
use the DAQmx Flatten Channel String function to
convert the list to a string.
- int: Index/position of the physical channel in the
collection.
- slice: Range of the indexes/positions of physical
channels in the collection.
Returns:
nidaqmx.system.physical_channel.PhysicalChannel:
Indicates the subset of physical channels indexed.
"""
if isinstance(index, six.integer_types):
return PhysicalChannel(self.channel_names[index])
elif isinstance(index, slice):
return PhysicalChannel(self.channel_names[index])
elif isinstance(index, six.string_types):
return PhysicalChannel('{0}/{1}'.format(self._name, index))
else:
raise DaqError(
'Invalid index type "{0}" used to access collection.'
.format(type(index)), DAQmxErrors.UNKNOWN)
def __iter__(self):
for channel_name in self.channel_names:
yield PhysicalChannel(channel_name)
def __len__(self):
return len(self.channel_names)
def __ne__(self, other):
return not self.__eq__(other)
def __reversed__(self):
channel_names = self.channel_names
channel_names.reverse()
for channel_name in channel_names:
yield PhysicalChannel(channel_name)
@property
def all(self):
"""
nidaqmx.system.physical_channel.PhysicalChannel: Specifies a
physical channel object that represents the entire list of
physical channels on this channel collection.
"""
return PhysicalChannel(flatten_channel_string(self.channel_names))
@property
def channel_names(self):
"""
List[str]: Specifies the entire list of physical channels on this
collection.
"""
raise NotImplementedError()
[docs]class AIPhysicalChannelCollection(PhysicalChannelCollection):
"""
Contains the collection of analog input physical channels for a
DAQmx device.
This class defines methods that implements a container object.
"""
@property
def channel_names(self):
cfunc = lib_importer.windll.DAQmxGetDevAIPhysicalChans
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
ctypes_byte_str, ctypes.c_char_p, ctypes.c_uint]
temp_size = 0
while True:
val = ctypes.create_string_buffer(temp_size)
size_or_code = cfunc(
self._name, val, temp_size)
if is_string_buffer_too_small(size_or_code):
# Buffer size must have changed between calls; check again.
temp_size = 0
elif size_or_code > 0 and temp_size == 0:
# Buffer size obtained, use to retrieve data.
temp_size = size_or_code
else:
break
check_for_error(size_or_code)
return unflatten_channel_string(val.value.decode('ascii'))
[docs]class AOPhysicalChannelCollection(PhysicalChannelCollection):
"""
Contains the collection of analog output physical channels for a
DAQmx device.
This class defines methods that implements a container object.
"""
@property
def channel_names(self):
cfunc = lib_importer.windll.DAQmxGetDevAOPhysicalChans
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
ctypes_byte_str, ctypes.c_char_p, ctypes.c_uint]
temp_size = 0
while True:
val = ctypes.create_string_buffer(temp_size)
size_or_code = cfunc(
self._name, val, temp_size)
if is_string_buffer_too_small(size_or_code):
# Buffer size must have changed between calls; check again.
temp_size = 0
elif size_or_code > 0 and temp_size == 0:
# Buffer size obtained, use to retrieve data.
temp_size = size_or_code
else:
break
check_for_error(size_or_code)
return unflatten_channel_string(val.value.decode('ascii'))
[docs]class CIPhysicalChannelCollection(PhysicalChannelCollection):
"""
Contains the collection of counter input physical channels for a
DAQmx device.
This class defines methods that implements a container object.
"""
@property
def channel_names(self):
cfunc = lib_importer.windll.DAQmxGetDevCIPhysicalChans
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
ctypes_byte_str, ctypes.c_char_p, ctypes.c_uint]
temp_size = 0
while True:
val = ctypes.create_string_buffer(temp_size)
size_or_code = cfunc(
self._name, val, temp_size)
if is_string_buffer_too_small(size_or_code):
# Buffer size must have changed between calls; check again.
temp_size = 0
elif size_or_code > 0 and temp_size == 0:
# Buffer size obtained, use to retrieve data.
temp_size = size_or_code
else:
break
check_for_error(size_or_code)
return unflatten_channel_string(val.value.decode('ascii'))
[docs]class COPhysicalChannelCollection(PhysicalChannelCollection):
"""
Contains the collection of counter output physical channels for a
DAQmx device.
This class defines methods that implements a container object.
"""
@property
def channel_names(self):
cfunc = lib_importer.windll.DAQmxGetDevCOPhysicalChans
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
ctypes_byte_str, ctypes.c_char_p, ctypes.c_uint]
temp_size = 0
while True:
val = ctypes.create_string_buffer(temp_size)
size_or_code = cfunc(
self._name, val, temp_size)
if is_string_buffer_too_small(size_or_code):
# Buffer size must have changed between calls; check again.
temp_size = 0
elif size_or_code > 0 and temp_size == 0:
# Buffer size obtained, use to retrieve data.
temp_size = size_or_code
else:
break
check_for_error(size_or_code)
return unflatten_channel_string(val.value.decode('ascii'))
[docs]class DILinesCollection(PhysicalChannelCollection):
"""
Contains the collection of digital input lines for a DAQmx device.
This class defines methods that implements a container object.
"""
@property
def channel_names(self):
cfunc = lib_importer.windll.DAQmxGetDevDILines
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
ctypes_byte_str, ctypes.c_char_p, ctypes.c_uint]
temp_size = 0
while True:
val = ctypes.create_string_buffer(temp_size)
size_or_code = cfunc(
self._name, val, temp_size)
if is_string_buffer_too_small(size_or_code):
# Buffer size must have changed between calls; check again.
temp_size = 0
elif size_or_code > 0 and temp_size == 0:
# Buffer size obtained, use to retrieve data.
temp_size = size_or_code
else:
break
check_for_error(size_or_code)
return unflatten_channel_string(val.value.decode('ascii'))
[docs]class DOLinesCollection(PhysicalChannelCollection):
"""
Contains the collection of digital output lines for a DAQmx device.
This class defines methods that implements a container object.
"""
@property
def channel_names(self):
cfunc = lib_importer.windll.DAQmxGetDevDOLines
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
ctypes_byte_str, ctypes.c_char_p, ctypes.c_uint]
temp_size = 0
while True:
val = ctypes.create_string_buffer(temp_size)
size_or_code = cfunc(
self._name, val, temp_size)
if is_string_buffer_too_small(size_or_code):
# Buffer size must have changed between calls; check again.
temp_size = 0
elif size_or_code > 0 and temp_size == 0:
# Buffer size obtained, use to retrieve data.
temp_size = size_or_code
else:
break
check_for_error(size_or_code)
return unflatten_channel_string(val.value.decode('ascii'))
[docs]class DIPortsCollection(PhysicalChannelCollection):
"""
Contains the collection of digital input ports for a DAQmx device.
This class defines methods that implements a container object.
"""
@property
def channel_names(self):
cfunc = lib_importer.windll.DAQmxGetDevDIPorts
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
ctypes_byte_str, ctypes.c_char_p, ctypes.c_uint]
temp_size = 0
while True:
val = ctypes.create_string_buffer(temp_size)
size_or_code = cfunc(
self._name, val, temp_size)
if is_string_buffer_too_small(size_or_code):
# Buffer size must have changed between calls; check again.
temp_size = 0
elif size_or_code > 0 and temp_size == 0:
# Buffer size obtained, use to retrieve data.
temp_size = size_or_code
else:
break
check_for_error(size_or_code)
return unflatten_channel_string(val.value.decode('ascii'))
[docs]class DOPortsCollection(PhysicalChannelCollection):
"""
Contains the collection of digital output ports for a DAQmx device.
This class defines methods that implements a container object.
"""
@property
def channel_names(self):
cfunc = lib_importer.windll.DAQmxGetDevDOPorts
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
ctypes_byte_str, ctypes.c_char_p, ctypes.c_uint]
temp_size = 0
while True:
val = ctypes.create_string_buffer(temp_size)
size_or_code = cfunc(
self._name, val, temp_size)
if is_string_buffer_too_small(size_or_code):
# Buffer size must have changed between calls; check again.
temp_size = 0
elif size_or_code > 0 and temp_size == 0:
# Buffer size obtained, use to retrieve data.
temp_size = size_or_code
else:
break
check_for_error(size_or_code)
return unflatten_channel_string(val.value.decode('ascii'))