import re
from nidaqmx.errors import DaqError
# Method logic adapted from
# //Measurements/Infrastructure/dmxf/trunk/2.5/source/nimuck/parseUtilities.cpp
_invalid_range_syntax_message = (
"Syntax for a range of objects in the input string is invalid.\n\n"
"For ranges of objects, specify a number immediately before and after "
"every colon (':') in the input string. Or, if a name is specified after "
"the colon, it must be identical to the name specified immediately before "
"the colon. Colons are not allowed within the names of the individual "
"objects.")
[docs]def flatten_channel_string(channel_names):
"""
Converts a list of channel names to a comma-delimited list of names.
You can use this method to convert a list of physical or virtual channel
names to a single string prior to using the DAQmx Create Channel methods or
instantiating a DAQmx Task object.
Note: For simplicity, this implementation is not fully compatible with the
NI-DAQmx driver implementation, which is generally more permissive. For
example, the driver is more graceful with whitespace padding. It was deemed
valuable to implement this natively in Python, so it can be leveraged in
workflows that don't have the driver installed. If we have specific examples
where this approximation is a problem, we can revisit this in the future.
Args:
channel_names (List[str]): The list of physical or virtual channel
names.
Returns:
str:
The resulting comma-delimited list of physical or virtual channel
names.
"""
unflattened_channel_names = []
for channel_name in channel_names:
unflattened_channel_names.extend(unflatten_channel_string(channel_name))
# Go through the channel names and flatten them.
flattened_channel_list = []
previous = {
'base_name': '',
'start_index': -1,
'end_index': -1
}
for channel_name in unflattened_channel_names:
m = re.search('(.*[^0-9])?([0-9]+)$', channel_name)
if not m:
# If the channel name doesn't end in a valid number, just use the
# channel name as-is.
flattened_channel_list.append(
_channel_info_to_flattened_name(previous))
previous = {
'base_name': channel_name,
'start_index': -1,
'start_index_str': "",
'end_index': -1,
'end_index_str': "",
}
else:
# If the channel name ends in a valid number, we may need to flatten
# this channel with subsequent channels in the x:y format.
current_base_name = m.group(1)
current_index_str = m.group(2)
current_index = int(current_index_str)
if current_base_name == previous['base_name'] and (
(current_index == previous['end_index'] + 1 and
previous['end_index'] >= previous['start_index']) or
(current_index == previous['end_index'] - 1 and
previous['end_index'] <= previous['start_index'])):
# If the current channel name has the same base name as the
# previous and it's end index differs by 1, change the end
# index value. It gets flattened later.
previous['end_index'] = current_index
previous['end_index_str'] = current_index_str
else:
# If the current channel name has the same base name as the
# previous or it's end index differs by more than 1, it doesn't
# get flattened with the previous channel.
flattened_channel_list.append(
_channel_info_to_flattened_name(previous))
previous = {
'base_name': current_base_name,
'start_index': current_index,
'start_index_str': current_index_str,
'end_index': current_index,
'end_index_str': current_index_str,
}
# Convert the final channel dictionary to a flattened string
flattened_channel_list.append(
_channel_info_to_flattened_name(previous))
# Remove empty strings in list, convert to comma-delimited string, then trim
# whitespace.
return ','.join([_f for _f in flattened_channel_list if _f]).strip()
def _channel_info_to_flattened_name(channel_info):
"""
Simple method to generate a flattened channel name.
"""
if channel_info['start_index'] == -1:
return channel_info['base_name']
elif channel_info['start_index'] == channel_info['end_index']:
return '{}{}'.format(channel_info['base_name'],
channel_info['start_index_str'])
else:
return '{}{}:{}'.format(channel_info['base_name'],
channel_info['start_index_str'],
channel_info['end_index_str'])
[docs]def unflatten_channel_string(channel_names):
"""
Converts a comma-delimited list of channel names to a list of names.
You can use this method to convert a comma-delimited list or range of
physical or virtual channels into a list of physical or virtual channel
names.
Note: For simplicity, this implementation is not fully compatible with the
NI-DAQmx driver implementation, which is generally more permissive. For
example, the driver is more graceful with whitespace padding. It was deemed
valuable to implement this natively in Python, so it can be leveraged in
workflows that don't have the driver installed. If we have specific examples
where this approximation is a problem, we can revisit this in the future.
Args:
channel_names (str): The list or range of physical or virtual channels.
Returns:
List[str]:
The list of physical or virtual channel names. Each element of the
list contains a single channel.
"""
channel_list_to_return = []
channel_list = [c for c in channel_names.strip().split(',') if c]
for channel in channel_list:
channel = channel.strip()
colon_index = channel.find(':')
if colon_index == -1:
channel_list_to_return.append(channel)
else:
before = channel[:colon_index]
after = channel[colon_index+1:]
m_before = re.match('(.*?)([0-9]+)$', before)
m_after = re.match('(.*?)([0-9]+)$', after)
if not m_before or not m_after:
raise DaqError(_invalid_range_syntax_message,
error_code=-200498)
if m_after.group(1) and (
m_before.group(1).lower() != m_after.group(1).lower()):
raise DaqError(_invalid_range_syntax_message,
error_code=-200498)
num_before_str = m_before.group(2)
num_before = int(num_before_str)
num_after_str = m_after.group(2)
num_after = int(num_after_str)
num_min_width = 0
# If there are any leading 0s in the first number, we want to ensure
# match that width. This is established precedence in the DAQmx
# algorithm.
if num_before > 0 and len(num_before_str.lstrip('0')) < len(num_before_str):
num_min_width = len(num_before_str)
num_max = max([num_before, num_after])
num_min = min([num_before, num_after])
number_of_channels = (num_max - num_min) + 1
if number_of_channels >= 15000:
raise DaqError(_invalid_range_syntax_message,
error_code=-200498)
colon_expanded_channel = []
for i in range(number_of_channels):
current_number = num_min + i
if num_min_width > 0:
# Using fstrings to create format strings. Braces for days!
zero_padded_format_specifier = f"{{:0{num_min_width}d}}"
current_number_str = zero_padded_format_specifier.format(current_number)
else:
current_number_str = str(current_number)
colon_expanded_channel.append(f"{m_before.group(1)}{current_number_str}")
if num_after < num_before:
colon_expanded_channel.reverse()
channel_list_to_return.extend(colon_expanded_channel)
return channel_list_to_return