Plans¶
Plans that might be useful at the APS when using BlueSky
nscan (detectors, *motor_sets[, num, …]) |
Scan over n variables moved together, each in equally spaced steps. |
ProcedureRegistry (*args, **kwargs) |
Procedure Registry |
run_blocker_in_plan (blocker, *args[, …]) |
plan: run blocking function blocker_(*args, **kwargs) from a Bluesky plan |
run_in_thread (func) |
(decorator) run func in thread |
snapshot (obj_list[, stream, md]) |
bluesky plan: record current values of list of ophyd signals |
TuneAxis (signals, axis[, signal_name]) |
tune an axis with a signal |
tune_axes (axes) |
BlueSky plan to tune a list of axes in sequence |
-
class
APS_BlueSky_tools.plans.
ProcedureRegistry
(*args, **kwargs)[source]¶ Procedure Registry
Caution
This Device may be relocated or removed entirely in future releases. Its use is complicated and could lead to instability.
With many instruments, such as USAXS, there are several operating modes to be used, each with its own setup code. This ophyd Device should coordinate those modes so that the setup procedures can be called either as part of a Bluesky plan or from the command line directly.
Assumes that users will write functions to setup a particular operation or operating mode. The user-written functions may not be appropriate to use in a plan directly since they might make blocking calls. The ProcedureRegistry will call the function in a thread (which is allowed to make blocking calls) and wait for the thread to complete.
It is assumed that each user-written function will not return until it is complete.
dir
tuple of procedure names add
(procedure[, proc_name])add procedure to registry remove
(procedure)remove procedure from registry set
(proc_name)run procedure in a thread, return once it is complete put
(value)replaces ophyd Device default put() behavior EXAMPLE:
use_mode = ProcedureRegistry(name="use_mode") def clearScalerNames(): for ch in scaler.channels.configuration_attrs: if ch.find(".") < 0: chan = scaler.channels.__getattribute__(ch) chan.chname.put("") def setMyScalerNames(): scaler.channels.chan01.chname.put("clock") scaler.channels.chan02.chname.put("I0") scaler.channels.chan03.chname.put("detector") def useMyScalerNames(): # Bluesky plan yield from bps.mv( m1, 5, use_mode, "clear", ) yield from bps.mv( m1, 0, use_mode, "set", ) def demo(): print(1) m1.move(5) print(2) time.sleep(2) print(3) m1.move(0) print(4) use_mode.add(demo) use_mode.add(clearScalerNames, "clear") use_mode.add(setMyScalerNames, "set") # use_mode.set("demo") # use_mode.set("clear") # RE(useMyScalerNames())
-
dir
¶ tuple of procedure names
-
-
class
APS_BlueSky_tools.plans.
TuneAxis
(signals, axis, signal_name=None)[source]¶ tune an axis with a signal
This class provides a tuning object so that a Device or other entity may gain its own tuning process, keeping track of the particulars needed to tune this device again. For example, one could add a tuner to a motor stage:
motor = EpicsMotor("xxx:motor", "motor") motor.tuner = TuneAxis([det], motor)
Then the
motor
could be tuned individually:RE(motor.tuner.tune(md={"activity": "tuning"}))
or the
tune()
could be part of a plan with other steps.Example:
tuner = TuneAxis([det], axis) live_table = LiveTable(["axis", "det"]) RE(tuner.multi_pass_tune(width=2, num=9), live_table) RE(tuner.tune(width=0.05, num=9), live_table)
Also see the jupyter notebook referenced here: Example: TuneAxis().
tune
([width, num, md])BlueSky plan to execute one pass through the current scan range multi_pass_tune
([width, step_factor, num, …])BlueSky plan for tuning this axis with this signal peak_detected
()returns True if a peak was detected, otherwise False -
multi_pass_tune
(width=None, step_factor=None, num=None, pass_max=None, snake=None, md=None)[source]¶ BlueSky plan for tuning this axis with this signal
Execute multiple passes to refine the centroid determination. Each subsequent pass will reduce the width of scan by
step_factor
. Ifsnake=True
then the scan direction will reverse with each subsequent pass.PARAMETERS
- width : float
- width of the tuning scan in the units of
self.axis
Default value inself.width
(initially 1) - num : int
- number of steps
Default value in
self.num
(initially 10) - step_factor : float
- This reduces the width of the next tuning scan by the given factor.
Default value in
self.step_factor
(initially 4) - pass_max : int
- Maximum number of passes to be executed (avoids runaway
scans when a centroid is not found).
Default value in
self.pass_max
(initially 10) - snake : bool
- If
True
, reverse scan direction on next pass. Default value inself.snake
(initially True) - md : dict, optional
- metadata
-
peak_detected
()[source]¶ returns True if a peak was detected, otherwise False
The default algorithm identifies a peak when the maximum value is four times the minimum value. Change this routine by subclassing
TuneAxis
and overridepeak_detected()
.
-
tune
(width=None, num=None, md=None)[source]¶ BlueSky plan to execute one pass through the current scan range
Scan self.axis centered about current position from
-width/2
to+width/2
withnum
observations. If a peak was detected (default check is that max >= 4*min), then setself.tune_ok = True
.PARAMETERS
- width : float
- width of the tuning scan in the units of
self.axis
Default value inself.width
(initially 1) - num : int
- number of steps
Default value in
self.num
(initially 10) - md : dict, optional
- metadata
-
-
APS_BlueSky_tools.plans.
nscan
(detectors, *motor_sets, num=11, per_step=None, md=None)[source]¶ Scan over
n
variables moved together, each in equally spaced steps.PARAMETERS
- detectors : list
- list of ‘readable’ objects
- motor_sets : list
- sequence of one or more groups of: motor, start, finish
- motor : object
- any ‘settable’ object (motor, temp controller, etc.)
- start : float
- starting position of motor
- finish : float
- ending position of motor
- num : int
- number of steps (default = 11)
- per_step : callable, optional
- hook for customizing action of inner loop (messages per step)
Expected signature:
f(detectors, step_cache, pos_cache)
- md : dict, optional
- metadata
See the nscan() example in a Jupyter notebook: https://github.com/BCDA-APS/APS_BlueSky_tools/blob/master/docs/source/resources/demo_nscan.ipynb
-
APS_BlueSky_tools.plans.
run_blocker_in_plan
(blocker, *args, _poll_s_=0.01, _timeout_s_=None, **kwargs)[source]¶ plan: run blocking function
blocker_(*args, **kwargs)
from a Bluesky planPARAMETERS
- blocker : func
- function object to be called in a Bluesky plan
- _poll_s_ : float
- sleep interval in loop while waiting for completion (default: 0.01)
- _timeout_s_ : float
- maximum time for completion (default: None which means no timeout)
Example: use
time.sleep
as blocking function:RE(run_blocker_in_plan(time.sleep, 2.14))
Example: in a plan, use
time.sleep
as blocking function:def my_sleep(t=1.0): yield from run_blocker_in_plan(time.sleep, t) RE(my_sleep())
-
APS_BlueSky_tools.plans.
run_in_thread
(func)[source]¶ (decorator) run
func
in threadUSAGE:
@run_in_thread def progress_reporting(): logger.debug("progress_reporting is starting") # ... #... progress_reporting() # runs in separate thread #...