"""
BlueSky callback that writes SPEC data files
.. autosummary::
~SpecWriterCallback
EXAMPLE : the :ref:`specfile_example() <example_specfile>` writes one or more scans to a SPEC data file using a jupyter notebook.
EXAMPLE : use as BlueSky callback::
from APS_BlueSky_tools.filewriters import SpecWriterCallback
specwriter = SpecWriterCallback()
RE.subscribe(specwriter.receiver)
EXAMPLE : use as writer from Databroker::
from APS_BlueSky_tools.filewriters import SpecWriterCallback
specwriter = SpecWriterCallback()
for key, doc in db.get_documents(db[-1]):
specwriter.receiver(key, doc)
print("Look at SPEC data file: "+specwriter.spec_filename)
EXAMPLE : use as writer from Databroker with customizations::
from APS_BlueSky_tools.filewriters import SpecWriterCallback
# write into file: /tmp/cerium.spec
specwriter = SpecWriterCallback(filename="/tmp/cerium.spec")
for key, doc in db.get_documents(db[-1]):
specwriter.receiver(key, doc)
# write into file: /tmp/barium.dat
specwriter.newfile("/tmp/barium.dat")
for key, doc in db.get_documents(db["b46b63d4"]):
specwriter.receiver(key, doc)
"""
# Copyright (c) 2017-2018, UChicago Argonne, LLC. See LICENSE file.
from collections import OrderedDict
from datetime import datetime
import getpass
import logging
import os
import socket
import time
logger = logging.getLogger(__name__).addHandler(logging.NullHandler())
# Programmer's Note: subclassing from `object` avoids the need
# to import `bluesky.callbacks.core.CallbackBase`.
# One less import when only accessing the Databroker.
# The *only* advantage to subclassing from CallbackBase
# seems to be a simpler setup call to RE.subscribe().
#
# superclass | subscription code
# ------------ | -------------------------------
# object | RE.subscribe(specwriter.receiver)
# CallbackBase | RE.subscribe(specwriter)
SPEC_TIME_FORMAT = "%a %b %d %H:%M:%S %Y"
SCAN_ID_RESET_VALUE = 1
def _rebuild_scan_command(doc):
"""reconstruct the scan command for SPEC data file #S line"""
def get_name(src):
"""
get name field from object representation
given: EpicsMotor(prefix='xxx:m1', name='m1', settle_time=0.0,
timeout=None, read_attrs=['user_readback', 'user_setpoint'],
configuration_attrs=['motor_egu', 'velocity', 'acceleration',
'user_offset', 'user_offset_dir'])
return: "m1"
"""
s = str(src)
p = s.find("(")
if p > 0: # only if an open parenthesis is found
parts = s[p+1:].rstrip(")").split(",")
for item in parts:
# should be key=value pairs
item = item.strip()
p = item.find("=")
if item[:p] == "name":
s = item[p+1:] # get the name value
break
return s
s = []
if "plan_args" in doc:
for _k, _v in doc['plan_args'].items():
if _k == "detectors":
_v = doc[_k]
elif _k.startswith("motor"):
_v = doc["motors"]
elif _k == "args":
_v = "[" + ", ".join(map(get_name, _v)) + "]"
s.append("{}={}".format(_k, _v))
cmd = "{}({})".format(doc.get("plan_name", ""), ", ".join(s))
scan_id = doc.get("scan_id") or 1 # TODO: improve the `1` default
return "{} {}".format(scan_id, cmd)
[docs]class SpecWriterCallback(object):
"""
collect data from BlueSky RunEngine documents to write as SPEC data
This gathers data from all documents and appends scan to the file
when the *stop* document is received.
Parameters
filename : string, optional
Local, relative or absolute name of SPEC data file to be used.
If `filename=None`, defaults to format of YYYmmdd-HHMMSS.dat
derived from the current system time.
auto_write : boolean, optional
If True (default), `write_scan()` is called when *stop* document
is received.
If False, the caller is responsible for calling `write_scan()`
before the next *start* document is received.
User Interface methods
.. autosummary::
~receiver
~newfile
~usefile
~make_default_filename
~clear
~prepare_scan_contents
~write_scan
Internal methods
.. autosummary::
~write_header
~start
~descriptor
~event
~bulk_events
~datum
~resource
~stop
"""
def __init__(self, filename=None, auto_write=True):
self.clear()
self.spec_filename = filename
self.auto_write = auto_write
self.uid_short_length = 8
self.write_file_header = False
self.spec_epoch = None # for both #E & #D line in header, also offset for all scans
self.spec_host = None
self.spec_user = None
self._datetime = None # most recent document time as datetime object
self._streams = {} # descriptor documents, keyed by uid
if filename is None or not os.path.exists(filename):
self.newfile(filename)
else:
last_scan_id = self.usefile(filename) # TODO: set RE's scan_id based on result?
[docs] def clear(self):
"""reset all scan data defaults"""
self.uid = None
self.scan_epoch = None # absolute epoch to report in scan #D line
self.time = None # full time from document
self.comments = dict(start=[], event=[], descriptor=[], resource=[], datum=[], stop=[])
self.data = OrderedDict() # data in the scan
self.detectors = OrderedDict() # names of detectors in the scan
self.hints = OrderedDict() # why?
self.metadata = OrderedDict() # #MD lines in header
self.motors = OrderedDict() # names of motors in the scan
self.positioners = OrderedDict() # names in #O, values in #P
self.num_primary_data = 0
#
# note: for one scan, #O & #P information is not provided
# unless collecting baseline data
# wait for case with baseline data that needs #O/#P lines
#
self.columns = OrderedDict() # #L in scan
self.scan_command = None # #S line
def _cmt(self, key, text):
"""enter a comment"""
ts = datetime.strftime(self._datetime, SPEC_TIME_FORMAT)
self.comments[key].append("{}. {}".format(ts, text))
[docs] def receiver(self, key, document):
"""BlueSky callback: receive all documents for handling"""
xref = dict(
start = self.start,
descriptor = self.descriptor,
event = self.event,
bulk_events = self.bulk_events,
datum = self.datum,
resource = self.resource,
stop = self.stop,
)
logger = logging.getLogger(__name__)
if key in xref:
uid = document.get("uid") or document.get("datum_id")
logger.debug("%s document, uid=%s", key, str(uid))
ts = document.get("time")
if ts is None:
ts = datetime.now()
else:
ts = datetime.fromtimestamp(document["time"])
self._datetime = ts
xref[key](document)
else:
msg = "custom_callback encountered: {} : {}".format(key, document)
# raise ValueError(msg)
logger.warning(msg)
[docs] def start(self, doc):
"""handle *start* documents"""
known_properties = """
uid time project sample scan_id group owner
detectors hints
plan_type plan_name plan_args
""".split()
self.clear()
self.uid = doc["uid"]
self._cmt("start", "uid = {}".format(self.uid))
self.time = doc["time"]
self.scan_epoch = int(self.time)
self.scan_id = doc["scan_id"] or 0
# Which reference? fixed counting time or fixed monitor count?
# Can this be omitted?
self.T_or_M = None # for now
# self.T_or_M = "T" # TODO: how to get this from the document stream?
# self.T_or_M_value = 1
# self._cmt("start", "!!! #T line not correct yet !!!")
# metadata
for key in sorted(doc.keys()):
if key not in known_properties:
self.metadata[key] = doc[key]
# various dicts
for item in "detectors hints motors".split():
if item in doc:
obj = self.__getattribute__(item)
for key in doc.get(item):
obj[key] = None
cmt = "plan_type = " + doc["plan_type"]
ts = datetime.strftime(self._datetime, SPEC_TIME_FORMAT)
self.comments["start"].insert(0, "{}. {}".format(ts, cmt))
self.scan_command = _rebuild_scan_command(doc)
[docs] def descriptor(self, doc):
"""
handle *descriptor* documents
prepare for primary scan data, ignore any other data stream
"""
# TODO: log descriptor documents by uid
# for reference from event and bulk_events documents
if doc["uid"] in self._streams:
fmt = "duplicate descriptor UID {} found"
raise KeyError(fmt.format(doc["uid"]))
# log descriptor documents by uid
# referenced by event and bulk_events documents
self._streams[doc["uid"]] = doc
if doc["name"] == "primary":
doc_data_keys = list(doc["data_keys"].keys())
self.data.update({k: [] for k in sorted(doc_data_keys)})
self.data["Epoch"] = []
self.data["Epoch_float"] = []
# SPEC data files have implied defaults
# SPEC default: X axis in 1st column and Y axis in last column
_at_last = len(self.motors) > 0
self.data.move_to_end("Epoch_float", last=_at_last)
self.data.move_to_end("Epoch")
# TODO: move motors to first
# TODO: move detectors to last
if len(self.motors) > 0:
# find 1st motor and move to last
motor_name = list(self.motors.keys())[0]
self.data.move_to_end(motor_name, last=False)
# monitor (detector) in next to last position
# but how can we get that name here?
if len(self.detectors) > 0:
# find 1st detector and move to last
det_name = list(self.detectors.keys())[0]
if det_name not in self.data and len(doc_data_keys) > 0:
det_name = doc_data_keys[0]
if det_name in self.data:
self.data.move_to_end(det_name)
[docs] def event(self, doc):
"""
handle *event* documents
"""
stream_doc = self._streams.get(doc["descriptor"])
if stream_doc is None:
fmt = "descriptor UID {} not found"
raise KeyError(fmt.format(doc["descriptor"]))
if stream_doc["name"] == "primary":
for k in doc["data"].keys():
if k not in self.data.keys():
fmt = "unexpected failure here, key {} not found"
raise KeyError(fmt.format(k))
#return # not our expected event data
for k in self.data.keys():
if k == "Epoch":
v = int(doc["time"] - self.time + 0.5)
elif k == "Epoch_float":
v = doc["time"] - self.time
else:
v = doc["data"][k]
self.data[k].append(v)
self.num_primary_data += 1
[docs] def bulk_events(self, doc):
"""handle *bulk_events* documents"""
pass
[docs] def datum(self, doc):
"""handle *datum* documents"""
self._cmt("datum", "datum " + str(doc))
[docs] def resource(self, doc):
"""handle *resource* documents"""
self._cmt("resource", "resource " + str(doc))
[docs] def stop(self, doc):
"""handle *stop* documents"""
if "num_events" in doc:
for k, v in doc["num_events"].items():
self._cmt("stop", "num_events_{} = {}".format(k, v))
if "exit_status" in doc:
self._cmt("stop", "exit_status = " + doc["exit_status"])
else:
self._cmt("stop", "exit_status = not available")
if self.auto_write:
self.write_scan()
[docs] def prepare_scan_contents(self):
"""
format the scan for a SPEC data file
:returns: [str] a list of lines to append to the data file
"""
dt = datetime.fromtimestamp(self.scan_epoch)
lines = []
lines.append("")
lines.append("#S " + self.scan_command)
lines.append("#D " + datetime.strftime(dt, SPEC_TIME_FORMAT))
if self.T_or_M is not None:
lines.append("#{} {}".format(self.T_or_M, self.T_or_M_value))
for v in self.comments["start"]:
#C Wed Feb 03 16:51:38 2016. do ./usaxs.mac.
lines.append("#C " + v) # TODO: add time/date stamp as SPEC does
for v in self.comments["descriptor"]:
lines.append("#C " + v)
for k, v in self.metadata.items():
# "#MD" is our ad hoc SPEC data tag
lines.append("#MD {} = {}".format(k, v))
lines.append("#N " + str(self.num_primary_data))
if len(self.data.keys()) > 0:
lines.append("#L " + " ".join(self.data.keys()))
for i in range(self.num_primary_data):
str_data = OrderedDict()
s = []
for k in self.data.keys():
datum = self.data[k][i]
if isinstance(datum, str):
# SPEC scan data is expected to be numbers
# this is text, substitute the row number
# and report after this line in a #U line
str_data[k] = datum
datum = i
s.append(str(datum))
lines.append(" ".join(s))
for k in str_data.keys():
# report the text data
lines.append("#U {} {} {}".format(i, k, str_data[k]))
else:
lines.append("#C no data column labels identified")
for v in self.comments["event"]:
lines.append("#C " + v)
for v in self.comments["resource"]:
lines.append("#C " + v)
for v in self.comments["datum"]:
lines.append("#C " + v)
for v in self.comments["stop"]:
lines.append("#C " + v)
return lines
def _write_lines_(self, lines, mode="a"):
"""write (more) lines to the file"""
with open(self.spec_filename, mode) as f:
f.write("\n".join(lines))
[docs] def write_scan(self):
"""
write the most recent (completed) scan to the file
* creates file if not existing
* writes header if needed
* appends scan data
note: does nothing if there are no lines to be written
"""
if os.path.exists(self.spec_filename):
with open(self.spec_filename) as f:
buf = f.read()
if buf.find(self.uid) >= 0:
# raise exception if uid is already in the file!
fmt = "{} already contains uid={}"
raise ValueError(fmt.format(self.spec_filename, self.uid))
logger = logging.getLogger(__name__)
lines = self.prepare_scan_contents()
lines.append("")
if lines is not None:
if self.write_file_header:
self.write_header()
logger.info("wrote header to SPEC file: %s", self.spec_filename)
self._write_lines_(lines, mode="a")
logger.info("wrote scan %d to SPEC file: %s", self.scan_id, self.spec_filename)
[docs] def make_default_filename(self):
"""generate a file name to be used as default"""
now = datetime.now()
return datetime.strftime(now, "%Y%m%d-%H%M%S")+".dat"
[docs] def newfile(self, filename=None, reset_scan_id=False, RE=None):
"""
prepare to use a new SPEC data file
but don't create it until we have data
"""
self.clear()
filename = filename or self.make_default_filename()
if os.path.exists(filename):
ValueError("file {} exists".format(filename))
self.spec_filename = filename
self.spec_epoch = int(time.time()) # ! no roundup here!!!
self.spec_host = socket.gethostname() or 'localhost'
self.spec_user = getpass.getuser() or 'BlueSkyUser'
self.write_file_header = True # don't write the file yet
if reset_scan_id and RE is not None:
# assume isinstance(RE, bluesky.run_engine.RunEngine)
RE.md["scan_id"] = SCAN_ID_RESET_VALUE
print("scan ID set to {}".format(SCAN_ID_RESET_VALUE))
return self.spec_filename
[docs] def usefile(self, filename):
"""read from existing SPEC data file"""
if not os.path.exists(self.spec_filename):
IOError("file {} does not exist".format(filename))
scan_id = None
with open(filename, "r") as f:
key = "#F"
line = f.readline().strip()
if not line.startswith(key+" "):
raise ValueError("first line does not start with "+key)
key = "#E"
line = f.readline().strip()
if not line.startswith(key+" "):
raise ValueError("first line does not start with "+key)
epoch = int(line.split()[-1])
key = "#D"
line = f.readline().strip()
if not line.startswith(key+" "):
raise ValueError("first line does not start with "+key)
# ignore content, it is derived from #E line
key = "#C"
line = f.readline().strip()
if not line.startswith(key+" "):
raise ValueError("first line does not start with "+key)
p = line.split()
username = "BlueSkyUser"
if len(p) > 4 and p[2] == "user":
username = p[4]
# find the last scan number used
key = "#S"
for line in f.readlines():
if line.startswith(key+" ") and len(line.split())>1:
scan_id = int(line.split()[1])
self.spec_filename = filename
self.spec_epoch = epoch
self.spec_user = username
return scan_id