Devices

(ophyd) Devices that might be useful at the APS using BlueSky

APS GENERAL SUPPORT

ApsMachineParametersDevice(*args, **kwargs) common operational parameters of the APS of general interest
ApsPssShutter(*args, **kwargs) APS PSS shutter
ApsPssShutterWithStatus(prefix, state_pv, …) APS PSS shutter with separate status PV
SimulatedApsPssShutterWithStatus(*args, **kwargs) Simulated APS PSS shutter

AREA DETECTOR SUPPORT

AD_setup_FrameType(prefix[, scheme]) configure so frames are identified & handled by type (dark, white, or image)
AD_warmed_up(detector) Has area detector pushed an NDarray to the HDF5 plugin? True or False
AD_EpicsHdf5FileName(*args, **kwargs) custom class to define image file name from EPICS

DETECTOR / SCALER SUPPORT

use_EPICS_scaler_channels(scaler) configure scaler for only the channels with names assigned in EPICS

MOTORS, POSITIONERS, AXES, …

AxisTunerException Exception during execution of AxisTunerBase subclass
AxisTunerMixin(*args, **kwargs) Mixin class to provide tuning capabilities for an axis
EpicsDescriptionMixin(*args, **kwargs) add a record’s description field to a Device, such as EpicsMotor
EpicsMotorDialMixin(*args, **kwargs) add motor record’s dial coordinate fields to Device
EpicsMotorLimitsMixin(*args, **kwargs) add motor record HLM & LLM fields & compatibility get_lim() and set_lim()
EpicsMotorRawMixin(*args, **kwargs) add motor record’s raw coordinate fields to Device
EpicsMotorServoMixin(*args, **kwargs) add motor record’s servo loop controls to Device
EpicsMotorShutter(*args, **kwargs) a shutter, implemented with an EPICS motor moved between two positions
EpicsOnOffShutter(*args, **kwargs) a shutter, implemented with an EPICS PV moved between two positions

SHUTTERS

ApsPssShutter(*args, **kwargs) APS PSS shutter
ApsPssShutterWithStatus(prefix, state_pv, …) APS PSS shutter with separate status PV
EpicsMotorShutter(*args, **kwargs) a shutter, implemented with an EPICS motor moved between two positions
EpicsOnOffShutter(*args, **kwargs) a shutter, implemented with an EPICS PV moved between two positions

synApps records

busyRecord(*args, **kwargs)
sscanRecord(*args, **kwargs) EPICS synApps sscan record: used as $(P):scan(N)
sscanDevice(*args, **kwargs) synApps XXX IOC setup of sscan records: $(P):scan$(N)
swaitRecord(*args, **kwargs) synApps swait record: used as $(P):userCalc$(N)
swait_setup_random_number(swait, **kw) setup swait record to generate random numbers
swait_setup_gaussian(swait, motor[, center, …]) setup swait for noisy Gaussian
swait_setup_lorentzian(swait, motor[, …]) setup swait record for noisy Lorentzian
swait_setup_incrementer(swait[, scan, limit]) setup swait record as an incrementer
userCalcsDevice(*args, **kwargs) synApps XXX IOC setup of userCalcs: $(P):userCalc$(N)

OTHER SUPPORT

DualPf4FilterBox(*args, **kwargs) Dual Xia PF4 filter boxes using support from synApps (using Al, Ti foils)
EpicsDescriptionMixin(*args, **kwargs) add a record’s description field to a Device, such as EpicsMotor
ProcedureRegistry(*args, **kwargs) Procedure Registry: run a blocking function in a thread

Internal routines

ApsOperatorMessagesDevice(*args, **kwargs) general messages from the APS main control room
DeviceMixinBase(*args, **kwargs) Base class for APS_Bluesky_tools Device mixin classes
class APS_BlueSky_tools.devices.AD_EpicsHdf5FileName(*args, **kwargs)[source]

custom class to define image file name from EPICS

Caution

Caveat emptor applies here. You assume expertise!

Replace standard Bluesky algorithm where file names are defined as UUID strings, virtually guaranteeing that no existing images files will ever be overwritten.

Also, this method decouples the data files from the databroker, which needs the files to be named by UUID.

make_filename() overrides default behavior: Get info from EPICS HDF5 plugin.
generate_datum(key, timestamp, datum_kwargs) Generate a uid and cache it with its key for later insertion.
get_frames_per_point() overrides default behavior
stage() overrides default behavior

To allow users to control the file name, we override the make_filename() method here and we need to override some intervening classes.

To allow users to control the file number, we override the stage() method here and triple-comment out that line, and bring in sections from the methods we are replacing here.

The image file name is set in FileStoreBase.make_filename() from ophyd.areadetector.filestore_mixins. This is called (during device staging) from FileStoreBase.stage()

EXAMPLE:

To use this custom class, we need to connect it to some intervening structure. Here are the steps:

  1. override default file naming
  2. use to make your custom iterative writer
  3. use to make your custom HDF5 plugin
  4. use to make your custom AD support

imports:

from bluesky import RunEngine, plans as bp
from ophyd.areadetector import SimDetector, SingleTrigger
from ophyd.areadetector import ADComponent, ImagePlugin, SimDetectorCam
from ophyd.areadetector import HDF5Plugin
from ophyd.areadetector.filestore_mixins import FileStoreIterativeWrite

override default file naming:

from APS_BlueSky_tools.devices import AD_EpicsHdf5FileName

make a custom iterative writer:

class myHdf5EpicsIterativeWriter(AD_EpicsHdf5FileName, FileStoreIterativeWrite): pass

make a custom HDF5 plugin:

class myHDF5FileNames(HDF5Plugin, myHdf5EpicsIterativeWriter): pass

define support for the detector (simulated detector here):

class MySimDetector(SingleTrigger, SimDetector):
    '''SimDetector with HDF5 file names specified by EPICS'''
    
    cam = ADComponent(SimDetectorCam, "cam1:")
    image = ADComponent(ImagePlugin, "image1:")
    
    hdf1 = ADComponent(
        myHDF5FileNames, 
        suffix = "HDF1:", 
        root = "/",
        write_path_template = "/",
        )

create an instance of the detector:

simdet = MySimDetector("13SIM1:", name="simdet")
if hasattr(simdet.hdf1.stage_sigs, "array_counter"):
    # remove this so array counter is not set to zero each staging
    del simdet.hdf1.stage_sigs["array_counter"]
simdet.hdf1.stage_sigs["file_template"] = '%s%s_%3.3d.h5'

setup the file names using the EPICS HDF5 plugin:

simdet.hdf1.file_path.put("/tmp/simdet_demo/")    # ! ALWAYS end with a "/" !
simdet.hdf1.file_name.put("test")
simdet.hdf1.array_counter.put(0)

If you have not already, create a bluesky RunEngine:

RE = RunEngine({})

take an image:

RE(bp.count([simdet]))

INTERNAL METHODS

generate_datum(key, timestamp, datum_kwargs)[source]

Generate a uid and cache it with its key for later insertion.

get_frames_per_point()[source]

overrides default behavior

make_filename()[source]

overrides default behavior: Get info from EPICS HDF5 plugin.

stage()[source]

overrides default behavior

Set EPICS items before device is staged, then copy EPICS naming template (and other items) to ophyd after staging.

APS_BlueSky_tools.devices.AD_setup_FrameType(prefix, scheme='NeXus')[source]

configure so frames are identified & handled by type (dark, white, or image)

PARAMETERS

prefix (str) : EPICS PV prefix of area detector, such as “13SIM1:” scheme (str) : any key in the AD_FrameType_schemes dictionary

This routine prepares the EPICS Area Detector to identify frames by image type for handling by clients, such as the HDF5 file writing plugin. With the HDF5 plugin, the FrameType PV is added to the NDattributes and then used in the layout file to direct the acquired frame to the chosen dataset. The FrameType PV value provides the HDF5 address to be used.

To use a different scheme than the defaults, add a new key to the AD_FrameType_schemes dictionary, defining storage values for the fields of the EPICS mbbo record that you will be using.

see: https://github.com/BCDA-APS/use_bluesky/blob/master/notebooks/images_darks_flats.ipynb

EXAMPLE:

AD_setup_FrameType("2bmbPG3:", scheme="DataExchange")
  • Call this function before creating the ophyd area detector object
  • use lower-level PyEpics interface
APS_BlueSky_tools.devices.AD_warmed_up(detector)[source]

Has area detector pushed an NDarray to the HDF5 plugin? True or False

Works around an observed issue: #598 https://github.com/NSLS-II/ophyd/issues/598#issuecomment-414311372

If detector IOC has just been started and has not yet taken an image with the HDF5 plugin, then a TimeoutError will occur as the HDF5 plugin “Capture” is set to 1 (Start). In such case, first acquire at least one image with the HDF5 plugin enabled.

class APS_BlueSky_tools.devices.ApsBssUserInfoDevice(*args, **kwargs)[source]

provide current experiment info from the APS BSS

BSS: Beamtime Scheduling System

EXAMPLE:

bss_user_info = ApsBssUserInfoDevice(
                    "9id_bss:",
                    name="bss_user_info")
sd.baseline.append(bss_user_info)
class APS_BlueSky_tools.devices.ApsMachineParametersDevice(*args, **kwargs)[source]

common operational parameters of the APS of general interest

EXAMPLE:

import APS_BlueSky_tools.devices as APS_devices
APS = APS_devices.ApsMachineParametersDevice(name="APS")
aps_current = APS.current

# make sure these values are logged at start and stop of every scan
sd.baseline.append(APS)
# record storage ring current as secondary stream during scans
# name: aps_current_monitor
# db[-1].table("aps_current_monitor")
sd.monitors.append(aps_current)

The sd.baseline and sd.monitors usage relies on this global setup:

from bluesky import SupplementalData sd = SupplementalData() RE.preprocessors.append(sd)
inUserOperations determine if APS is in User Operations mode (boolean)
inUserOperations

determine if APS is in User Operations mode (boolean)

Use this property to configure ophyd Devices for direct or simulated hardware. See issue #49 (https://github.com/BCDA-APS/APS_BlueSky_tools/issues/49) for details.

EXAMPLE:

APS = APS_BlueSky_tools.devices.ApsMachineParametersDevice(name="APS")

if APS.inUserOperations:
    suspend_APS_current = bluesky.suspenders.SuspendFloor(APS.current, 2, resume_thresh=10)
    RE.install_suspender(suspend_APS_current)
else:
    # use pseudo shutter controls and no current suspenders
    pass
class APS_BlueSky_tools.devices.ApsOperatorMessagesDevice(*args, **kwargs)[source]

general messages from the APS main control room

class APS_BlueSky_tools.devices.ApsPssShutter(*args, **kwargs)[source]

APS PSS shutter

  • APS PSS shutters have separate bit PVs for open and close
  • set either bit, the shutter moves, and the bit resets a short time later
  • no indication that the shutter has actually moved from the bits (see ApsPssShutterWithStatus() for alternative)

EXAMPLE:

shutter_a = ApsPssShutter("2bma:A_shutter", name="shutter")

shutter_a.open()
shutter_a.close()

shutter_a.set("open")
shutter_a.set("close")

When using the shutter in a plan, be sure to use yield from, such as:

def in_a_plan(shutter):
    yield from abs_set(shutter, "open", wait=True)
    # do something
    yield from abs_set(shutter, "close", wait=True)

RE(in_a_plan(shutter_a))

The strings accepted by set() are defined in two lists: valid_open_values and valid_close_values. These lists are treated (internally to set()) as lower case strings.

Example, add “o” & “x” as aliases for “open” & “close”:

shutter_a.valid_open_values.append(“o”) shutter_a.valid_close_values.append(“x”) shutter_a.set(“o”) shutter_a.set(“x”)
close()[source]

request shutter to close, interactive use

open()[source]

request shutter to open, interactive use

set(value, **kwargs)[source]

request the shutter to open or close, BlueSky plan use

class APS_BlueSky_tools.devices.ApsPssShutterWithStatus(prefix, state_pv, *args, **kwargs)[source]

APS PSS shutter with separate status PV

  • APS PSS shutters have separate bit PVs for open and close
  • set either bit, the shutter moves, and the bit resets a short time later
  • a separate status PV tells if the shutter is open or closed (see ApsPssShutter() for alternative)

EXAMPLE:

A_shutter = ApsPssShutterWithStatus(
    "2bma:A_shutter", 
    "PA:02BM:STA_A_FES_OPEN_PL", 
    name="A_shutter")
B_shutter = ApsPssShutterWithStatus(
    "2bma:B_shutter", 
    "PA:02BM:STA_B_SBS_OPEN_PL", 
    name="B_shutter")

A_shutter.open()
A_shutter.close()

or

%mov A_shutter "open"
%mov A_shutter "close"

or

A_shutter.set("open")       # MUST be "open", not "Open"
A_shutter.set("close")

When using the shutter in a plan, be sure to use yield from.

def in_a_plan(shutter):
yield from abs_set(shutter, “open”, wait=True) # do something yield from abs_set(shutter, “close”, wait=True)

RE(in_a_plan(A_shutter))

The strings accepted by set() are defined in attributes (open_str and close_str).

close(timeout=10)[source]
isClosed
isOpen
open(timeout=10)[source]
class APS_BlueSky_tools.devices.ApsUndulator(*args, **kwargs)[source]

APS Undulator

EXAMPLE:

undulator = ApsUndulator("ID09ds:", name="undulator")
class APS_BlueSky_tools.devices.ApsUndulatorDual(*args, **kwargs)[source]

APS Undulator with upstream and downstream controls

EXAMPLE:

undulator = ApsUndulatorDual("ID09", name="undulator")

note:: the trailing : in the PV prefix should be omitted

exception APS_BlueSky_tools.devices.AxisTunerException[source]

Exception during execution of AxisTunerBase subclass

class APS_BlueSky_tools.devices.AxisTunerMixin(*args, **kwargs)[source]

Mixin class to provide tuning capabilities for an axis

See the TuneAxis() example in this jupyter notebook: https://github.com/BCDA-APS/APS_BlueSky_tools/blob/master/docs/source/resources/demo_tuneaxis.ipynb

HOOK METHODS

There are two hook methods (pre_tune_method(), and post_tune_method()) for callers to add additional plan parts, such as opening or closing shutters, setting detector parameters, or other actions.

Each hook method must accept a single argument: an axis object such as EpicsMotor or SynAxis, such as:

def my_pre_tune_hook(axis):
    yield from bps.mv(shutter, "open")
def my_post_tune_hook(axis):
    yield from bps.mv(shutter, "close")

class TunableSynAxis(AxisTunerMixin, SynAxis): pass

myaxis = TunableSynAxis(name="myaxis")
mydet = SynGauss('mydet', myaxis, 'myaxis', center=0.21, Imax=0.98e5, sigma=0.127)
myaxis.tuner = TuneAxis([mydet], myaxis)
myaxis.pre_tune_method = my_pre_tune_hook
myaxis.post_tune_method = my_post_tune_hook

RE(myaxis.tune())
class APS_BlueSky_tools.devices.DeviceMixinBase(*args, **kwargs)[source]

Base class for APS_Bluesky_tools Device mixin classes

class APS_BlueSky_tools.devices.DualPf4FilterBox(*args, **kwargs)[source]

Dual Xia PF4 filter boxes using support from synApps (using Al, Ti foils)

EXAMPLE:

pf4 = DualPf4FilterBox("2bmb:pf4:", name="pf4")
pf4_AlTi = DualPf4FilterBox("9idcRIO:pf4:", name="pf4_AlTi")
class APS_BlueSky_tools.devices.EpicsDescriptionMixin(*args, **kwargs)[source]

add a record’s description field to a Device, such as EpicsMotor

EXAMPLE:

from ophyd import EpicsMotor
from APS_BlueSky_tools.devices import EpicsDescriptionMixin

class myEpicsMotor(EpicsDescriptionMixin, EpicsMotor): pass
m1 = myEpicsMotor('xxx:m1', name='m1')
print(m1.desc.value)
class APS_BlueSky_tools.devices.EpicsMotorDialMixin(*args, **kwargs)[source]

add motor record’s dial coordinate fields to Device

EXAMPLE:

from ophyd import EpicsMotor
from APS_BlueSky_tools.devices import EpicsMotorDialMixin

class myEpicsMotor(EpicsMotorDialMixin, EpicsMotor): pass
m1 = myEpicsMotor('xxx:m1', name='m1')
print(m1.dial.read())
class APS_BlueSky_tools.devices.EpicsMotorLimitsMixin(*args, **kwargs)[source]

add motor record HLM & LLM fields & compatibility get_lim() and set_lim()

EXAMPLE:

from ophyd import EpicsMotor
from APS_BlueSky_tools.devices import EpicsMotorLimitsMixin

class myEpicsMotor(EpicsMotorLimitsMixin, EpicsMotor): pass
m1 = myEpicsMotor('xxx:m1', name='m1')
lo = m1.get_lim(-1)
hi = m1.get_lim(1)
m1.set_lim(-25, -5)
print(m1.get_lim(-1), m1.get_lim(1))
m1.set_lim(lo, hi)
get_lim(flag)[source]

Returns the user limit of motor

  • flag > 0: returns high limit
  • flag < 0: returns low limit
  • flag == 0: returns None

Similar with SPEC command

set_lim(low, high)[source]

Sets the low and high limits of motor

  • No action taken if motor is moving.
  • Low limit is set to lesser of (low, high)
  • High limit is set to greater of (low, high)

Similar with SPEC command

class APS_BlueSky_tools.devices.EpicsMotorRawMixin(*args, **kwargs)[source]

add motor record’s raw coordinate fields to Device

EXAMPLE:

from ophyd import EpicsMotor
from APS_BlueSky_tools.devices import EpicsMotorRawMixin

class myEpicsMotor(EpicsMotorRawMixin, EpicsMotor): pass
m1 = myEpicsMotor('xxx:m1', name='m1')
print(m1.raw.read())
class APS_BlueSky_tools.devices.EpicsMotorServoMixin(*args, **kwargs)[source]

add motor record’s servo loop controls to Device

EXAMPLE:

from ophyd import EpicsMotor
from APS_BlueSky_tools.devices import EpicsMotorServoMixin

class myEpicsMotor(EpicsMotorServoMixin, EpicsMotor): pass
m1 = myEpicsMotor('xxx:m1', name='m1')
print(m1.servo.read())
class APS_BlueSky_tools.devices.EpicsMotorShutter(*args, **kwargs)[source]

a shutter, implemented with an EPICS motor moved between two positions

EXAMPLE:

tomo_shutter = EpicsMotorShutter("2bma:m23", name="tomo_shutter")
tomo_shutter.closed_position = 1.0      # default
tomo_shutter.open_position = 0.0        # default
tomo_shutter.open()
tomo_shutter.close()

# or, when used in a plan
def planA():
    yield from abs_set(tomo_shutter, "open", group="O")
    yield from wait("O")
    yield from abs_set(tomo_shutter, "close", group="X")
    yield from wait("X")
def planA():
    yield from abs_set(tomo_shutter, "open", wait=True)
    yield from abs_set(tomo_shutter, "close", wait=True)
def planA():
    yield from mv(tomo_shutter, "open")
    yield from mv(tomo_shutter, "close")
close()[source]

move motor to BEAM BLOCKED position, interactive use

isClosed
isOpen
open()[source]

move motor to BEAM NOT BLOCKED position, interactive use

set(value, *, timeout=None, settle_time=None)[source]

set() is like put(), but used in BlueSky plans

PARAMETERS

value : “open” or “close”

timeout : float, optional
Maximum time to wait. Note that set_and_wait does not support an infinite timeout.
settle_time: float, optional
Delay after the set() has completed to indicate completion to the caller

RETURNS

status : DeviceStatus

class APS_BlueSky_tools.devices.EpicsOnOffShutter(*args, **kwargs)[source]

a shutter, implemented with an EPICS PV moved between two positions

Use for a shutter controlled by a single PV which takes a value for the close command and a different value for the open command. The current position is determined by comparing the value of the control with the expected open and close values.

EXAMPLE:

bit_shutter = EpicsOnOffShutter("2bma:bit1", name="bit_shutter")
bit_shutter.closed_position = 0      # default
bit_shutter.open_position = 1        # default
bit_shutter.open()
bit_shutter.close()

# or, when used in a plan
def planA():
    yield from mv(bit_shutter, "open")
    yield from mv(bit_shutter, "close")
close()[source]

move control to BEAM BLOCKED position, interactive use

isClosed
isOpen
open()[source]

move control to BEAM NOT BLOCKED position, interactive use

set(value, *, timeout=None, settle_time=None)[source]

set() is like put(), but used in BlueSky plans

PARAMETERS

value : “open” or “close”

timeout : float, optional
Maximum time to wait. Note that set_and_wait does not support an infinite timeout.
settle_time: float, optional
Delay after the set() has completed to indicate completion to the caller

RETURNS

status : DeviceStatus

class APS_BlueSky_tools.devices.ProcedureRegistry(*args, **kwargs)[source]

Procedure Registry: run a blocking function in a thread

With many instruments, such as USAXS, there are several operating modes to be used, each with its own setup code. This ophyd Device should coordinate those modes so that the setup procedures can be called either as part of a Bluesky plan or from the command line directly. Assumes that users will write functions to setup a particular operation or operating mode. The user-written functions may not be appropriate to use in a plan directly since they might make blocking calls. The ProcedureRegistry will call the function in a thread (which is allowed to make blocking calls) and wait for the thread to complete.

It is assumed that each user-written function will not return until it is complete. .. autosummary:

~dir
~add
~remove
~set
~put

EXAMPLE:

Given these function definitions:

def clearScalerNames():
    for ch in scaler.channels.configuration_attrs:
        if ch.find(".") < 0:
            chan = scaler.channels.__getattribute__(ch)
            chan.chname.put("")

def setMyScalerNames():
    scaler.channels.chan01.chname.put("clock")
    scaler.channels.chan02.chname.put("I0")
    scaler.channels.chan03.chname.put("detector")

create a registry and add the two functions (default name is the function name):

use_mode = ProcedureRegistry(name=”ProcedureRegistry”) use_mode.add(clearScalerNames) use_mode.add(setMyScalerNames)

and then use this registry in a plan, such as this:

def myPlan():
    yield from bps.mv(use_mode, "setMyScalerNames")
    yield from bps.sleep(5)
    yield from bps.mv(use_mode, "clearScalerNames")
add(procedure, proc_name=None)[source]

add procedure to registry

dir

tuple of procedure names

put(value)[source]

replaces ophyd Device default put() behavior

remove(procedure)[source]

remove procedure from registry

set(proc_name)[source]

run procedure in a thread, return once it is complete

proc_name (str) : name of registered procedure to be run

class APS_BlueSky_tools.devices.SimulatedApsPssShutterWithStatus(*args, **kwargs)[source]

Simulated APS PSS shutter

EXAMPLE:

sim = SimulatedApsPssShutterWithStatus(name="sim")
close(timeout=10)[source]

request the shutter to close

get_response_time()[source]

simulated response time for PSS status

isClosed

is the shutter closed?

isOpen

is the shutter open?

open(timeout=10)[source]

request the shutter to open

set(value, **kwargs)[source]

set the shutter to “close” or “open”

APS_BlueSky_tools.devices.use_EPICS_scaler_channels(scaler)[source]

configure scaler for only the channels with names assigned in EPICS