"""
Plans that might be useful at the APS when using BlueSky
.. autosummary::
~nscan
~ProcedureRegistry
~run_blocker_in_plan
~run_in_thread
~snapshot
~TuneAxis
~tune_axes
"""
# Copyright (c) 2017-2018, UChicago Argonne, LLC. See LICENSE file.
from collections import OrderedDict
import datetime
import logging
import numpy as np
import sys
import threading
import time
from bluesky import preprocessors as bpp
from bluesky import plan_stubs as bps
from bluesky import plans as bp
from bluesky.callbacks.fitting import PeakStats
import ophyd
from ophyd import Device, Component, Signal
logger = logging.getLogger(__name__).addHandler(logging.NullHandler())
[docs]def run_in_thread(func):
"""
(decorator) run ``func`` in thread
USAGE::
@run_in_thread
def progress_reporting():
logger.debug("progress_reporting is starting")
# ...
#...
progress_reporting() # runs in separate thread
#...
"""
def wrapper(*args, **kwargs):
thread = threading.Thread(target=func, args=args, kwargs=kwargs)
thread.start()
return thread
return wrapper
[docs]def run_blocker_in_plan(blocker, *args, _poll_s_=0.01, _timeout_s_=None, **kwargs):
"""
plan: run blocking function ``blocker_(*args, **kwargs)`` from a Bluesky plan
PARAMETERS
blocker : func
function object to be called in a Bluesky plan
_poll_s_ : float
sleep interval in loop while waiting for completion
(default: 0.01)
_timeout_s_ : float
maximum time for completion
(default: `None` which means no timeout)
Example: use ``time.sleep`` as blocking function::
RE(run_blocker_in_plan(time.sleep, 2.14))
Example: in a plan, use ``time.sleep`` as blocking function::
def my_sleep(t=1.0):
yield from run_blocker_in_plan(time.sleep, t)
RE(my_sleep())
"""
status = ophyd.status.Status()
@run_in_thread
def _internal(blocking_function, *args, **kwargs):
blocking_function(*args, **kwargs)
status._finished(success=True, done=True)
if _timeout_s_ is not None:
t_expire = time.time() + _timeout_s_
# FIXME: how to keep this from running during summarize_plan()?
_internal(blocker, *args, **kwargs)
while not status.done:
if _timeout_s_ is not None:
if time.time() > t_expire:
status._finished(success=False, done=True)
break
yield from bps.sleep(_poll_s_)
return status
[docs]def nscan(detectors, *motor_sets, num=11, per_step=None, md=None):
"""
Scan over ``n`` variables moved together, each in equally spaced steps.
PARAMETERS
detectors : list
list of 'readable' objects
motor_sets : list
sequence of one or more groups of: motor, start, finish
motor : object
any 'settable' object (motor, temp controller, etc.)
start : float
starting position of motor
finish : float
ending position of motor
num : int
number of steps (default = 11)
per_step : callable, optional
hook for customizing action of inner loop (messages per step)
Expected signature: ``f(detectors, step_cache, pos_cache)``
md : dict, optional
metadata
See the `nscan()` example in a Jupyter notebook:
https://github.com/BCDA-APS/APS_BlueSky_tools/blob/master/docs/source/resources/demo_nscan.ipynb
"""
# TODO: Isn't there a similar plan in bluesky? At least reference it.
def take_n_at_a_time(args, n=2):
yield from zip(*[iter(args)]*n)
if len(motor_sets) < 3:
raise ValueError("must provide at least one movable")
if len(motor_sets) % 3 > 0:
raise ValueError("must provide sets of movable, start, finish")
motors = OrderedDict()
for m, s, f in take_n_at_a_time(motor_sets, n=3):
if not isinstance(s, (int, float)):
msg = "start={} ({}): is not a number".format(s, type(s))
raise ValueError(msg)
if not isinstance(f, (int, float)):
msg = "finish={} ({}): is not a number".format(f, type(f))
raise ValueError(msg)
motors[m.name] = dict(motor=m, start=s, finish=f,
steps=np.linspace(start=s, stop=f, num=num))
_md = {'detectors': [det.name for det in detectors],
'motors': [m for m in motors.keys()],
'num_points': num,
'num_intervals': num - 1,
'plan_args': {'detectors': list(map(repr, detectors)),
'num': num,
'motors': repr(motor_sets),
'per_step': repr(per_step)},
'plan_name': 'nscan',
'plan_pattern': 'linspace',
'hints': {},
'iso8601': datetime.datetime.now(),
}
_md.update(md or {})
try:
m = list(motors.keys())[0]
dimensions = [(motors[m]["motor"].hints['fields'], 'primary')]
except (AttributeError, KeyError):
pass
else:
_md['hints'].setdefault('dimensions', dimensions)
if per_step is None:
per_step = bps.one_nd_step
@bpp.stage_decorator(list(detectors)
+ [m["motor"] for m in motors.values()])
@bpp.run_decorator(md=_md)
def inner_scan():
for step in range(num):
step_cache, pos_cache = {}, {}
for m in motors.values():
next_pos = m["steps"][step]
m = m["motor"]
pos_cache[m] = m.read()[m.name]["value"]
step_cache[m] = next_pos
yield from per_step(detectors, step_cache, pos_cache)
return (yield from inner_scan())
[docs]def snapshot(obj_list, stream="primary", md=None):
"""
bluesky plan: record current values of list of ophyd signals
PARAMETERS
obj_list : list
list of ophyd Signal or EpicsSignal objects
stream : str
document stream, default: "primary"
md : dict
metadata
"""
from .__init__ import __version__
import bluesky
import databroker
import epics
from ophyd import EpicsSignal
import socket
import getpass
objects = []
for obj in obj_list:
# TODO: consider supporting Device objects
if isinstance(obj, (Signal, EpicsSignal)) and obj.connected:
objects.append(obj)
else:
if hasattr(obj, "pvname"):
nm = obj.pvname
else:
nm = obj.name
print(f"ignoring object: {nm}")
if len(objects) == 0:
raise ValueError("No signals to log.")
hostname = socket.gethostname() or 'localhost'
username = getpass.getuser() or 'bluesky_user'
# we want this metadata to appear
_md = dict(
plan_name = "snapshot",
plan_description = "archive snapshot of ophyd Signals (usually EPICS PVs)",
iso8601 = str(datetime.datetime.now()), # human-readable
hints = {},
software_versions = dict(
python = sys.version,
PyEpics = epics.__version__,
bluesky = bluesky.__version__,
ophyd = ophyd.__version__,
databroker = databroker.__version__,
APS_Bluesky_Tools = __version__,),
hostname = hostname,
username = username,
login_id = f"{username}@{hostname}",
)
# caller may have given us additional metadata
_md.update(md or {})
def _snap(md=None):
yield from bps.open_run(md)
yield from bps.create(name=stream)
for obj in objects:
# passive observation: DO NOT TRIGGER, only read
yield from bps.read(obj)
yield from bps.save()
yield from bps.close_run()
return (yield from _snap(md=_md))
# def sscan(*args, md=None, **kw): # TODO: planned
# """
# gather data form the sscan record and emit documents
#
# Should this operate a complete scan using the sscan record?
# """
# raise NotImplemented("this is only planned")
[docs]class TuneAxis(object):
"""
tune an axis with a signal
This class provides a tuning object so that a Device or other entity
may gain its own tuning process, keeping track of the particulars
needed to tune this device again. For example, one could add
a tuner to a motor stage::
motor = EpicsMotor("xxx:motor", "motor")
motor.tuner = TuneAxis([det], motor)
Then the ``motor`` could be tuned individually::
RE(motor.tuner.tune(md={"activity": "tuning"}))
or the :meth:`tune()` could be part of a plan with other steps.
Example::
tuner = TuneAxis([det], axis)
live_table = LiveTable(["axis", "det"])
RE(tuner.multi_pass_tune(width=2, num=9), live_table)
RE(tuner.tune(width=0.05, num=9), live_table)
Also see the jupyter notebook referenced here:
:ref:`example_tuneaxis`.
.. autosummary::
~tune
~multi_pass_tune
~peak_detected
"""
_peak_choices_ = "cen com".split()
def __init__(self, signals, axis, signal_name=None):
self.signals = signals
self.signal_name = signal_name or signals[0].name
self.axis = axis
self.stats = {}
self.tune_ok = False
self.peaks = None
self.peak_choice = self._peak_choices_[0]
self.center = None
self.stats = []
# defaults
self.width = 1
self.num = 10
self.step_factor = 4
self.pass_max = 6
self.snake = True
[docs] def tune(self, width=None, num=None, md=None):
"""
BlueSky plan to execute one pass through the current scan range
Scan self.axis centered about current position from
``-width/2`` to ``+width/2`` with ``num`` observations.
If a peak was detected (default check is that max >= 4*min),
then set ``self.tune_ok = True``.
PARAMETERS
width : float
width of the tuning scan in the units of ``self.axis``
Default value in ``self.width`` (initially 1)
num : int
number of steps
Default value in ``self.num`` (initially 10)
md : dict, optional
metadata
"""
width = width or self.width
num = num or self.num
if self.peak_choice not in self._peak_choices_:
msg = "peak_choice must be one of {}, geave {}"
msg = msg.format(self._peak_choices_, self.peak_choice)
raise ValueError(msg)
initial_position = self.axis.position
final_position = initial_position # unless tuned
start = initial_position - width/2
finish = initial_position + width/2
self.tune_ok = False
tune_md = dict(
width = width,
initial_position = self.axis.position,
time_iso8601 = str(datetime.datetime.now()),
)
_md = {'tune_md': tune_md,
'plan_name': self.__class__.__name__ + '.tune',
'tune_parameters': dict(
num = num,
width = width,
initial_position = self.axis.position,
peak_choice = self.peak_choice,
x_axis = self.axis.name,
y_axis = self.signal_name,
),
'hints': dict(
dimensions = [
(
[self.axis.name],
'primary')]
)
}
_md.update(md or {})
if "pass_max" not in _md:
self.stats = []
self.peaks = PeakStats(x=self.axis.name, y=self.signal_name)
class Results(Device):
"""because bps.read() needs a Device or a Signal)"""
tune_ok = Component(Signal)
initial_position = Component(Signal)
final_position = Component(Signal)
center = Component(Signal)
# - - - - -
x = Component(Signal)
y = Component(Signal)
cen = Component(Signal)
com = Component(Signal)
fwhm = Component(Signal)
min = Component(Signal)
max = Component(Signal)
crossings = Component(Signal)
peakstats_attrs = "x y cen com fwhm min max crossings".split()
def report(self):
keys = self.peakstats_attrs + "tune_ok center initial_position final_position".split()
for key in keys:
print("{} : {}".format(key, getattr(self, key).value))
@bpp.subs_decorator(self.peaks)
def _scan(md=None):
yield from bps.open_run(md)
position_list = np.linspace(start, finish, num)
signal_list = list(self.signals)
signal_list += [self.axis,]
for pos in position_list:
yield from bps.mv(self.axis, pos)
yield from bps.trigger_and_read(signal_list)
final_position = initial_position
if self.peak_detected():
self.tune_ok = True
if self.peak_choice == "cen":
final_position = self.peaks.cen
elif self.peak_choice == "com":
final_position = self.peaks.com
else:
final_position = None
self.center = final_position
# add stream with results
# yield from add_results_stream()
stream_name = "PeakStats"
results = Results(name=stream_name)
for key in "tune_ok center".split():
getattr(results, key).put(getattr(self, key))
results.final_position.put(final_position)
results.initial_position.put(initial_position)
for key in results.peakstats_attrs:
v = getattr(self.peaks, key)
if key in ("crossings", "min", "max"):
v = np.array(v)
getattr(results, key).put(v)
yield from bps.create(name=stream_name)
yield from bps.read(results)
yield from bps.save()
yield from bps.mv(self.axis, final_position)
self.stats.append(self.peaks)
yield from bps.close_run()
results.report()
return (yield from _scan(md=_md))
[docs] def multi_pass_tune(self, width=None, step_factor=None,
num=None, pass_max=None, snake=None, md=None):
"""
BlueSky plan for tuning this axis with this signal
Execute multiple passes to refine the centroid determination.
Each subsequent pass will reduce the width of scan by ``step_factor``.
If ``snake=True`` then the scan direction will reverse with
each subsequent pass.
PARAMETERS
width : float
width of the tuning scan in the units of ``self.axis``
Default value in ``self.width`` (initially 1)
num : int
number of steps
Default value in ``self.num`` (initially 10)
step_factor : float
This reduces the width of the next tuning scan by the given factor.
Default value in ``self.step_factor`` (initially 4)
pass_max : int
Maximum number of passes to be executed (avoids runaway
scans when a centroid is not found).
Default value in ``self.pass_max`` (initially 10)
snake : bool
If ``True``, reverse scan direction on next pass.
Default value in ``self.snake`` (initially True)
md : dict, optional
metadata
"""
width = width or self.width
num = num or self.num
step_factor = step_factor or self.step_factor
snake = snake or self.snake
pass_max = pass_max or self.pass_max
self.stats = []
def _scan(width=1, step_factor=10, num=10, snake=True):
for _pass_number in range(pass_max):
_md = {'pass': _pass_number+1,
'pass_max': pass_max,
'plan_name': self.__class__.__name__ + '.multi_pass_tune',
}
_md.update(md or {})
yield from self.tune(width=width, num=num, md=_md)
if not self.tune_ok:
return
width /= step_factor
if snake:
width *= -1
return (
yield from _scan(
width=width, step_factor=step_factor, num=num, snake=snake))
[docs] def peak_detected(self):
"""
returns True if a peak was detected, otherwise False
The default algorithm identifies a peak when the maximum
value is four times the minimum value. Change this routine
by subclassing :class:`TuneAxis` and override :meth:`peak_detected`.
"""
if self.peaks is None:
return False
self.peaks.compute()
if self.peaks.max is None:
return False
ymax = self.peaks.max[-1]
ymin = self.peaks.min[-1]
return ymax > 4*ymin # this works for USAXS@APS
[docs]def tune_axes(axes):
"""
BlueSky plan to tune a list of axes in sequence
EXAMPLE
Sequentially, tune a list of preconfigured axes::
RE(tune_axes([mr, m2r, ar, a2r])
"""
for axis in axes:
yield from axis.tune()
[docs]class ProcedureRegistry(ophyd.Device):
"""
Procedure Registry
.. caution:: This Device may be relocated or removed entirely in future releases.
Its use is complicated and could lead to instability.
With many instruments, such as USAXS, there are several operating
modes to be used, each with its own setup code. This ophyd Device
should coordinate those modes so that the setup procedures can be called
either as part of a Bluesky plan or from the command line directly.
Assumes that users will write functions to setup a particular
operation or operating mode. The user-written functions may not
be appropriate to use in a plan directly since they might
make blocking calls. The ProcedureRegistry will call the function
in a thread (which is allowed to make blocking calls) and wait
for the thread to complete.
It is assumed that each user-written function will not return until
it is complete.
.. autosummary::
~dir
~add
~remove
~set
~put
EXAMPLE::
use_mode = ProcedureRegistry(name="use_mode")
def clearScalerNames():
for ch in scaler.channels.configuration_attrs:
if ch.find(".") < 0:
chan = scaler.channels.__getattribute__(ch)
chan.chname.put("")
def setMyScalerNames():
scaler.channels.chan01.chname.put("clock")
scaler.channels.chan02.chname.put("I0")
scaler.channels.chan03.chname.put("detector")
def useMyScalerNames(): # Bluesky plan
yield from bps.mv(
m1, 5,
use_mode, "clear",
)
yield from bps.mv(
m1, 0,
use_mode, "set",
)
def demo():
print(1)
m1.move(5)
print(2)
time.sleep(2)
print(3)
m1.move(0)
print(4)
use_mode.add(demo)
use_mode.add(clearScalerNames, "clear")
use_mode.add(setMyScalerNames, "set")
# use_mode.set("demo")
# use_mode.set("clear")
# RE(useMyScalerNames())
"""
busy = ophyd.Component(ophyd.Signal, value=False)
registry = {}
delay_s = 0
timeout_s = None
state = "__created__"
@property
def dir(self):
"""tuple of procedure names"""
return tuple(sorted(self.registry.keys()))
[docs] def add(self, procedure, proc_name=None):
"""
add procedure to registry
"""
#if procedure.__class__ == "function":
nm = proc_name or procedure.__name__
self.registry[nm] = procedure
[docs] def remove(self, procedure):
"""
remove procedure from registry
"""
if isinstance(procedure, str):
nm = procedure
else:
nm = procedure.__name__
if nm in self.registry:
del self.registry[nm]
[docs] def set(self, proc_name):
"""
run procedure in a thread, return once it is complete
proc_name (str) : name of registered procedure to be run
"""
if not isinstance(proc_name, str):
raise ValueError("expected a procedure name, not {}".format(proc_name))
if proc_name not in self.registry:
raise KeyError("unknown procedure name: "+proc_name)
if self.busy.value:
raise RuntimeError("busy now")
self.state = "__busy__"
status = ophyd.DeviceStatus(self)
@run_in_thread
def run_and_delay():
self.busy.put(True)
self.registry[proc_name]()
# optional delay
if self.delay_s > 0:
time.sleep(self.delay_s)
self.busy.put(False)
status._finished(success=True)
run_and_delay()
ophyd.status.wait(status, timeout=self.timeout_s)
self.state = proc_name
return status
[docs] def put(self, value): # TODO: risky?
"""replaces ophyd Device default put() behavior"""
self.set(value)