Plans

Plans that might be useful at the APS when using BlueSky

nscan(detectors, *motor_sets[, num, …]) Scan over n variables moved together, each in equally spaced steps.
ProcedureRegistry(*args, **kwargs) Procedure Registry
run_blocker_in_plan(blocker, *args[, …]) plan: run blocking function blocker(*args, **kwargs) from a Bluesky plan
run_in_thread(func) (decorator) run func in thread
snapshot(obj_list[, stream, md]) bluesky plan: record current values of list of ophyd signals
TuneAxis(signals, axis[, signal_name]) tune an axis with a signal
tune_axes(axes) BlueSky plan to tune a list of axes in sequence

class APS_BlueSky_tools.plans.ProcedureRegistry(*args, **kwargs)

Procedure Registry

Caution

This Device may be relocated or removed entirely in future releases. Its use is complicated and could lead to instability.

With many instruments, such as USAXS, there are several operating modes to be used, each with its own setup code. This ophyd Device should coordinate those modes so that the setup procedures can be called either as part of a Bluesky plan or from the command line directly.

The registry assumes that users will write functions to set up a particular operation or operating mode. These user-written functions may not be appropriate to call directly in a plan since they might make blocking calls. The ProcedureRegistry calls the function in a thread (which is allowed to make blocking calls) and waits for the thread to complete.

It is assumed that each user-written function will not return until it is complete.
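
The thread-and-wait idea described above can be sketched with the standard library alone. This is a minimal illustration, not the ProcedureRegistry implementation: the class name MiniRegistry and its run() method are hypothetical, and no ophyd Device behavior is reproduced.

```python
import threading


class MiniRegistry:
    """Hypothetical stand-in that only illustrates the idea."""

    def __init__(self):
        self._procedures = {}

    def add(self, procedure, proc_name=None):
        # Default to the function's own name when no name is supplied.
        self._procedures[proc_name or procedure.__name__] = procedure

    @property
    def dir(self):
        # Tuple of registered procedure names.
        return tuple(sorted(self._procedures))

    def run(self, proc_name):
        # Run the (possibly blocking) function in a worker thread,
        # then wait for that thread to finish before returning.
        thread = threading.Thread(target=self._procedures[proc_name])
        thread.start()
        thread.join()


calls = []
registry = MiniRegistry()
registry.add(lambda: calls.append("cleared"), "clear")
registry.run("clear")  # returns only after the procedure has completed
```

Because run() joins the thread, the caller can rely on the setup procedure having finished, which matches the assumption that each user-written function does not return until it is complete.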

dir tuple of procedure names
add(procedure[, proc_name]) add procedure to registry
remove(procedure) remove procedure from registry
set(proc_name) run procedure in a thread, return once it is complete
put(value) replaces ophyd Device default put() behavior

EXAMPLE:

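import time

import bluesky.plan_stubs as bps
from APS_BlueSky_tools.plans import ProcedureRegistry

# scaler (an ophyd scaler), m1 (a motor), and RE (the RunEngine)
# are assumed to be defined already

use_mode = ProcedureRegistry(name="use_mode")

def clearScalerNames():
    # blank the name of each channel; skip dotted sub-attribute names
    for ch in scaler.channels.configuration_attrs:
        if "." not in ch:
            chan = getattr(scaler.channels, ch)
            chan.chname.put("")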

def setMyScalerNames():
    scaler.channels.chan01.chname.put("clock")
    scaler.channels.chan02.chname.put("I0")
    scaler.channels.chan03.chname.put("detector")

def useMyScalerNames(): # Bluesky plan
    yield from bps.mv(
        m1, 5,
        use_mode, "clear",
    )
    yield from bps.mv(
        m1, 0,
        use_mode, "set",
    )

def demo():
    print(1)
    m1.move(5)
    print(2)
    time.sleep(2)
    print(3)
    m1.move(0)
    print(4)


use_mode.add(demo)
use_mode.add(clearScalerNames, "clear")
use_mode.add(setMyScalerNames, "set")
# use_mode.set("demo")
# use_mode.set("clear")
# RE(useMyScalerNames())

add(procedure, proc_name=None)

add procedure to registry

dir

tuple of procedure names

put(value)

replaces ophyd Device default put() behavior

remove(procedure)

remove procedure from registry

set(proc_name)

run procedure in a thread, return once it is complete

proc_name (str) : name of registered procedure to be run

class APS_BlueSky_tools.plans.TuneAxis(signals, axis, signal_name=None)

tune an axis with a signal

This class provides a tuning object so that a Device or other entity may gain its own tuning process, keeping track of the particulars needed to tune this device again. For example, one could add a tuner to a motor stage:

motor = EpicsMotor("xxx:motor", "motor")
motor.tuner = TuneAxis([det], motor)

Then the motor could be tuned individually:

RE(motor.tuner.tune(md={"activity": "tuning"}))

or the tune() could be part of a plan with other steps.

Example:

tuner = TuneAxis([det], axis)
live_table = LiveTable(["axis", "det"])
RE(tuner.multi_pass_tune(width=2, num=9), live_table)
RE(tuner.tune(width=0.05, num=9), live_table)

Also see the Jupyter notebook referenced here: Example: TuneAxis().

tune([width, num, md]) BlueSky plan to execute one pass through the current scan range
multi_pass_tune([width, step_factor, num, …]) BlueSky plan for tuning this axis with this signal
peak_detected() returns True if a peak was detected, otherwise False

multi_pass_tune(width=None, step_factor=None, num=None, pass_max=None, snake=None, md=None)

BlueSky plan for tuning this axis with this signal

Execute multiple passes to refine the centroid determination. Each subsequent pass reduces the width of the scan by a factor of step_factor. If snake=True, the scan direction reverses with each subsequent pass.

PARAMETERS

width : float
width of the tuning scan, in the units of self.axis. Default value is in self.width (initially 1).
num : int
number of steps. Default value is in self.num (initially 10).
step_factor : float
This reduces the width of the next tuning scan by the given factor. Default value is in self.step_factor (initially 4).
pass_max : int
Maximum number of passes to be executed (avoids runaway scans when a centroid is not found). Default value is in self.pass_max (initially 10).
snake : bool
If True, reverse scan direction on next pass. Default value is in self.snake (initially True).
md : dict, optional
metadata
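
To make the interplay of width, step_factor, pass_max, and snake concrete, here is a small sketch that only computes the scan range of each pass. The function pass_ranges is hypothetical and is not part of TuneAxis. As a simplification it keeps the center fixed, whereas a real tuning pass would be expected to re-center on the centroid it finds, and it never stops early when no peak is detected.

```python
def pass_ranges(center, width, step_factor=4, pass_max=3, snake=True):
    """Return a (start, finish) pair for each pass; illustrative only."""
    ranges = []
    direction = 1
    for _ in range(pass_max):
        half = direction * width / 2
        ranges.append((center - half, center + half))
        width /= step_factor  # narrow the next pass
        if snake:
            direction = -direction  # reverse the next pass
    return ranges


ranges = pass_ranges(center=0.0, width=2.0, step_factor=4, pass_max=3)
# ranges == [(-1.0, 1.0), (0.25, -0.25), (-0.0625, 0.0625)]
```

With snake=True the second pass runs from high to low, so the axis does not have to travel back across the full range between passes.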

peak_detected()

returns True if a peak was detected, otherwise False

The default algorithm identifies a peak when the maximum value is at least four times the minimum value. Change this routine by subclassing TuneAxis and overriding peak_detected().
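
As a rough illustration of that criterion, the check can be written as a plain function over a list of readings. This is a sketch of the documented rule (max >= 4*min) and not the TuneAxis method itself; the function name has_peak and the ratio argument are assumptions made for the example.

```python
def has_peak(values, ratio=4):
    """Documented default criterion: max >= ratio * min."""
    if not values:
        return False  # no data, so no peak
    return max(values) >= ratio * min(values)


flat = [10, 11, 10, 12]      # max is only 1.2 times min
peaked = [10, 11, 55, 12]    # max is 5.5 times min
flat_result = has_peak(flat)
peaked_result = has_peak(peaked)
```

A subclass that overrides peak_detected() could apply a different rule, for example a larger ratio for a noisy signal.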

tune(width=None, num=None, md=None)

BlueSky plan to execute one pass through the current scan range

Scan self.axis, centered about the current position, from -width/2 to +width/2 with num observations. If a peak is detected (the default check is max >= 4*min), set self.tune_ok = True.

PARAMETERS

width : float
width of the tuning scan, in the units of self.axis. Default value is in self.width (initially 1).
num : int
number of steps. Default value is in self.num (initially 10).
md : dict, optional
metadata
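
The positions visited by a single pass can be worked out by hand. The sketch below assumes that num counts the observation points including both ends of the range; tune_positions is a hypothetical helper, not a function of this package.

```python
def tune_positions(center, width, num):
    """num equally spaced points from center - width/2 to center + width/2."""
    if num < 2:
        return [center]
    start = center - width / 2
    step = width / (num - 1)
    return [start + i * step for i in range(num)]


positions = tune_positions(center=5.0, width=1.0, num=5)
# positions == [4.5, 4.75, 5.0, 5.25, 5.5]
```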

APS_BlueSky_tools.plans.nscan(detectors, *motor_sets, num=11, per_step=None, md=None)

Scan over n variables moved together, each in equally spaced steps.

PARAMETERS

detectors : list
list of ‘readable’ objects
motor_sets : list
sequence of one or more groups of: motor, start, finish
motor : object
any ‘settable’ object (motor, temp controller, etc.)
start : float
starting position of motor
finish : float
ending position of motor
num : int
number of steps (default = 11)
per_step : callable, optional
hook for customizing action of inner loop (messages per step). Expected signature: f(detectors, step_cache, pos_cache)
md : dict, optional
metadata

See the nscan() example in a Jupyter notebook: https://github.com/BCDA-APS/APS_BlueSky_tools/blob/master/docs/source/resources/demo_nscan.ipynb
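
To show what "moved together, each in equally spaced steps" means for the motor_sets argument, the sketch below computes the target position of every motor at each step. It assumes that num counts the points including both ends; nscan_positions is a hypothetical helper that moves no hardware, and plain strings stand in for the motor objects.

```python
def nscan_positions(*motor_sets, num=11):
    """Yield one {motor: position} dict per step; illustrative only."""
    if len(motor_sets) % 3 != 0:
        raise ValueError("motor_sets must be groups of: motor, start, finish")
    # Split the flat argument list into (motor, start, finish) groups.
    groups = [motor_sets[i:i + 3] for i in range(0, len(motor_sets), 3)]
    for step in range(num):
        fraction = step / (num - 1) if num > 1 else 0.0
        yield {
            motor: start + fraction * (finish - start)
            for motor, start, finish in groups
        }


steps = list(nscan_positions("m1", 0.0, 4.0, "m2", 10.0, 20.0, num=5))
# steps[2] == {"m1": 2.0, "m2": 15.0}
```

Every motor reaches its own start and finish at the same step, so the ranges may differ in size while the number of steps is shared.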

APS_BlueSky_tools.plans.run_blocker_in_plan(blocker, *args, _poll_s_=0.01, _timeout_s_=None, **kwargs)

plan: run blocking function blocker(*args, **kwargs) from a Bluesky plan

PARAMETERS

blocker : func
function object to be called in a Bluesky plan
_poll_s_ : float
sleep interval in loop while waiting for completion (default: 0.01)
_timeout_s_ : float
maximum time for completion (default: None which means no timeout)

Example: use time.sleep as blocking function:

RE(run_blocker_in_plan(time.sleep, 2.14))

Example: in a plan, use time.sleep as blocking function:

def my_sleep(t=1.0):
    yield from run_blocker_in_plan(time.sleep, t)

RE(my_sleep())
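
The roles of _poll_s_ and _timeout_s_ can be illustrated with a standard-library generator that starts the blocking call in a thread and then polls for completion. This is a sketch under stated assumptions and not the implementation of run_blocker_in_plan: the name run_blocker, the choice of TimeoutError, and the plain for loop that stands in for the RunEngine are all hypothetical.

```python
import threading
import time


def run_blocker(blocker, *args, poll_s=0.01, timeout_s=None, **kwargs):
    """Generator that waits for blocker(*args, **kwargs) without blocking its caller.

    Each yielded value is a requested sleep time. In a real plan this is
    where a sleep message such as ``yield from bps.sleep(poll_s)`` would go.
    """
    thread = threading.Thread(
        target=blocker, args=args, kwargs=kwargs, daemon=True
    )
    deadline = None if timeout_s is None else time.monotonic() + timeout_s
    thread.start()
    while thread.is_alive():
        if deadline is not None and time.monotonic() > deadline:
            raise TimeoutError(f"blocker did not finish within {timeout_s} s")
        yield poll_s


# A trivial driver standing in for the RunEngine: honor each sleep request.
polls = 0
for delay in run_blocker(time.sleep, 0.05):
    time.sleep(delay)
    polls += 1
```

A shorter poll interval notices completion sooner at the cost of more loop iterations, and leaving the timeout at None means the loop waits for as long as the blocking function runs.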

APS_BlueSky_tools.plans.run_in_thread(func)

(decorator) run func in thread

USAGE:

@run_in_thread
def progress_reporting():
    logger.debug("progress_reporting is starting")
    # ...

#...
progress_reporting()   # runs in separate thread
#...
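
For readers curious how such a decorator can work, here is one possible shape using only the standard library. It is a sketch, not necessarily how run_in_thread is written: the name start_in_thread is hypothetical, and returning the Thread object is a choice made for this example so that the caller can wait for completion if needed.

```python
import functools
import threading


def start_in_thread(func):
    """Start func in a new thread on each call and return the Thread."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        thread = threading.Thread(target=func, args=args, kwargs=kwargs)
        thread.start()
        return thread

    return wrapper


results = []


@start_in_thread
def record(value):
    results.append(value)


worker = record(42)  # returns immediately with the Thread object
worker.join()        # wait only when the result is needed
```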

APS_BlueSky_tools.plans.snapshot(obj_list, stream='primary', md=None)

bluesky plan: record current values of list of ophyd signals

PARAMETERS

obj_list : list
list of ophyd Signal or EpicsSignal objects
stream : str
document stream, default: “primary”
md : dict
metadata

APS_BlueSky_tools.plans.tune_axes(axes)

BlueSky plan to tune a list of axes in sequence

EXAMPLE

Sequentially, tune a list of preconfigured axes:

RE(tune_axes([mr, m2r, ar, a2r]))