Source code for daq.hardware.labjack_u3

""" Abstraction layer and interface utilities for LabJack U3-HV """

import json
from copy import copy, deepcopy
from functools import partial
from os import path

import u3  # LabJackPython.u3
from numpy import arange, array
from pandas import DataFrame

from daq import DAQ

# Note: only FIO4-7 exist. FIO0-3 are replaced with AIN0-3 for the U3-HV.
# _apply_target_config(), _assemble_analog_conversion() and LABJACK_U3_AIN_REF_NUMBERS
# include elements specific to the U3-HV
# Maps the suffix of a device bitmask name (e.g. "FIOAnalog" -> "Analog") to the per-pin config key
LABJACK_U3_IO_CONFIG_MAP = {
    "Analog": "analog_mode",
    "Direction": "io_dir",
    "State": "cmd"}

# Fixed-width binary format for each device bitmask; the width must equal the number of pins in the
# group, because each pin occupies one character of the formatted string
LABJACK_U3_IO_BITMASK_FORMATS = {
    "FIOAnalog": "08b",
    "FIODirection": "08b",
    "FIOState": "08b",
    "EIOAnalog": "08b",
    "EIODirection": "08b",
    "EIOState": "08b",
    "CIODirection": "04b",  # CIO0-3: four pins, so four bits (a 2-bit mask made CIO2/3 overwrite CIO1)
    "CIOState": "04b"}

# I/O Config values that are set using bitmasks, indexed by pin name
LABJACK_U3_IO_CONFIG_DEFAULT = {}
LABJACK_U3_IO_CONFIG_DEFAULT.update(
    {"FIO{}".format(x): {"analog_mode": 0, "io_dir": 0, "cmd": 0} for x in range(8)})
LABJACK_U3_IO_CONFIG_DEFAULT.update(
    {"EIO{}".format(x): {"analog_mode": 0, "io_dir": 0, "cmd": 0} for x in range(8)})
# CIO pins have no analog mode
LABJACK_U3_IO_CONFIG_DEFAULT.update(
    {"CIO{}".format(x): {"io_dir": 0, "cmd": 0} for x in range(4)})

LABJACK_U3_IO_BITMASK_GROUPS = ["FIO", "EIO", "CIO"]

LABJACK_U3_DIRECT_CONFIG_DEFAULT = {  # Config values that are set directly
    "DAC1Enable": 1,
    "DAC0": 0.0,  # In volts
    "DAC1": 0.0,
    "TimerClockConfig": 2,
    "TimerClockDivisor": 256}

# Read-cycle settings; channel_list maps each positive channel to its reference ("" -> "Single-Ended")
LABJACK_U3_CYCLE_CONFIG_DEFAULT = {
    "mode": "control",
    "stream_rate": 300.0,
    "channel_list": {
        "AIN0": "",  # "High-Voltage" for U3-HV
        "AIN1": "",  # "High-Voltage" for U3-HV
        "AIN2": "",  # "High-Voltage" for U3-HV
        "AIN3": "",  # "High-Voltage" for U3-HV
        "AIN4": "",
        "AIN5": "",
        "AIN6": "",
        "AIN7": "",
        "AIN8": "",
        "AIN9": "",
        "AIN10": "",
        "AIN11": "",
        "AIN12": "",
        "AIN13": "",
        "AIN14": "",
        "AIN15": ""}}

# Complete default config; also used as the template that loaded configs are validated against
LABJACK_U3_CONFIG_DEFAULT = {
    "io_config": LABJACK_U3_IO_CONFIG_DEFAULT,
    "direct_config": LABJACK_U3_DIRECT_CONFIG_DEFAULT,
    "cycle_config": LABJACK_U3_CYCLE_CONFIG_DEFAULT}

# Every digital pin plus both analog outputs
LABJACK_U3_COMMANDABLE_CHANNELS = list(LABJACK_U3_IO_CONFIG_DEFAULT) + ["DAC0", "DAC1"]

# Positive-channel numbers
LABJACK_U3_AIN_CHANNEL_NUMBERS = {
    "AIN0": 0, "AIN1": 1, "AIN2": 2, "AIN3": 3, "AIN4": 4, "AIN5": 5, "AIN6": 6, "AIN7": 7, "AIN8": 8,
    "AIN9": 9, "AIN10": 10, "AIN11": 11, "AIN12": 12, "AIN13": 13, "AIN14": 14, "AIN15": 15,
    "Temp Sensor": 30,
    "Vreg": 31,  # Attach internal 3.3V regulator to ADC
}  # https://labjack.com/support/datasheets/u3/hardware-description/ain/channel_numbers

# Negative/reference-channel numbers
LABJACK_U3_AIN_REF_NUMBERS = {
    "AIN0": 0, "AIN1": 1, "AIN2": 2, "AIN3": 3, "AIN4": 4, "AIN5": 5, "AIN6": 6, "AIN7": 7, "AIN8": 8,
    "AIN9": 9, "AIN10": 10, "AIN11": 11, "AIN12": 12, "AIN13": 13, "AIN14": 14, "AIN15": 15,
    "Vref": 30,  # About 2.44V
    "Single-Ended": 31,  # Ref to ground
    "High-Voltage": 32,  # High-voltage -10 to +20 range for U3-HV. Only valid for AIN0-3.
}  # https://labjack.com/support/datasheets/u3/hardware-description/ain/channel_numbers


class LabJackU3(DAQ):
    """
    Abstraction layer for LabJack U3 (U3-HV or U3-LV)

    Each interface should take the same constructor arguments and have the same member functions
    """

    def __init__(self, device, config_path=None):
        """
        Query the device, then validate and apply the target configuration

        If anything fails, the default configuration is applied and the original error is re-raised.

        :param device: LabJackPython U3 object
        :type device: u3.U3
        :param config_path: Path to config.json
        :type config_path: str
        """
        DAQ.__init__(self, device, config_path)
        self._device = device
        self._target = {}
        self._assembled_read = []
        self._read_functions = []
        self._stream_buffer = None  # Set by stream(); close() inspects it even if no stream was opened
        self.channels = []
        self.ref_channels = []
        self.config = {}  # Most recent confirmed hardware state
        self.device_info = {}  # Serial number, etc
        self.calibration_data = {}  # Slopes and offsets, etc
        self.command_channels = []  # List of output channels to reassert at every command cycle
        self.commandable_channels = list(LABJACK_U3_COMMANDABLE_CHANNELS)

        try:
            io_config, direct_config, self.device_info, self.calibration_data = self._query_config()
            # Defaults are deep-copied so that editing instance state can never alter the module-level
            # templates that other instances and the validators rely on
            self.config = {
                "io_config": io_config,
                "direct_config": direct_config,
                "cycle_config": deepcopy(LABJACK_U3_CYCLE_CONFIG_DEFAULT)}

            if config_path is not None:
                self._target = self._configure(config_path, apply_now=False)
            else:
                self._target = deepcopy(LABJACK_U3_CONFIG_DEFAULT)

            # Always apply config and command for every pin during initialization
            _validate_config(self._target)
            self._apply_target_config()
            if self.mode == "control":
                # NOTE(review): command_channels is still empty at this point, so this sends an empty
                # command list - confirm whether commandable_channels was intended
                self.easy_command({c: 0.0 for c in self.command_channels})
        except Exception:
            print("Failed to initialize U3 interface. Applying default.")
            self._target = deepcopy(LABJACK_U3_CONFIG_DEFAULT)
            self._apply_target_config()
            raise  # Bare raise keeps the original traceback intact

    def _configure(self, fp, apply_now=True):
        """
        Load config from file

        :param fp: Path to json file; the config is read from its top-level "config" entry
        :type fp: str
        :param apply_now: Optional flag to apply loaded config immediately
        :type apply_now: bool
        :return: The validated config, which is also stored as the new target
        :type return: dict
        """
        with open(fp, 'r') as f:
            config = json.load(f)["config"]
        _validate_config(config)
        self._target = config
        if apply_now:
            self._apply_target_config()
        return config

    def read(self):
        """
        Read values from device

        :return: Next item from the stream buffer in stream mode; otherwise a list of converted
                 readings, one per configured channel
        """
        if self._stream_mode:
            return next(self._stream_buffer)  # TODO: print missed packets to log
        device_ret = self._device.getFeedback(self._assembled_read)
        return [convert(raw) for convert, raw in zip(self._read_functions, device_ret)]

    def command(self, assembled_commands):
        """
        Send commands to device low-level interface

        :param assembled_commands: assembled list of device operations
        :type assembled_commands: list
        """
        self._device.getFeedback(*assembled_commands)

    def stream(self):
        """Open high-rate read stream"""
        if self.mode == "stream":
            self._device.streamStart()
            self._stream_buffer = self._device.streamData(convert=False)
        else:
            print(str(self) + " not configured for streaming")  # TODO: print to log

    def process_stream_data(self, stream_data):
        """
        Apply device calibrations and reformat stream return

        :param stream_data: Stream packets; the "result" entry of each is passed to the device for
                            processing
        :return: Table with one row per sample time and one column per configured channel
        :type return: pandas.DataFrame
        """
        stream_data_processed = [self._device.processStreamData(x["result"]) for x in stream_data]
        columns = self.channels
        n = sum(len(x[columns[0]]) for x in stream_data_processed)
        dt = 1. / self.config["cycle_config"]["stream_rate"]
        # arange() with a float step can return n-1 or n+1 points because of rounding; scaling an
        # integer range always yields exactly n sample times
        times = arange(n) * dt
        df = DataFrame(columns=columns, index=times).T  # Transposed: one column per sample time
        i = 0
        for packet in stream_data_processed:
            packet_df = DataFrame.from_dict(packet, orient="index")
            for j in packet_df:
                df[times[i]] = packet_df[j]
                i += 1
        return df.T

    def close(self):
        """
        1. End stream
        2. Return to default settings
        3. Close connection to device
        """
        try:
            if self._stream_buffer is not None:
                self._device.streamStop()
            else:
                self.easy_command({c: 0.0 for c in self.command_channels})
                self._apply_target_config()
        finally:
            # The connection is released even if the cleanup above fails
            self._device.close()

    def assemble_command_table(self, command_table):
        """
        Assemble a command block into device-friendly commands

        The table's columns are flattened in place to plain channel names.

        :param command_table: Time-series table of commands
        :type command_table: pandas.DataFrame
        :return: Time values and the assembled commands for each time step
        :type return: tuple
        """
        # Interpret table from standard format. Columns that are already flat (e.g. a table passed in
        # a second time) are used as-is rather than being truncated to their first character.
        channels = [c[0] if isinstance(c, tuple) else c for c in command_table.columns]
        command_table.columns = channels  # Eliminate multi-indexing
        self.command_channels += [c for c in channels if c not in self.command_channels]

        # Assemble command for each time step
        times = command_table.index.values
        by_time = command_table.T  # Transpose once instead of once per time step
        assembled_commands = [self._assemble_commands(by_time[t].to_dict()) for t in times]
        return times, assembled_commands

    def easy_command(self, command_dict):
        """
        !!SLOW!! Intended for manually commanding channels that may not be on the nominal list to reassert

        :param command_dict: Commands indexed by channel
        :type command_dict: dict
        """
        self.command(self._assemble_commands(command_dict, force=True))

    def get_config(self, query=False):
        """
        Return current config as json

        :param query: Optional flag to get config directly from device instead of interface class
        :type query: bool
        :return: json string with "device_info", "config" and "calibration_data" entries
        :type return: str
        """
        if query:
            # Report what the device returns without overwriting the stored config, so that a later
            # save_config() still writes the applied target values
            io_config, direct_config, device_info, calibration_data = self._query_config()
            config = dict(self.config, io_config=io_config, direct_config=direct_config)
        else:
            config = self.config
            device_info = self.device_info
            calibration_data = self.calibration_data
        json_dict = {
            "device_info": device_info,
            "config": config,
            "calibration_data": calibration_data}
        return json.dumps(json_dict, indent=2)

    def save_config(self, dst="./"):
        """
        Save json of pin config, device info, and calibrations

        :param dst: destination directory
        :type dst: str
        """
        json_string = self.get_config()
        with open(path.join(dst, "config.json"), "w+") as f:
            f.write(json_string)

    def _assemble_commands(self, command_dict, force=False):
        """
        Convert commanded values into device operations

        :param command_dict: Commands indexed by channel
        :type command_dict: dict
        :param force: Optional flag to skip checking channel list
        :type force: bool
        :return: Commands in the format accepted by the device
        :type return: list
        :raises AssertionError: if force is False and the channels differ from command_channels
        """
        if not force and set(command_dict) != set(self.command_channels):  # All present?
            # Raised explicitly (same exception type as before) so the check survives "python -O"
            raise AssertionError(
                "Commanded channels {} do not match command channel list {}".format(
                    sorted(command_dict), sorted(self.command_channels)))

        assembled_commands = []
        for channel, cmd in command_dict.items():
            if channel[0] == "D":  # DAC pins
                dac_number = int(channel[3])
                dac_bits = self._device.voltageToDACBits(cmd, dacNumber=dac_number, is16Bits=True)
                assembled_commands.append(u3.DAC16(Dac=dac_number, Value=dac_bits))
            elif channel[1:3] == "IO":  # FIO, EIO, or CIO pins
                io_number = getattr(u3, channel)
                assembled_commands.append(u3.BitStateWrite(io_number, int(cmd)))  # Set command
            else:
                print("Unable to command unrecognized channel {}".format(channel))
        return assembled_commands

    def _query_config(self):
        """
        Convert actionable parts of return of U3's default config format to more general format

        :return: io_config, direct_config, device_info and calibration data
        :type return: tuple
        """
        # Get and format I/O bitmasks and direct config values
        device_config = self._device.configU3()
        bitmasks = {state: format(device_config[state], fmt)
                    for state, fmt in LABJACK_U3_IO_BITMASK_FORMATS.items()}

        io_config = {}
        for state, mask in bitmasks.items():
            pin_group = state[:3]
            config_key = LABJACK_U3_IO_CONFIG_MAP[state[3:]]
            # format() puts the most significant bit first, while _apply_target_config() writes pin i
            # to bit i, so the mask is walked from its last character to keep the two consistent
            for i, bit in enumerate(reversed(mask)):
                io_config.setdefault(pin_group + str(i), {})[config_key] = int(bit)

        direct_config = {key: device_config[key] for key in LABJACK_U3_DIRECT_CONFIG_DEFAULT}
        # Collect anything not already captured
        device_info = {key: value for key, value in device_config.items()
                       if key not in LABJACK_U3_IO_BITMASK_FORMATS
                       and key not in LABJACK_U3_DIRECT_CONFIG_DEFAULT}
        return io_config, direct_config, device_info, self._device.getCalibrationData()

    def _apply_target_config(self):
        """ Apply desired state & confirm result """
        # Apply target read config
        # TODO: add digital pins
        cycle_config = self._target["cycle_config"]
        self.mode = cycle_config["mode"]
        self._stream_mode = self.mode == "stream"

        # Process channel list
        long_settle, quick_sample = (False, True)  # Use quickSample mode by default
        channels = []
        ref_channels = []
        positive_channels = []
        negative_channels = []
        for channel, ref in cycle_config["channel_list"].items():
            ref = ref if ref != "" else "Single-Ended"
            channels.append(channel)
            ref_channels.append(ref)
            p = LABJACK_U3_AIN_CHANNEL_NUMBERS[channel]
            n = LABJACK_U3_AIN_REF_NUMBERS[ref]
            n = 30 if n == 32 else n  # U3-HV SPECIFIC HANDLING OF HIGH VOLTAGE AIN
            positive_channels.append(p)
            negative_channels.append(n)

        stream_config_formatted = {
            "Resolution": 3,
            "ScanFrequency": cycle_config["stream_rate"],
            "PChannels": positive_channels,  # Positive channels
            "NChannels": negative_channels,  # Negative/reference channels
            "NumChannels": len(positive_channels)}

        # The read plan is rebuilt from scratch: appending to the existing lists would duplicate every
        # channel each time a config is (re)applied
        self.channels = channels
        self.ref_channels = ref_channels
        self._assembled_read = [
            u3.AIN(p, n, long_settle, quick_sample)  # Bits to communicate with device
            for p, n in zip(positive_channels, negative_channels)]
        self._read_functions = [
            self._assemble_analog_conversion(p, n)  # Conversion to voltage
            for p, n in zip(positive_channels, negative_channels)]

        # Interpret I/O config
        config_map_inverted = {value: key for key, value in LABJACK_U3_IO_CONFIG_MAP.items()}
        bitmask_strings = {state: format(0, fmt) for state, fmt in LABJACK_U3_IO_BITMASK_FORMATS.items()}
        for pin, pin_config in self._target["io_config"].items():
            pin_group = pin[0:3]  # Like FIO
            pin_number = int(pin[3:])  # 0-7
            for key, value in pin_config.items():
                state = ''.join([pin_group, config_map_inverted[key]])  # like FIO + Analog -> FIOAnalog
                bitmask_strings[state] = replace_char(bitmask_strings[state], pin_number, str(value))

        # Character i of each string holds pin i, so reverse it to put pin 0 in the least significant
        # bit before converting back to int
        device_formatted_config = {  # Config formatted as needed by U3.configU3()
            state: int(mask[::-1], 2) for state, mask in bitmask_strings.items()}

        # Interpret direct config
        direct_config_formatted = copy(self._target["direct_config"])
        for analog_out_channel in ["DAC0", "DAC1"]:
            direct_config_formatted[analog_out_channel] = self._device.voltageToDACBits(
                self._target["direct_config"][analog_out_channel],
                dacNumber=int(analog_out_channel[-1]),
                is16Bits=True)
        device_formatted_config.update(direct_config_formatted)

        # Communicate with device
        if self.mode == "stream":
            self._device.configU3()
            self._device.configIO(FIOAnalog=255, EIOAnalog=255)
            self._device.streamConfig(**stream_config_formatted)
        else:
            self._device.configU3(**device_formatted_config)

        # Deep copy so the recorded state does not share nested dicts with the target
        self.config = deepcopy(self._target)

    def _assemble_analog_conversion(self, positive_channel, negative_channel):
        """
        Partial reimplementation of u3.U3.getAIN()

        :param positive_channel: Positive channel number
        :type positive_channel: int
        :param negative_channel: Negative/reference channel number
        :type negative_channel: int
        :return: Function converting a raw reading to a calibrated voltage
        """
        single_ended = negative_channel == 31
        # Devices without an isHV attribute are treated as low-voltage on every channel
        is_hv = getattr(self._device, "isHV", False)
        low_voltage = not (is_hv and positive_channel < 4)
        high_voltage_mode = negative_channel == 30  # U3-HV SPECIFIC HANDLING OF HIGH VOLTAGE AIN

        # Build function that will apply calibrations in the loop
        return partial(
            self._device.binaryToCalibratedAnalogVoltage,
            isLowVoltage=low_voltage,
            isSingleEnded=single_ended,
            isSpecialSetting=high_voltage_mode,
            channelNumber=positive_channel)

    def __str__(self):
        """ Return a unique identifier for the DAQ hardware """
        # device_info is empty if the device query failed; fall back instead of raising KeyError
        return "{}_SN_{}".format(
            self.device_info.get("DeviceName", "unknown"),
            self.device_info.get("SerialNumber", "unknown"))
def _compare_keys(d_present, d_expected): """ Compare keys between two dictionaries """ expected_keys = d_expected.keys() present_keys = d_present.keys() extra_keys = list(set(present_keys) - set(expected_keys)) missing_keys = list(set(expected_keys) - set(present_keys)) return extra_keys, missing_keys def _validate_io_config(io_config): """ Make sure io_config dict is in the right format """ # Make sure all pins are addressed extra_keys, missing_keys = _compare_keys(io_config, LABJACK_U3_IO_CONFIG_DEFAULT) if any(extra_keys): raise ValueError("Unknown pin names: {}".format(extra_keys)) elif any(missing_keys): raise ValueError("Missing config for pins: {}".format(missing_keys)) # Compare each pin's config for pin, pin_config in io_config.items(): extra_keys, missing_keys = _compare_keys(pin_config, LABJACK_U3_IO_CONFIG_DEFAULT[pin]) if any(extra_keys): raise ValueError("{} has unknown config for {}".format(pin, extra_keys)) elif any(missing_keys): raise ValueError("{} is missing config for {}".format(pin, missing_keys)) for key, value in pin_config.items(): type_present = type(value) type_expected = type(LABJACK_U3_IO_CONFIG_DEFAULT[pin][key]) if type_present is not type_expected: raise TypeError( "Expecting {}[{}] to be of type {}; got type {}".format(pin, key, type_expected, type_present)) def _validate_direct_or_cycle_config(config, default, config_type=""): """ Make sure direct_config is in the right format """ extra_keys, missing_keys = _compare_keys(config, default) if any(extra_keys): raise ValueError("Unknown parameter in {} config: {}".format(config_type, extra_keys)) elif any(missing_keys): raise ValueError("Missing parameter in {} config: {}".format(config_type, missing_keys)) for key, value in config.items(): type_present = type(value) type_expected = type(default[key]) if type_present is not type_expected: raise TypeError( "Expecting {} to be of type {}; got type {}".format(key, type_expected, type_present)) def _validate_config(config): """ Make sure 
I/O bitmasks, direct config values, and stream config are in the right format """ io_config = config["io_config"] direct_config = config["direct_config"] cycle_config = config["cycle_config"] _validate_io_config(io_config) _validate_direct_or_cycle_config(direct_config, default=LABJACK_U3_DIRECT_CONFIG_DEFAULT, config_type="direct") _validate_direct_or_cycle_config(cycle_config, default=LABJACK_U3_CYCLE_CONFIG_DEFAULT, config_type="stream")
def replace_char(string, ind, char):
    """
    Return a copy of string with the character at position ind replaced by char

    Negative positions count from the end of the string. Positions outside the string are clamped
    to the nearest end, so a too-large position replaces the last character. For an empty string,
    char itself is returned.

    :param string: Original string
    :type string: str
    :param ind: Position of the character to replace
    :type ind: int
    :param char: Replacement text
    :type char: str
    :return: New string with the replacement made
    :type return: str
    """
    length = len(string)
    if length == 0:
        return char
    if ind < 0:
        ind += length  # Same meaning as a negative index on a sequence
    ind = min(max(ind, 0), length - 1)
    return string[:ind] + char + string[ind + 1:]