Source code for daq.operator

""" Automation and data acquisition """
import json
import networkx as nx
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from multiprocessing import Process, Pipe, Manager
from os import path
from time import sleep, time
from scipy.interpolate import interp1d

OPERATOR_SLEEP_DT_MULTIPLIER_DEFAULT = 0.001  # Target 0.1% of cycle time; expect 0.5% relative timing accuracy
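# Illustrative example (values assumed, not taken from any shipped config): with
# control_rate = 100 Hz in operator.json, control_dt = 1 / 100 = 0.01 s and the default
# sleep resolution becomes 0.01 * 0.001 = 1e-5 s (see Operator._load_operator).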


class Operator:
    """ Controls DAQ hardware """

    def __init__(self, interface, config_dir, sleep_dt=None):
        self.block_graph = nx.DiGraph()
        self._interface = interface
        self._allocator = Allocator()
        self._allocator_data_conn, self._allocator_request_conn = (None, None)
        self._config_dir = config_dir
        self._blocks = {}
        self._sleep_dt = sleep_dt
        self._entry_point = None
        self._operator_config = None
        self._load_operator()  # Load resources from disk
        self._validate_operator()
    def run(self, memory_margin=1.2):
        """ Run automated process """
        block_names = list(self._blocks.keys())
        internal_channels = ["time", "cycle_timing_error", "cycle_timing_margin"] + \
            ["block_time_{}".format(block) for block in block_names]
        # Precompute channel counts so no math or reassignment happens inside the control loop
        n_data_channels = len(self._interface.channels)
        n_internal_channels = len(internal_channels)
        n_channels = n_data_channels + n_internal_channels
        block = self._blocks[self._entry_point]
        self._allocator_data_conn, self._allocator_request_conn = self._allocator.open(n_channels)
        self._allocate(int(block.len * memory_margin))
        op_start_time = time() + self.control_dt
        next_time = op_start_time  # Start one cycle from now
        block_count = 0
        print("Starting control loop")
        while block is not None:
            try:
                self._allocate(int(block.len_longest_child * memory_margin))
                block.reset()  # Rebuild iterator; this allows cycles in the block graph
                block_time_ind = 3 + block.index  # Where in the output does block_time go?
                for i, t, cmd in block:
                    sample = [np.nan] * n_channels  # Initialize all channels to NaN
                    sample[block_time_ind] = t
                    sample[2] = next_time - time()  # "cycle_timing_margin"
                    sample[0], sample[1] = wait(next_time, self._sleep_dt)  # "time", "cycle_timing_error"
                    sample[n_internal_channels:] = self._interface.read()  # Sample data
                    # TODO: Apply calibrations
                    # TODO: Run calculations
                    self._interface.command(cmd)  # Send commands
                    # TODO: Add commands to output
                    self._store(sample)  # Send sample to allocator
                    # TODO: Check for conditional transitions
                    next_time += self.control_dt
                block = block.next_block
                block_count += 1
            except KeyboardInterrupt:
                break
        data = self._allocator.close()  # Close the allocator and get its stored data
        columns = internal_channels + self._interface.channels
        data = pd.DataFrame(data, columns=columns)
        data["time"] -= op_start_time - self._blocks[self._entry_point].start_time  # Offset from system time
        data.set_index("time", inplace=True)
        data.dropna(inplace=True, how="all")
        return data
    def plot_block_graph(self, **kwargs):
        """ Display a chart of the block graph """
        figsize = kwargs.pop("figsize", (10, 6))  # Consume here; not a valid draw_circular kwarg
        f = plt.figure(figsize=figsize)
        labels = {node: str(node) for node in self.block_graph.nodes}
        default_kwargs = dict(labels=labels, arrowsize=30, font_size=18, node_shape="s",
                              node_color="lightgray", node_size=10000, width=3.)
        default_kwargs.update(kwargs)
        nx.draw_circular(self.block_graph, **default_kwargs)
        return f
    def save_block_graph_plot(self, dst="./"):
        """ Save a static image of the block graph """
        f = self.plot_block_graph()
        plt.savefig(path.join(dst, "block_graph.png"))
        plt.close(f)
    def _allocate(self, n):
        """ Request allocator to create a chunk of length n """
        if self._allocator_request_conn is not None:
            self._allocator_request_conn.send(n)

    def _store(self, sample):
        """ Send data sample to allocator """
        if self._allocator_data_conn is not None:
            self._allocator_data_conn.send(sample)

    def _load_operator(self):
        """ Assemble blocks from block graph and command tables """
        # Get operation config
        with open(path.join(self._config_dir, "operator.json")) as f:
            self._operator_config = json.load(f)
        self.control_dt = 1. / self._operator_config["control_rate"]
        self._sleep_dt = self._sleep_dt or self.control_dt * OPERATOR_SLEEP_DT_MULTIPLIER_DEFAULT
        # Configure control blocks
        self._entry_point = self._operator_config["entry_point"]
        for block_name, config in self._operator_config["blocks"].items():
            self._blocks[block_name] = self._load_block(block_name)
            self.block_graph.add_node(self._blocks[block_name])
        # Populate connections between blocks
        for block_name, block in self._blocks.items():
            next_block_name = self._operator_config["blocks"][block_name]["next_block"]
            block.next_block = None if next_block_name == "" else self._blocks[next_block_name]
        for i, (block_name, block) in enumerate(self._blocks.items()):
            if block.next_block is not None:
                self.block_graph.add_edge(block, block.next_block)
            block.index = i
        # Find longest successor of each block to advise memory allocation
        for block_name, block in self._blocks.items():
            successors = self.block_graph.successors(block)
            block.len_longest_child = max([b.len for b in successors] + [0])

    def _load_block(self, name):
        """ Build a Block object from inputs """
        command_table_path = path.join(self._config_dir, "blocks", name + ".csv")
        try:
            command_table = self._load_command_table(command_table_path)
        except FileNotFoundError as e:
            print("CSV command table for block \"{}\" not found at {}".format(name, command_table_path))
            raise e
        times, commands = self._interface.assemble_command_table(command_table)
        return Block(name, times, commands)

    def _load_command_table(self, src, tol=1e-2):
        """ Load, pad, and interpolate command table

        :param src: Path to .csv command table
        :type src: str
        :param tol: Relative tolerance on command timing for interpolation
        :type tol: float
        """
        dt = self.control_dt
        # Load base table
        command_table = pd.read_csv(src, header=[0, 1], index_col=0)
        command_table.ffill(inplace=True)  # Where unspecified, hold most recent command
        if any(np.diff(command_table.index.values) < dt * (1. - tol)):
            raise IndexError("Command table at %s contains timesteps smaller than command dt" % src)
        # Resample time to command rate
        time_ = command_table.index.values
        time_upsampled = np.arange(min(time_), max(time_) + dt, dt)
        # Pin commands to nearest command cycle
        command_table.index = [time_upsampled[np.argmin(np.abs(time_upsampled - t))] for t in time_]
        # Resample commanded values to command rate
        command_table_upsampled = pd.DataFrame(columns=command_table.columns, index=time_upsampled)
        for c in command_table.columns:
            channel, interp_method = c
            command_interpolator = interp1d(command_table.index, command_table[c], kind=interp_method)
            command_table_upsampled[channel] = command_interpolator(time_upsampled)
        return command_table_upsampled

    def _validate_operator(self):
        """ Check for errors in operation definition """
        # TODO: check for multiple nodes with no descendants (is there more than one possible end state?)
        # TODO: check for cycles with no exit criteria (this should just be a warning)
        # TODO: check for unused files
        # TODO: check that all channels are commanded in entry point and exit point
        # TODO: warn for blocks other than entry point with negative time
        pass
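
# A sketch of the on-disk layout that _load_operator and _load_command_table expect,
# reconstructed from the reads above; the concrete names ("warmup", "main",
# "heater_setpoint", ...) are hypothetical examples, not shipped files.
#
#   <config_dir>/operator.json
#       {
#           "control_rate": 100,
#           "entry_point": "warmup",
#           "blocks": {
#               "warmup": {"next_block": "main"},
#               "main": {"next_block": ""}        # "" marks a terminal block
#           }
#       }
#
#   <config_dir>/blocks/<block_name>.csv
#       Read with pd.read_csv(..., header=[0, 1], index_col=0): two header rows
#       (channel name, then the scipy interp1d "kind" such as "linear" or "previous"),
#       and a first column of command times in seconds. Values left blank for a channel
#       hold the most recent command (forward fill).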

class Block:
    """ A block of commands """

    def __init__(self, name, times, commands):
        self.name = name
        self.times = times
        self.commands = commands
        self.next_block = None
        self.start_time = min(self.times)
        self.end_time = max(self.times)
        self.duration = self.end_time - self.start_time
        self.len = len(self.times)
        self.len_longest_child = 0
        self.index = None
        self.indices = range(len(self.times))
        self.iterator = (x for x in zip(self.indices, self.times, self.commands))

    def __str__(self):
        return "".join([self.name, "\n", "{:.2f}s".format(self.duration)])  # For graph embedding

    def __iter__(self):
        return self

    def __next__(self):
        return next(self.iterator)
    def reset(self):
        """ Rebuild iterator """
        self.iterator = (x for x in zip(self.indices, self.times, self.commands))
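
# Minimal usage sketch for Block on its own (illustrative values; in normal operation
# times and commands come from interface.assemble_command_table):
#
#     b = Block("ramp", times=[0.0, 0.1, 0.2], commands=["c0", "c1", "c2"])
#     for i, t, cmd in b:   # yields (index, time, command) tuples
#         print(i, t, cmd)
#     b.reset()             # rebuild the iterator before entering the block again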

class Allocator:
    """ Asynchronous memory manager """

    def __init__(self):
        self.manager = Manager()
        self.data_chunks = self.manager.list([])
        self.listener = None
        self.data_receiver, self.data_sender = (None, None)
        self.request_receiver, self.request_sender = (None, None)
        self.n_channels = None
    def open(self, n_channels):
        """ Start asynchronous process watching for incoming data and allocation requests """
        if self.listener is None:
            self.n_channels = n_channels
            self.data_receiver, self.data_sender = Pipe(duplex=False)
            self.request_receiver, self.request_sender = Pipe(duplex=False)
            self.listener = Process(target=self.listen)
            self.listener.start()
        else:
            print("daq.operator.Allocator already open")
        return self.data_sender, self.request_sender
    def close(self):
        """ Tell the listener to shut down and return data """
        if self.listener is not None:
            self.request_sender.send(-1)  # To signal end, send a request to allocate a negative-size chunk
            self.listener.join()
            self.data_receiver.close()
            self.data_sender.close()
            self.request_receiver.close()
            self.request_sender.close()
            self.listener = None
        else:
            print("daq.operator.Allocator not open")
        self.data_chunks.reverse()  # First in, first out
        len_total = sum([len(x) for x in self.data_chunks])
        output = np.empty((len_total, self.n_channels))
        i = 0
        while len(self.data_chunks) > 0:
            chunk = self.data_chunks.pop()
            for row in chunk:
                output[i, :] = row
                i += 1
            del chunk  # Otherwise we use 2x the total memory
        return output
    def listen(self):
        """ Data listener and memory allocator loop """
        closing = False
        chunk_ind = 0
        data_ind = 0
        chunk_sizes = []
        request_sizes = []
        while True:
            try:
                # Allocate
                if self.request_receiver.poll():
                    request_sizes.append(self.request_receiver.recv())
                    space_remaining = sum(chunk_sizes[chunk_ind:]) - data_ind
                    space_required = sum(request_sizes[-2:])  # Only allocate one chunk ahead
                    memory_needed = space_required > space_remaining  # Prevent memory leakage due to block cycles
                    closing = request_sizes[-1] == -1  # Are we closing the allocator?
                    if memory_needed and not closing:  # ...then allocate
                        self.data_chunks.append(
                            self.manager.list([[np.nan] * self.n_channels] * request_sizes[-1]))
                        chunk_sizes.append(request_sizes[-1])  # This request is now a chunk of data
                # Collect
                if self.data_receiver.poll():
                    # Go to next chunk?
                    if (data_ind > chunk_sizes[chunk_ind] - 1) and (len(self.data_chunks) > chunk_ind + 1):
                        chunk_ind += 1
                        data_ind = 0
                    # Store sample
                    if data_ind < chunk_sizes[chunk_ind]:  # Nominal mode
                        # Assign the whole row so the manager proxy list registers the change
                        self.data_chunks[chunk_ind][data_ind] = self.data_receiver.recv()
                    else:  # Emergency mode TODO: stream this to a log instead
                        print("Allocating with append. This is hazardous because it can be very slow")
                        self.data_chunks[chunk_ind].append(self.data_receiver.recv())
                    data_ind += 1
            except KeyboardInterrupt:
                closing = True
            if closing:
                break
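
# Usage sketch of the Allocator protocol as exercised by Operator.run(): open() returns
# the sending ends of the data and request pipes, chunk sizes are requested up front,
# samples are streamed one list per control cycle, and close() signals the listener
# with -1 and returns everything as a single (n_samples, n_channels) ndarray.
#
#     allocator = Allocator()
#     data_conn, request_conn = allocator.open(n_channels=4)
#     request_conn.send(100)                  # pre-allocate a 100-row chunk
#     data_conn.send([0.0, 1.0, 2.0, 3.0])    # one sample
#     data = allocator.close()                # expected shape (100, 4); unwritten rows stay NaN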

def wait(target, sleep_dt):
    """ Sleep until the current time is within sleep_dt of target """
    t = time()
    error = t - target
    while error < -sleep_dt:
        sleep(sleep_dt)
        t = time()
        error = t - target
    return t, error
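
# End-to-end usage sketch. "MyInterface" is hypothetical; the only contract this module
# relies on is that the interface exposes a `channels` list plus read(), command(cmd),
# and assemble_command_table(table), as called in Operator.run() and Operator._load_block().
#
#     from daq.interface import MyInterface   # hypothetical import path
#
#     interface = MyInterface()
#     op = Operator(interface, config_dir="./config")
#     op.save_block_graph_plot("./")           # writes block_graph.png
#     data = op.run(memory_margin=1.2)         # pandas DataFrame indexed by time
#     data.to_csv("run_data.csv")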