""" Automation and data acquisition """
import json
import networkx as nx
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from multiprocessing import Process, Pipe, Manager
from os import path
from time import sleep, time
from scipy.interpolate import interp1d
# Default polling sleep for Operator, expressed as a fraction of the control period (control_dt);
# only used when the caller does not supply sleep_dt.
OPERATOR_SLEEP_DT_MULTIPLIER_DEFAULT = 0.001 # Target 0.1% of cycle time; expect 0.5% relative timing accuracy
class Operator:
    """
    Controls DAQ hardware

    Loads a graph of command blocks from a configuration directory, then steps through the
    blocks at a fixed control rate: on every cycle it waits for the cycle time, samples the
    interface, sends the block's command and forwards the sample to an Allocator.

    :param interface: Hardware interface; this class uses its ``channels`` attribute and its
        ``read()``, ``command(cmd)`` and ``assemble_command_table(table)`` methods
    :param config_dir: Directory containing ``operator.json`` and ``blocks/<block name>.csv``
    :type config_dir: str
    :param sleep_dt: Polling sleep while waiting for the next cycle, in seconds; when falsy,
        a fixed fraction of the control period is used instead
    :type sleep_dt: float
    """

    def __init__(self, interface, config_dir, sleep_dt=None):
        self.block_graph = nx.DiGraph()

        self._interface = interface
        self._allocator = Allocator()
        self._allocator_data_conn, self._allocator_request_conn = (None, None)
        self._config_dir = config_dir
        self._blocks = {}
        self._sleep_dt = sleep_dt
        self._entry_point = None
        self._operator_config = None

        self._load_operator()  # Load resources from disk
        self._validate_operator()

    def run(self, memory_margin=1.2):
        """
        Run automated process

        Each stored sample is laid out as
        ``[time, cycle_timing_error, cycle_timing_margin, block_time_<name>..., <interface channels>...]``.
        A KeyboardInterrupt stops the loop early and still returns the data recorded so far;
        any other exception is propagated after the allocator has been shut down.

        :param memory_margin: Multiplier applied to block lengths when requesting storage
        :type memory_margin: float
        :return: Recorded samples indexed by time, with the first cycle re-based onto the entry block's start time
        :rtype: pandas.DataFrame
        """
        internal_channels = ["time", "cycle_timing_error", "cycle_timing_margin"] + \
                            ["block_time_{}".format(name) for name in self._blocks]

        # Gratuitous intermediate variables to avoid in-loop math and assignments
        n_data_channels = len(self._interface.channels)
        n_internal_channels = len(internal_channels)
        n_channels = n_data_channels + n_internal_channels

        block = self._blocks[self._entry_point]
        self._allocator_data_conn, self._allocator_request_conn = self._allocator.open(n_channels)
        self._allocate(int(block.len * memory_margin))

        op_start_time = time() + self.control_dt
        next_time = op_start_time  # Start one cycle from now
        print("Starting control loop")
        try:
            while block is not None:
                # Request room for the longest possible successor before it is needed
                self._allocate(int(block.len_longest_child * memory_margin))
                block.reset()  # Rebuild iterator; this allows cycles in the block graph
                block_time_ind = 3 + block.index  # Where in the output does block_time go?
                for _, t, cmd in block:
                    sample = [np.nan] * n_channels  # Initialize all channels to NaN
                    sample[block_time_ind] = t
                    sample[2] = next_time - time()  # "cycle_timing_margin"
                    sample[0], sample[1] = wait(next_time, self._sleep_dt)  # "time", "cycle_timing_error"
                    sample[n_internal_channels:] = self._interface.read()  # Sample data
                    # TODO: Apply calibrations
                    # TODO: Run calculations
                    self._interface.command(cmd)  # Send commands
                    # TODO: Add commands to output
                    self._store(sample)  # Send sample to allocator
                    # TODO: Check for conditional transitions
                    next_time += self.control_dt
                block = block.next_block
        except KeyboardInterrupt:
            pass  # Operator abort: fall through and return what was recorded
        finally:
            # Always shut the listener down, even on unexpected errors, so its process is not leaked
            data = self._allocator.close()  # Close the allocator and get its stored data

        columns = internal_channels + self._interface.channels
        data = pd.DataFrame(data, columns=columns)
        data["time"] -= op_start_time - self._blocks[self._entry_point].start_time  # Offset from system time
        data.set_index("time", inplace=True)
        data.dropna(inplace=True, how="all")  # Discard pre-allocated rows that were never filled
        return data

    def plot_block_graph(self, **kwargs):
        """
        Display a chart of the block graph

        :param kwargs: ``figsize`` is used for the matplotlib figure; every other keyword
            overrides the default drawing options passed to ``networkx.draw_circular``
        :return: The matplotlib figure holding the chart
        """
        # figsize belongs to the figure only; forwarding it to networkx would be an invalid draw argument
        figsize = kwargs.pop("figsize", (10, 6))
        f = plt.figure(figsize=figsize)
        labels = {node: str(node) for node in self.block_graph.nodes}
        draw_kwargs = dict(labels=labels, arrowsize=30, font_size=18, node_shape="s",
                           node_color="lightgray", node_size=10000, width=3.)
        draw_kwargs.update(kwargs)
        nx.draw_circular(self.block_graph, **draw_kwargs)
        return f

    def save_block_graph_plot(self, dst="./"):
        """
        Save a static image of the block graph

        :param dst: Directory in which ``block_graph.png`` is written
        :type dst: str
        """
        f = self.plot_block_graph()
        try:
            f.savefig(path.join(dst, "block_graph.png"))
        finally:
            plt.close(f)  # Release the figure even if saving fails

    def _allocate(self, n):
        """ Request allocator to create a chunk of length n (no-op until the allocator is open) """
        if self._allocator_request_conn is not None:
            self._allocator_request_conn.send(n)

    def _store(self, sample):
        """ Send data sample to allocator (no-op until the allocator is open) """
        if self._allocator_data_conn is not None:
            self._allocator_data_conn.send(sample)

    def _load_operator(self):
        """ Assemble blocks from block graph and command tables """
        # Get operation config
        with open(path.join(self._config_dir, "operator.json")) as f:
            self._operator_config = json.load(f)
        self.control_dt = 1. / self._operator_config["control_rate"]
        self._sleep_dt = self._sleep_dt or self.control_dt * OPERATOR_SLEEP_DT_MULTIPLIER_DEFAULT

        # Configure control blocks
        self._entry_point = self._operator_config["entry_point"]
        block_configs = self._operator_config["blocks"]
        for block_name in block_configs:
            self._blocks[block_name] = self._load_block(block_name)
            self.block_graph.add_node(self._blocks[block_name])

        # Populate connections between blocks; an empty "next_block" marks a terminal block
        for block_name, block in self._blocks.items():
            next_block_name = block_configs[block_name]["next_block"]
            block.next_block = None if next_block_name == "" else self._blocks[next_block_name]
        for i, block in enumerate(self._blocks.values()):
            if block.next_block is not None:
                self.block_graph.add_edge(block, block.next_block)
            block.index = i  # Position of this block's "block_time_*" column among the internal channels

        # Find longest successor of each block to advise memory allocation
        for block in self._blocks.values():
            successors = self.block_graph.successors(block)
            block.len_longest_child = max([b.len for b in successors] + [0])

    def _load_block(self, name):
        """
        Build a Block object from inputs

        :param name: Block name; its command table is read from ``blocks/<name>.csv``
        :type name: str
        :raises FileNotFoundError: If the command table does not exist
        """
        command_table_path = path.join(self._config_dir, "blocks", name + ".csv")
        try:
            command_table = self._load_command_table(command_table_path)
        except FileNotFoundError:
            print("CSV command table for block \"{}\" not found at {}".format(name, command_table_path))
            raise
        times, commands = self._interface.assemble_command_table(command_table)
        return Block(name, times, commands)

    def _load_command_table(self, src, tol=1e-2):
        """
        Load, pad, and interpolate command table

        The .csv has two header rows (channel name, then interpolation kind understood by
        ``scipy.interpolate.interp1d``) and the command time in its first column.

        :param src: Path to .csv command table
        :type src: str
        :param tol: Relative tolerance on command timing for interpolation
        :type tol: float
        :return: Command table resampled onto the control-rate time grid
        :rtype: pandas.DataFrame
        :raises IndexError: If two commands are closer together than one control period (within tol)
        """
        dt = self.control_dt

        # Load base table
        command_table = pd.read_csv(src, header=[0, 1], index_col=0)
        command_table.ffill(inplace=True)  # Where unspecified, hold most recent command
        time_ = command_table.index.values
        if np.any(np.diff(time_) < dt * (1. - tol)):
            raise IndexError("Command table at %s contains timesteps smaller than command dt" % src)

        # Resample time to command rate
        time_upsampled = np.arange(min(time_), max(time_) + dt, dt)

        # Pin commands to nearest command cycle
        pinned = [time_upsampled[np.argmin(np.abs(time_upsampled - t))] for t in time_]
        command_table.index = pinned
        # The padded arange stop can leave a grid point beyond the last pinned command;
        # interp1d refuses to evaluate outside its data, so drop any such point
        time_upsampled = time_upsampled[time_upsampled <= max(pinned)]

        # Resample commanded values to command rate
        command_table_upsampled = pd.DataFrame(columns=command_table.columns, index=time_upsampled)
        for c in command_table.columns:
            _, interp_method = c  # Second header row selects the interpolation kind
            command_interpolator = interp1d(command_table.index, command_table[c], kind=interp_method)
            command_table_upsampled[c] = command_interpolator(time_upsampled)
        return command_table_upsampled

    def _validate_operator(self):
        """ Check for errors in operation definition (not implemented yet) """
        # TODO: check for multiple nodes with no descendants (is there more than one possible end state?)
        # TODO: check for cycles with no exit criteria (this should just be a warning)
        # TODO: check for unused files
        # TODO: check that all channels are commanded in entry point and exit point
        # TODO: warn for blocks other than entry point with negative time
        pass
class Block:
    """
    A block of commands

    Iterating a block yields ``(index, time, command)`` tuples. The block is its own
    iterator, so it is exhausted after one pass; call :meth:`reset` before iterating again.

    :param name: Block name, used as its label in the block graph
    :type name: str
    :param times: Command times, one per command; must not be empty
    :param commands: Commands, in the same order as times
    """

    def __init__(self, name, times, commands):
        self.name = name
        self.times = times
        self.commands = commands
        self.next_block = None  # Successor block; None until wired up, or for a terminal block

        self.start_time = min(self.times)
        self.end_time = max(self.times)
        self.duration = self.end_time - self.start_time

        self.len = len(self.times)
        self.len_longest_child = 0  # Length of the longest successor; filled in by the owner of the graph
        self.index = None  # Position among all blocks; filled in by the owner of the graph
        self.indices = range(self.len)
        self.iterator = None
        self.reset()

    def __str__(self):
        return "{}\n{:.2f}s".format(self.name, self.duration)  # For graph embedding

    def __iter__(self):
        return self

    def __next__(self):
        return next(self.iterator)

    def reset(self):
        """Rebuild iterator"""
        self.iterator = zip(self.indices, self.times, self.commands)
class Allocator:
    """
    Asynchronous memory manager

    A listener process receives allocation requests and data samples over two one-way
    pipes and stores the samples in chunks held in a manager-backed list, so the sender
    does not allocate storage itself. :meth:`close` stops the listener and returns
    everything that was stored as one array.
    """

    def __init__(self):
        self.manager = Manager()
        self.data_chunks = self.manager.list([])  # One entry per allocated chunk; each chunk is a list of rows
        self.listener = None  # Listener process; None while closed
        self.data_receiver, self.data_sender = (None, None)
        self.request_receiver, self.request_sender = (None, None)
        self.n_channels = None  # Row width; set by open()

    def open(self, n_channels):
        """
        Start asynchronous process watching for incoming data and allocation requests

        :param n_channels: Number of values in every data sample
        :type n_channels: int
        :return: (data connection, request connection) to send samples and chunk sizes on
        """
        if self.listener is None:
            self.n_channels = n_channels
            self.data_receiver, self.data_sender = Pipe(duplex=False)
            self.request_receiver, self.request_sender = Pipe(duplex=False)
            self.listener = Process(target=self.listen)
            self.listener.start()
        else:
            print("daq.operation.Allocator already open")
        return self.data_sender, self.request_sender

    def close(self):
        """
        Tell the listener to shut down and return data

        :return: All stored rows in the order the chunks were allocated, shape (rows, n_channels);
            rows that were allocated but never written are NaN
        :rtype: numpy.ndarray
        """
        if self.listener is not None:
            self.request_sender.send(-1)  # To signal end, send a request to allocate a negative-size chunk
            self.listener.join()
            for conn in (self.data_receiver, self.data_sender, self.request_receiver, self.request_sender):
                conn.close()
            self.listener = None
        else:
            print("daq.operation.Allocator not open")

        self.data_chunks.reverse()  # First in, first out
        len_total = sum(len(x) for x in self.data_chunks)
        # n_channels is still None if the allocator was never opened; fall back to zero columns
        output = np.empty((len_total, self.n_channels or 0))
        i = 0
        while len(self.data_chunks) > 0:
            chunk = self.data_chunks.pop()
            rows = chunk[:]  # Copy the whole chunk in one call instead of fetching row by row
            del chunk  # Otherwise we use 2x the total memory
            for row in rows:
                output[i] = row
                i += 1
            del rows
        return output

    def listen(self):
        """
        Data listener and memory allocator loop

        Runs until a request of -1 arrives (or a KeyboardInterrupt is caught), then keeps
        collecting until the data pipe is empty so samples sent just before the close
        request are not lost.
        """
        closing = False
        chunk_ind = 0  # Chunk currently being filled
        data_ind = 0  # Next free row in that chunk
        chunk_sizes = []
        request_sizes = []
        while True:
            data_pending = False
            try:
                # Allocate
                if self.request_receiver.poll():
                    request_sizes.append(self.request_receiver.recv())
                    space_remaining = sum(chunk_sizes[chunk_ind:]) - data_ind
                    space_required = sum(request_sizes[-2:])  # Only allocate one chunk ahead
                    memory_needed = space_required > space_remaining  # Prevent memory leakage due to block cycles
                    closing = request_sizes[-1] == -1  # Are we closing the allocator?
                    if memory_needed and not closing:  # ...then allocate
                        self.data_chunks.append(self.manager.list([[np.nan] * self.n_channels] * request_sizes[-1]))
                        chunk_sizes.append(request_sizes[-1])  # This request is now a chunk of data

                # Collect
                data_pending = self.data_receiver.poll()
                if data_pending:
                    # Go to next chunk?
                    if (data_ind > chunk_sizes[chunk_ind] - 1) and (len(self.data_chunks) > chunk_ind + 1):
                        chunk_ind += 1
                        data_ind = 0
                    # Store sample
                    if data_ind < chunk_sizes[chunk_ind]:  # Nominal mode
                        self.data_chunks[chunk_ind][data_ind] = self.data_receiver.recv()  # out-of-place reassignment
                    else:  # Emergency mode TODO: stream this to a log instead
                        print("Allocating with append. This is hazardous because it can be very slow")
                        self.data_chunks[chunk_ind].append(self.data_receiver.recv())
                    data_ind += 1
            except KeyboardInterrupt:
                closing = True
            # Only stop once the data pipe has been drained; a sample was just read, so look again
            if closing and not data_pending:
                break
def wait(target, sleep_dt):
    """
    Sleep until time within (sleep_dt) of (target)

    Polls the system clock, sleeping sleep_dt between polls, and returns as soon as the
    clock is no more than sleep_dt short of target. Returns immediately if target has passed.

    :param target: Time to wait for, on the same clock as ``time.time()``
    :type target: float
    :param sleep_dt: Sleep between polls, in seconds; also the largest allowed early return
    :type sleep_dt: float
    :return: (t, error): the clock reading on return and ``t - target`` (negative when early)
    :rtype: tuple
    """
    while True:
        t = time()
        error = t - target
        if not error < -sleep_dt:  # Close enough to (or past) the target
            return t, error
        sleep(sleep_dt)