""" Abstraction layer and interface utilities for LabJack U3-HV """
import u3 # LabJackPython.u3
import json
from copy import copy
from os import path
from pandas import DataFrame
from functools import partial
from numpy import arange, array
from daq import DAQ
# Note: only FIO4-7 exist. FIO0-3 are replaced with AIN0-3 for the U3-HV.
# _apply_target_config(), _assemble_analog_conversion() and LABJACK_U3_AIN_REF_NUMBERS
# include elements specific to the U3-HV
LABJACK_U3_IO_CONFIG_MAP = {
"Analog": "analog_mode",
"Direction": "io_dir",
"State": "cmd"}
LABJACK_U3_IO_BITMASK_FORMATS = {
"FIOAnalog": "08b",
"FIODirection": "08b",
"FIOState": "08b",
"EIOAnalog": "08b",
"EIODirection": "08b",
"EIOState": "08b",
"CIODirection": "02b",
"CIOState": "02b"}
LABJACK_U3_IO_CONFIG_DEFAULT = {} # I/O Config values that are set using bitmasks
LABJACK_U3_IO_CONFIG_DEFAULT.update(
{"FIO{}".format(x): {"analog_mode": 0, "io_dir": 0, "cmd": 0} for x in range(0, 8)})
LABJACK_U3_IO_CONFIG_DEFAULT.update({"EIO{}".format(x): {"analog_mode": 0, "io_dir": 0, "cmd": 0} for x in range(8)})
LABJACK_U3_IO_CONFIG_DEFAULT.update({"CIO{}".format(x): {"io_dir": 0, "cmd": 0} for x in range(4)})
LABJACK_U3_IO_BITMASK_GROUPS = ["FIO", "EIO", "CIO"]
LABJACK_U3_DIRECT_CONFIG_DEFAULT = { # Config values that are set directly
"DAC1Enable": 1,
"DAC0": 0.0, # In volts
"DAC1": 0.0,
"TimerClockConfig": 2,
"TimerClockDivisor": 256}
LABJACK_U3_CYCLE_CONFIG_DEFAULT = {
"mode": "control",
"stream_rate": 300.0,
"channel_list": {
"AIN0": "", # "High-Voltage" for U3-HV
"AIN1": "", # "High-Voltage" for U3-HV
"AIN2": "", # "High-Voltage" for U3-HV
"AIN3": "", # "High-Voltage" for U3-HV
"AIN4": "",
"AIN5": "",
"AIN6": "",
"AIN7": "",
"AIN8": "",
"AIN9": "",
"AIN10": "",
"AIN11": "",
"AIN12": "",
"AIN13": "",
"AIN14": "",
"AIN15": ""}}
LABJACK_U3_CONFIG_DEFAULT = {
"io_config": LABJACK_U3_IO_CONFIG_DEFAULT,
"direct_config": LABJACK_U3_DIRECT_CONFIG_DEFAULT,
"cycle_config": LABJACK_U3_CYCLE_CONFIG_DEFAULT}
LABJACK_U3_COMMANDABLE_CHANNELS = [x for x in LABJACK_U3_IO_CONFIG_DEFAULT.keys()] + ["DAC0", "DAC1"]
LABJACK_U3_AIN_CHANNEL_NUMBERS = {
"AIN0": 0, "AIN1": 1, "AIN2": 2, "AIN3": 3, "AIN4": 4, "AIN5": 5, "AIN6": 6, "AIN7": 7, "AIN8": 8,
"AIN9": 9, "AIN10": 10, "AIN11": 11, "AIN12": 12, "AIN13": 13, "AIN14": 14, "AIN15": 15,
"Temp Sensor": 30,
"Vreg": 31, # Attach internal 3.3V regulator to ADC
} # https://labjack.com/support/datasheets/u3/hardware-description/ain/channel_numbers
LABJACK_U3_AIN_REF_NUMBERS = {
"AIN0": 0, "AIN1": 1, "AIN2": 2, "AIN3": 3, "AIN4": 4, "AIN5": 5, "AIN6": 6, "AIN7": 7, "AIN8": 8,
"AIN9": 9, "AIN10": 10, "AIN11": 11, "AIN12": 12, "AIN13": 13, "AIN14": 14, "AIN15": 15,
"Vref": 30, # About 2.44V
"Single-Ended": 31, # Ref to ground
"High-Voltage": 32, # High-voltage -10 to +20 range for U3-HV. Only valid for AIN0-3.
} # https://labjack.com/support/datasheets/u3/hardware-description/ain/channel_numbers
[docs]class LabJackU3(DAQ):
"""
Abstraction layer for LabJack U3 (U3-HV or U3-LV)
Each interface should take the same constructor arguments and have the same member functions
"""
def __init__(self, device, config_path=None):
"""
:param device: LabJackPython U3 object
:type device: u3.U3
:param config_path: Path to config.json
:type config_path: str
"""
DAQ.__init__(self, device, config_path)
self._device = device
self._target = {}
self._assembled_read = []
self._read_functions = []
self.channels = []
self.ref_channels = []
self.config = {} # Most recent confirmed hardware state
self.device_info = {} # Serial number, etc
self.calibration_data = {} # Slopes and offsets, etc
self.command_channels = [] # List of output channels to reassert at every command cycle
self.commandable_channels = LABJACK_U3_COMMANDABLE_CHANNELS
try:
io_config, direct_config, self.device_info, self.calibration_data = self._query_config()
self.config = {"io_config": io_config, "direct_config": direct_config,
"cycle_config": LABJACK_U3_CYCLE_CONFIG_DEFAULT}
if config_path is not None:
self._target = self._configure(config_path, apply_now=False)
else:
self._target = LABJACK_U3_CONFIG_DEFAULT
# Always apply config and command for every pin during initialization
_validate_config(self._target)
self._apply_target_config()
if self.mode == "control":
self.easy_command({c: 0.0 for c in self.command_channels})
except Exception as e:
print("Failed to initialize U3 interface. Applying default.")
self._target = LABJACK_U3_CONFIG_DEFAULT
self._apply_target_config()
raise e
def _configure(self, fp, apply_now=True):
"""
Load config from file
:param fp: Path to json file
:type fp: str
:param apply_now: Optional flag to apply loaded config immediately
:type apply_now: bool
"""
with open(fp, 'r') as f:
config = json.load(f)["config"]
_validate_config(config)
self._target = config
if apply_now:
self._apply_target_config()
return config
[docs] def read(self):
"""Read values from device"""
if self._stream_mode:
return next(self._stream_buffer) # TODO: print missed packets to log
else:
device_ret = self._device.getFeedback(self._assembled_read)
return [f(x) for f, x in zip(self._read_functions, device_ret)]
[docs] def command(self, assembled_commands):
"""
Send commands to device low-level interface
:param assembled_commands: assembled list of device operations
:type assembled_commands: list
"""
self._device.getFeedback(*assembled_commands)
[docs] def stream(self):
"""Open high-rate read stream"""
if self.mode == "stream":
self._device.streamStart()
self._stream_buffer = self._device.streamData(convert=False)
else:
print(str(self) + " not configured for streaming") # TODO: print to log
[docs] def process_stream_data(self, stream_data):
"""Apply device calibrations and reformat stream return"""
stream_data_processed = [self._device.processStreamData(x["result"]) for x in stream_data]
columns = self.channels
n = sum([len(x[columns[0]]) for x in stream_data_processed])
dt = 1. / self.config["cycle_config"]["stream_rate"]
times = arange(0.0, n * dt, dt)
df = DataFrame(columns=columns, index=times).T
i = 0
for packet in stream_data_processed:
packet_df = DataFrame.from_dict(packet, orient="index")
for j in packet_df:
df[times[i]] = packet_df[j]
i += 1
del packet_df
del packet
df = df.T
return df
[docs] def close(self):
"""
1. End stream
2. Return to default settings
3. Close connection to device
"""
try:
if self._stream_buffer is not None:
self._device.streamStop()
else:
self.easy_command({c: 0.0 for c in self.command_channels})
self._apply_target_config()
finally:
self._device.close()
[docs] def assemble_command_table(self, command_table):
"""
Assemble a command block into device-friendly commands
:param command_table: Time-series table of commands
:type command_table: pandas.DataFrame
"""
# Interpret table from standard format
channels = [c[0] for c in command_table.columns]
command_table.columns = channels # Eliminate multi-indexing
self.command_channels += [c for c in channels if c not in self.command_channels]
# Assemble command for each time step
times = command_table.index.values
assembled_commands = [[]] * len(times)
for i, t in enumerate(times):
assembled_commands[i] = self._assemble_commands(command_table.T[t].to_dict())
return times, assembled_commands
[docs] def easy_command(self, command_dict):
"""
!!SLOW!! Intended for manually commanding channels that may not be on the nominal list to reassert
:param command_dict: Commands indexed by channel
:type command_dict: dict
"""
self.command(self._assemble_commands(command_dict, force=True))
[docs] def get_config(self, query=False):
"""
Return current config as json
:param query: Optional flag to get config directly from device instead of interface class
:type query: bool
"""
if query:
self._query_config()
json_dict = {}
json_dict.update({"device_info": self.device_info,
"config": self.config,
"calibration_data": self.calibration_data})
return json.dumps(json_dict, indent=2)
[docs] def save_config(self, dst="./"):
"""
Save json of pin config, device info, and calibrations
:param dst: destination directory
:type dst: str
"""
json_string = self.get_config()
with open(path.join(dst, "config.json"), "w+") as f:
f.write(json_string)
def _assemble_commands(self, command_dict, force=False):
"""
Apply commanded values to hardware
:param command_dict: Commands indexed by channel
:type command_dict: dict
:param force: Optional flag to skip checking channel list
:type force: bool
:return: Commands in the format accepted by the device
:type return: list
"""
if not force:
assert set(command_dict.keys()) == set(self.command_channels) # All present?
assembled_commands = []
for channel, cmd in command_dict.items():
if channel[0] == "D": # DAC pins
dac_number = int(channel[3])
dac_bits = self._device.voltageToDACBits(cmd, dacNumber=dac_number, is16Bits=True)
assembled_commands.append(u3.DAC16(Dac=dac_number, Value=dac_bits))
elif channel[1:3] == "IO": # FIO, EIO, or CIO pins
io_number = getattr(u3, channel)
# assembled_commands.append(u3.BitDirWrite(io_number, 1)) # Set output mode
assembled_commands.append(u3.BitStateWrite(io_number, int(cmd))) # Set command
else:
print("Unable to command unrecognized channel {}".format(channel))
return assembled_commands
def _query_config(self):
""" Convert actionable parts of return of U3's default config format to more general format """
# Get and format I/O bitmasks and direct config values
device_config = self._device.configU3()
bitmasks = {state: format(device_config[state], fmt) for state, fmt in LABJACK_U3_IO_BITMASK_FORMATS.items()}
io_config = {}
for state, mask in bitmasks.items():
pin_group = state[:3]
state_type = state[3:]
for i in range(len(bitmasks[state])):
pin = pin_group + str(i)
if pin not in io_config.keys():
io_config[pin] = {}
io_config[pin].update({LABJACK_U3_IO_CONFIG_MAP[state_type]: int(mask[i])})
direct_config = {key: device_config[key] for key in LABJACK_U3_DIRECT_CONFIG_DEFAULT.keys()}
device_info = {key: value for key, value in device_config.items() if # Collect anything not already captured
key not in LABJACK_U3_IO_BITMASK_FORMATS.keys() and
key not in LABJACK_U3_DIRECT_CONFIG_DEFAULT.keys()}
return io_config, direct_config, device_info, self._device.getCalibrationData()
def _apply_target_config(self):
""" Apply desired state & confirm result """
device_formatted_config = {} # Config formatted as needed by U3.configU3()
# Apply target read config
# TODO: add digital pins
cycle_config = self._target["cycle_config"]
self.mode = cycle_config["mode"]
self._stream_mode = True if self.mode == "stream" else False
# Process channel list
long_settle, quick_sample = (False, True) # Use quickSample mode by default
num_channels = len(cycle_config["channel_list"].items())
positive_channels = []
negative_channels = []
for channel, ref in cycle_config["channel_list"].items():
ref = ref if ref != "" else "Single-Ended"
self.channels.append(channel)
self.ref_channels.append(ref)
p = LABJACK_U3_AIN_CHANNEL_NUMBERS[channel]
n = LABJACK_U3_AIN_REF_NUMBERS[ref]
n = 30 if n == 32 else n # U3-HV SPECIFIC HANDLING OF HIGH VOLTAGE AIN
positive_channels.append(p)
negative_channels.append(n)
stream_config_formatted = {
"Resolution": 3,
"ScanFrequency": cycle_config["stream_rate"],
"PChannels": positive_channels, # Positive channels
"NChannels": negative_channels, # Negative/reference channels
"NumChannels": num_channels}
for p, n in zip(positive_channels, negative_channels):
self._assembled_read.append(u3.AIN(p, n, long_settle, quick_sample)) # Bits to communicate with device
self._read_functions.append(self._assemble_analog_conversion(p, n)) # Conversion to voltage
# Interpret I/O config
config_map_inverted = {value: key for key, value in LABJACK_U3_IO_CONFIG_MAP.items()}
bitmasks_as_int = {state: format(0, fmt) for state, fmt in LABJACK_U3_IO_BITMASK_FORMATS.items()}
for pin, pin_config in self._target["io_config"].items():
pin_group = pin[0:3] # Like FIO
pin_number = int(pin[3:]) # 0-7
for key, value in pin_config.items():
state = ''.join([pin_group, config_map_inverted[key]]) # like FIO + Analog -> FIOAnalog
bitmasks_as_int[state] = replace_char(bitmasks_as_int[state], pin_number, str(value))
for key, value in bitmasks_as_int.items():
bitmasks_as_int[key] = int(value[::-1], 2) # Convert back to int from string representation of bitmask
device_formatted_config.update(bitmasks_as_int)
# Interpret direct config
direct_config_formatted = copy(self._target["direct_config"])
for analog_out_channel in ["DAC0", "DAC1"]:
direct_config_formatted[analog_out_channel] = self._device.voltageToDACBits(
self._target["direct_config"][analog_out_channel],
dacNumber=int(analog_out_channel[-1]), is16Bits=True)
device_formatted_config.update(direct_config_formatted)
# Communicate with device
if self.mode == "stream":
self._device.configU3()
self._device.configIO(FIOAnalog=255, EIOAnalog=255)
self._device.streamConfig(**stream_config_formatted)
else:
self._device.configU3(**device_formatted_config)
self.config = copy(self._target)
def _assemble_analog_conversion(self, positive_channel, negative_channel):
""" Partial reimplementation of u3.U3.getAIN() """
single_ended = True
if negative_channel != 31:
single_ended = False
low_voltage = True
try:
if self._device.isHV and positive_channel < 4:
low_voltage = False
except AttributeError:
pass
# Build function that will apply calibrations in the loop
high_voltage_mode = True if negative_channel == 30 else False # U3-HV SPECIFIC HANDLING OF HIGH VOLTAGE AIN
calibration_function = partial(self._device.binaryToCalibratedAnalogVoltage, isLowVoltage=low_voltage,
isSingleEnded=single_ended, isSpecialSetting=high_voltage_mode,
channelNumber=positive_channel)
return calibration_function
def __str__(self):
""" Return a unique identifier for the DAQ hardware """
return "{}_SN_{}".format(self.device_info["DeviceName"], self.device_info["SerialNumber"])
def _compare_keys(d_present, d_expected):
""" Compare keys between two dictionaries """
expected_keys = d_expected.keys()
present_keys = d_present.keys()
extra_keys = list(set(present_keys) - set(expected_keys))
missing_keys = list(set(expected_keys) - set(present_keys))
return extra_keys, missing_keys
def _validate_io_config(io_config):
""" Make sure io_config dict is in the right format """
# Make sure all pins are addressed
extra_keys, missing_keys = _compare_keys(io_config, LABJACK_U3_IO_CONFIG_DEFAULT)
if any(extra_keys):
raise ValueError("Unknown pin names: {}".format(extra_keys))
elif any(missing_keys):
raise ValueError("Missing config for pins: {}".format(missing_keys))
# Compare each pin's config
for pin, pin_config in io_config.items():
extra_keys, missing_keys = _compare_keys(pin_config, LABJACK_U3_IO_CONFIG_DEFAULT[pin])
if any(extra_keys):
raise ValueError("{} has unknown config for {}".format(pin, extra_keys))
elif any(missing_keys):
raise ValueError("{} is missing config for {}".format(pin, missing_keys))
for key, value in pin_config.items():
type_present = type(value)
type_expected = type(LABJACK_U3_IO_CONFIG_DEFAULT[pin][key])
if type_present is not type_expected:
raise TypeError(
"Expecting {}[{}] to be of type {}; got type {}".format(pin, key, type_expected, type_present))
def _validate_direct_or_cycle_config(config, default, config_type=""):
""" Make sure direct_config is in the right format """
extra_keys, missing_keys = _compare_keys(config, default)
if any(extra_keys):
raise ValueError("Unknown parameter in {} config: {}".format(config_type, extra_keys))
elif any(missing_keys):
raise ValueError("Missing parameter in {} config: {}".format(config_type, missing_keys))
for key, value in config.items():
type_present = type(value)
type_expected = type(default[key])
if type_present is not type_expected:
raise TypeError(
"Expecting {} to be of type {}; got type {}".format(key, type_expected, type_present))
def _validate_config(config):
""" Make sure I/O bitmasks, direct config values, and stream config are in the right format """
io_config = config["io_config"]
direct_config = config["direct_config"]
cycle_config = config["cycle_config"]
_validate_io_config(io_config)
_validate_direct_or_cycle_config(direct_config, default=LABJACK_U3_DIRECT_CONFIG_DEFAULT, config_type="direct")
_validate_direct_or_cycle_config(cycle_config, default=LABJACK_U3_CYCLE_CONFIG_DEFAULT, config_type="stream")
[docs]def replace_char(string, ind, char):
if ind <= len(string) - 1:
return string[:ind] + char + string[ind + 1:]
else:
return string[:-1] + char