"""
.. _tomoStream: https://tomostream.readthedocs.io
.. _circular buffer plugin: https://cars9.uchicago.edu/software/epics/NDPluginCircularBuff.html
.. _AreaDetector: https://areadetector.github.io/master/index.html
.. _stream: https://tomoscan.readthedocs.io/en/latest/tomoScanApp.html#tomoscan-2bm-stream-adl
Software for tomography stream scanning with EPICS at APS beamline 2-BM
This class support `tomoStream`_ by providing:
- Dark-flat field image PVs broadcasting
| Dark-flat field images are broadcasted using PVaccess. Dark-flat field images are also saved in a temporary \
hdf5 file that are re-written whenever new flat/dark fields are acquired. Acquisition of dark and flat fields is \
performed without stopping rotation of the stage. Dark-flat field images can also be binned setting the binning \
parameter in ROI1 plugin.
- On-demand capturing to an hdf5 file
| The capturing/saving to an hdf5 file can be done on-demand by pressing the Capture proj button in the `Stream`_\
MEDM control screen. Whenever capturing is done, dark/flat fields from the temporarily hdf5 file are added to the file containing \
the projections and the experimental meta data. In addition, the `circular buffer plugin`_ (CB1) of `AreaDetector`_ \
is used to store a set of projections acquired before capturing is started. This allows to save projections containing \
information about the sample right before a sample change is detected. Data from the circular buffer is also added to \
the hdf5 after capturing is done. The resulting hdf5 file has the same format as in regular single tomoscan file.
Classes
-------
TomoScanStream2BM
Derived class for tomography scanning in streaming mode with EPICS at APS beamline 2-BM
"""
import traceback
import os
import time
from pathlib import Path
import h5py
import numpy as np
from tomoscan.tomoscan_stream_pso import TomoScanStreamPSO
from tomoscan import log
from tomoscan import util
import threading
import pvaccess
EPSILON = .001
[docs]class TomoScanStream7BM(TomoScanStreamPSO):
"""Derived class used for tomography scanning in streamaing mode with EPICS at APS beamline 2-BM
Parameters
----------
pv_files : list of str
List of files containing EPICS pvNames to be used.
macros : dict
Dictionary of macro definitions to be substituted when
reading the pv_files
"""
def __init__(self, pv_files, macros):
super().__init__(pv_files, macros)
# Set the detector running in FreeRun mode
self.set_trigger_mode('FreeRun', 1)
# Set data directory
file_path = Path(self.epics_pvs['DetectorTopDir'].get(as_string=True))
#file_path = file_path.joinpath(self.epics_pvs['ExperimentYearMonth'].get(as_string=True))
file_path = file_path.joinpath(self.epics_pvs['ExperimentYearMonth'].get(as_string=True) + '-'
+ self.epics_pvs['UserLastName'].get(as_string=True) + '-'
+ self.epics_pvs['ProposalNumber'].get(as_string=True))
self.epics_pvs['FilePath'].put(str(file_path), wait=True)
macro = 'DET=' + self.pv_prefixes['Camera'] + ',' + 'TC=' + self.epics_pvs['Testing'].__dict__['pvname'].replace('Testing', '', 1)
self.control_pvs['CamNDAttributesMacros'].put(macro)
# Enable auto-increment on file writer
self.epics_pvs['FPAutoIncrement'].put('Yes')
# Set standard file template on file writer
self.epics_pvs['FPFileTemplate'].put("%s%s_%3.3d.h5", wait=True)
# Disable overwriting warning
self.epics_pvs['OverwriteWarning'].put('Yes')
[docs] def open_shutter(self):
"""Opens the shutter to collect flat fields or projections.
This does the following:
- Checks if we are in testing mode. If we are, do nothing.
- Opens the front end shutter, waiting for it to indicate it is open.
This is copied from the 2-BM implementation 9/2020
- Opens the 7-BM-B fast shutter.
"""
if self.epics_pvs['Testing'].get():
log.warning('In testing mode, so not opening shutters.')
return
# Open the front end shutter
if not self.epics_pvs['OpenShutter'] is None:
pv = self.epics_pvs['OpenShutter']
value = self.epics_pvs['OpenShutterValue'].get(as_string=True)
status = self.epics_pvs['ShutterStatus'].get(as_string=True)
log.info('shutter status: %s', status)
log.info('open shutter: %s, value: %s', pv, value)
self.epics_pvs['OpenShutter'].put(value, wait=True)
self.wait_pv(self.epics_pvs['ShutterStatus'], 0)
status = self.epics_pvs['ShutterStatus'].get(as_string=True)
log.info('shutter status: %s', status)
# Open 7-BM-B fast shutter
if not self.epics_pvs['OpenFastShutter'] is None:
pv = self.epics_pvs['OpenFastShutter']
value = self.epics_pvs['OpenFastShutterValue'].get(as_string=True)
log.info('open fast shutter: %s, value: %s', pv, value)
self.epics_pvs['OpenFastShutter'].put(value, wait=True)
[docs] def close_shutter(self):
"""Closes the shutter to collect dark fields and at the end of a scan
This does the following:
- Checks if we are in testing mode. If we are, do nothing
- Closes the 7-BM-B fast shutter.
- Closes the beamline shutter.
"""
if self.epics_pvs['Testing'].get():
log.warning('In testing mode, so not closing shutters.')
return
# Close 7-BM-B fast shutter; don't wait for it
if not self.epics_pvs['CloseFastShutter'] is None:
pv = self.epics_pvs['CloseFastShutter']
value = self.epics_pvs['CloseFastShutterValue'].get(as_string=True)
log.info('close fast shutter: %s, value: %s', pv, value)
self.epics_pvs['CloseFastShutter'].put(value, wait=False)
# Close the beamline shutter
if not self.epics_pvs['CloseShutter'] is None:
pv = self.epics_pvs['CloseShutter']
value = self.epics_pvs['CloseShutterValue'].get(as_string=True)
status = self.epics_pvs['ShutterStatus'].get(as_string=True)
log.info('shutter status: %s', status)
log.info('close shutter: %s, value: %s', pv, value)
self.epics_pvs['CloseShutter'].put(value, wait=True)
self.wait_pv(self.epics_pvs['ShutterStatus'], 1)
status = self.epics_pvs['ShutterStatus'].get(as_string=True)
log.info('shutter status: %s', status)
[docs] def set_trigger_mode(self, trigger_mode, num_images):
"""Sets the trigger mode for the camera.
Parameters
----------
trigger_mode : str
Choices are: "FreeRun", "Internal", or "PSOExternal"
num_images : int
Number of images to collect. Ignored if trigger_mode="FreeRun".
This is used to set the ``NumImages`` PV of the camera.
"""
if trigger_mode == 'FreeRun':
self.epics_pvs['CamAcquire'].put('Done', wait=True)
self.epics_pvs['CamImageMode'].put('Continuous', wait=True)
self.epics_pvs['CamTriggerMode'].put('Off', wait=True)
self.epics_pvs['CamAcquire'].put('Acquire')
elif trigger_mode == 'Internal':
self.epics_pvs['CamTriggerMode'].put('Off', wait=True)
self.epics_pvs['CamImageMode'].put('Multiple')
self.epics_pvs['CamNumImages'].put(num_images, wait=True)
else: # set camera to external triggering
self.epics_pvs['CamTriggerMode'].put('On', wait=True)
ext_source = str(self.epics_pvs['ExternalTriggerSource'].get())
self.epics_pvs['CamTriggerSource'].put(ext_source, wait=True)
self.epics_pvs['CamTriggerOverlap'].put('ReadOut', wait=True)
self.epics_pvs['CamExposureMode'].put('Timed', wait=True)
self.epics_pvs['CamImageMode'].put('Continuous')
self.epics_pvs['CamArrayCallbacks'].put('Enable')
self.epics_pvs['CamFrameRateEnable'].put(0)
self.epics_pvs['CamNumImages'].put(self.num_angles, wait=True)
self.epics_pvs['CamTriggerMode'].put('On', wait=True)
self.wait_pv(self.epics_pvs['CamTriggerMode'], 1)
[docs] def end_scan(self):
"""Performs the operations needed at the very end of a scan.
This does the following:
- Calls ``save_configuration()``.
- Put the camera back in "FreeRun" mode and acquiring so the user sees live images.
- Sets the speed of the rotation stage back to the maximum value.
- Calls ``move_sample_in()``.
- Calls the base class method.
- Closes shutter.
"""
log.info('end scan')
# Close the shutter
self.close_shutter()
# Stop the file plugin, though it should be done already
self.epics_pvs['FPCapture'].put('Done')
self.wait_pv(self.epics_pvs['FPCaptureRBV'], 0)
# Add theta in the hdf file
#self.add_theta()
# Call the base class method
super().end_scan()
[docs] def add_theta(self):
"""Add theta at the end of a scan.
"""
log.info('add theta')
self.theta = np.linspace(self.rotation_start, self.rotation_stop, self.num_angles)
full_file_name = self.epics_pvs['FPFullFileName'].get(as_string=True)
file_name_path = Path(full_file_name)
if os.path.exists(full_file_name):
try:
f = h5py.File(full_file_name, "a")
with f:
try:
if self.theta is not None:
theta_ds = f.create_dataset('/exchange/theta', data = self.theta)
except:
log.error('Add theta: Failed accessing: %s', full_file_name)
traceback.print_exc(file=sys.stdout)
except OSError:
log.error('Add theta aborted')
else:
log.error('Failed adding theta. %s file does not exist', full_file_name)
[docs] def wait_pv(self, epics_pv, wait_val, timeout=np.inf, delta_t=0.01):
"""Wait on a pv to be a value until max_timeout (default forever)
delay for pv to change
"""
time.sleep(delta_t)
start_time = time.time()
while time.time() - start_time < timeout:
pv_val = epics_pv.get()
if isinstance(pv_val, float):
if abs(pv_val - wait_val) < EPSILON:
return True
if pv_val == wait_val:
return True
time.sleep(delta_t)
else:
log.error(' *** ERROR: PV TIMEOUT ***')
log.error(' *** wait_pv(%s, %d, %5.2f reached max timeout. Return False',
epics_pv.pvname, wait_val, timeout)
return False
[docs] def auto_copy_data(self):
'''Copies data from detector computer to analysis computer.
'''
# Copy raw data to data analysis computer
if self.epics_pvs['CopyToAnalysisDir'].get():
log.info('Automatic data trasfer to data analysis computer is enabled.')
full_file_name = self.epics_pvs['FPFullFileName'].get(as_string=True)
remote_analysis_dir = self.epics_pvs['RemoteAnalysisDir'].get(as_string=True)
dm.scp(full_file_name, remote_analysis_dir)
else:
log.warning('Automatic data trasfer to data analysis computer is disabled.')
[docs] def move_sample_in(self):
"""Moves the sample to the in beam position for collecting projections.
The in-beam position is defined by the ``SampleInX`` and ``SampleInY`` PVs.
Which axis to move is defined by the ``FlatFieldAxis`` PV,
which can be ``X``, ``Y``, or ``Both``.
"""
axis = self.epics_pvs['FlatFieldAxis'].get(as_string=True)
log.info('move_sample_in axis: %s', axis)
if axis in ('X', 'Both'):
position = self.epics_pvs['SampleInX'].value
self.epics_pvs['SampleX'].put(position, wait=True, timeout=600)
if axis in ('Y', 'Both'):
position = self.epics_pvs['SampleInY'].value
self.epics_pvs['SampleY'].put(position, wait=True, timeout=600)
self.epics_pvs['MoveSampleIn'].put('Done')
[docs] def move_sample_out(self):
"""Moves the sample to the out of beam position for collecting flat fields.
The out of beam position is defined by the ``SampleOutX`` and ``SampleOutY`` PVs.
Which axis to move is defined by the ``FlatFieldAxis`` PV,
which can be ``X``, ``Y``, or ``Both``.
"""
axis = self.epics_pvs['FlatFieldAxis'].get(as_string=True)
log.info('move_sample_out axis: %s', axis)
if axis in ('X', 'Both'):
position = self.epics_pvs['SampleOutX'].value
self.epics_pvs['SampleX'].put(position, wait=True, timeout=600)
if axis in ('Y', 'Both'):
position = self.epics_pvs['SampleOutY'].value
self.epics_pvs['SampleY'].put(position, wait=True, timeout=600)
self.epics_pvs['MoveSampleOut'].put('Done')