"""
Ophyd support for the EPICS synApps sscan record
EXAMPLE
import APS_BlueSky_tools.synApps_ophyd
scans = APS_BlueSky_tools.synApps_ophyd.sscanDevice("xxx:", name="scans")
Public Structures
.. autosummary::
~sscanRecord
~sscanDevice
Private Structures
.. autosummary::
~sscanPositioner
~sscanDetector
~sscanTrigger
"""
from collections import OrderedDict
from ophyd.device import (
Device,
Component as Cpt,
DynamicDeviceComponent as DDC,
FormattedComponent as FC)
from ophyd import EpicsSignal, EpicsSignalRO
from ophyd.status import DeviceStatus
__all__ = """
sscanRecord
sscanDevice
""".split()
class sscanPositioner(Device):
"""positioner of an EPICS sscan record"""
readback_pv = FC(EpicsSignal, '{self.prefix}.R{self._ch_num}PV')
readback_value = FC(EpicsSignalRO, '{self.prefix}.R{self._ch_num}CV')
setpoint_pv = FC(EpicsSignal, '{self.prefix}.P{self._ch_num}PV')
setpoint_value = FC(EpicsSignalRO, '{self.prefix}.P{self._ch_num}DV')
start = FC(EpicsSignal, '{self.prefix}.P{self._ch_num}SP')
center = FC(EpicsSignal, '{self.prefix}.P{self._ch_num}CP')
end = FC(EpicsSignal, '{self.prefix}.P{self._ch_num}EP')
step_size = FC(EpicsSignal, '{self.prefix}.P{self._ch_num}SI')
width = FC(EpicsSignal, '{self.prefix}.P{self._ch_num}WD')
abs_rel = FC(EpicsSignal, '{self.prefix}.P{self._ch_num}AR')
mode = FC(EpicsSignal, '{self.prefix}.P{self._ch_num}SM')
units = FC(EpicsSignalRO, '{self.prefix}.P{self._ch_num}EU')
def __init__(self, prefix, num, **kwargs):
self._ch_num = num
super().__init__(prefix, **kwargs)
def reset(self):
"""set all fields to default values"""
self.readback_pv.put("")
self.setpoint_pv.put("")
self.start.put(0)
self.center.put(0)
self.end.put(0)
self.step_size.put(0)
self.width.put(0)
self.abs_rel.put("ABSOLUTE")
self.mode.put("LINEAR")
class sscanDetector(Device):
"""detector of an EPICS sscan record"""
input_pv = FC(EpicsSignal, '{self.prefix}.D{self._ch_num}PV')
current_value = FC(EpicsSignal, '{self.prefix}.D{self._ch_num}CV')
def __init__(self, prefix, num, **kwargs):
self._ch_num = num
super().__init__(prefix, **kwargs)
def reset(self):
"""set all fields to default values"""
self.input_pv.put("")
class sscanTrigger(Device):
"""detector trigger of an EPICS sscan record"""
trigger_pv = FC(EpicsSignal, '{self.prefix}.T{self._ch_num}PV')
trigger_value = FC(EpicsSignal, '{self.prefix}.T{self._ch_num}CD')
def __init__(self, prefix, num, **kwargs):
self._ch_num = num
super().__init__(prefix, **kwargs)
def reset(self):
"""set all fields to default values"""
self.trigger_pv.put("")
self.trigger_value.put(1)
def _sscan_positioners(channel_list):
defn = OrderedDict()
for chan in channel_list:
attr = 'p{}'.format(chan)
defn[attr] = (sscanPositioner, '', {'num': chan})
return defn
def _sscan_detectors(channel_list):
defn = OrderedDict()
for chan in channel_list:
attr = 'd{}'.format(chan)
defn[attr] = (sscanDetector, '', {'num': chan})
return defn
def _sscan_triggers(channel_list):
defn = OrderedDict()
for chan in channel_list:
attr = 't{}'.format(chan)
defn[attr] = (sscanTrigger, '', {'num': chan})
return defn
[docs]class sscanRecord(Device):
"""EPICS synApps sscan record: used as $(P):scan(N)"""
desc = Cpt(EpicsSignal, '.DESC')
faze = Cpt(EpicsSignalRO, '.FAZE')
data_state = Cpt(EpicsSignalRO, '.DSTATE')
npts = Cpt(EpicsSignal, '.NPTS')
cpt = Cpt(EpicsSignalRO, '.CPT')
pasm = Cpt(EpicsSignal, '.PASM')
exsc = Cpt(EpicsSignal, '.EXSC')
bspv = Cpt(EpicsSignal, '.BSPV')
bscd = Cpt(EpicsSignal, '.BSCD')
bswait = Cpt(EpicsSignal, '.BSWAIT')
cmnd = Cpt(EpicsSignal, '.CMND')
ddly = Cpt(EpicsSignal, '.DDLY')
pdly = Cpt(EpicsSignal, '.PDLY')
refd = Cpt(EpicsSignal, '.REFD')
wait = Cpt(EpicsSignal, '.WAIT')
wcnt = Cpt(EpicsSignalRO, '.WCNT')
awct = Cpt(EpicsSignal, '.AWCT')
acqt = Cpt(EpicsSignal, '.ACQT')
acqm = Cpt(EpicsSignal, '.ACQM')
atime = Cpt(EpicsSignal, '.ATIME')
copyto = Cpt(EpicsSignal, '.COPYTO')
a1pv = Cpt(EpicsSignal, '.A1PV')
a1cd = Cpt(EpicsSignal, '.A1CD')
aspv = Cpt(EpicsSignal, '.ASPV')
ascd = Cpt(EpicsSignal, '.ASCD')
positioners = DDC(
_sscan_positioners(
"1 2 3 4".split()
)
)
detectors = DDC(
_sscan_detectors(
["%02d" % k for k in range(1,71)]
)
)
triggers = DDC(
_sscan_triggers(
"1 2 3 4".split()
)
)
[docs] def set(self, value, **kwargs):
"""interface to use bps.mv()"""
if value != 1:
return
working_status = DeviceStatus(self)
started = False
def exsc_cb(value, timestamp, **kwargs):
value = int(value)
if started and value == 0:
working_status._finished()
self.exsc.subscribe(exsc_cb)
self.exsc.set(1)
started = True
return working_status
[docs] def reset(self):
"""set all fields to default values"""
self.desc.put(self.desc.pvname.split(".")[0])
self.npts.put(1000)
for part in (self.positioners, self.detectors, self.triggers):
for ch_name in part.read_attrs:
channel = part.__getattr__(ch_name)
channel.reset()
self.a1pv.put("")
self.acqm.put("NORMAL")
if self.name.find("scanH") > 0:
self.acqt.put("1D ARRAY")
else:
self.acqt.put("SCALAR")
self.aspv.put("")
self.bspv.put("")
self.pasm.put("STAY")
self.bswait.put("Wait")
self.a1cd.put(1)
self.ascd.put(1)
self.bscd.put(1)
self.refd.put(1)
self.atime.put(0)
self.awct.put(0)
self.copyto.put(0)
self.ddly.put(0)
self.pdly.put(0)
while self.wcnt.get() > 0:
self.wait.put(0)
[docs]class sscanDevice(Device):
"""synApps XXX IOC setup of sscan records: $(P):scan$(N)"""
scan_dimension = Cpt(EpicsSignalRO, 'ScanDim')
scan_pause = Cpt(EpicsSignal, 'scanPause')
abort_scans = Cpt(EpicsSignal, 'AbortScans')
scan1 = Cpt(sscanRecord, 'scan1')
scan2 = Cpt(sscanRecord, 'scan2')
scan3 = Cpt(sscanRecord, 'scan3')
scan4 = Cpt(sscanRecord, 'scan4')
scanH = Cpt(sscanRecord, 'scanH')
resume_delay = Cpt(EpicsSignal, 'scanResumeSEQ.DLY1')
[docs] def reset(self):
"""set all fields to default values"""
self.scan1.reset()
self.scan2.reset()
self.scan3.reset()
self.scan4.reset()
self.scanH.reset()