"""Software for tomography step scanning with EPICS at APS beamline 2-BM
Classes
-------
TomoScan2BM
Derived class for tomography step scanning with EPICS at APS beamline 2-BM
"""
import time
import os
import sys
import h5py
import traceback
import numpy as np
from tomoscan import data_management as dm
from tomoscan.tomoscan_step import TomoScanSTEP
from tomoscan import log
EPSILON = .001
[docs]class TomoScan2BMSTEP(TomoScanSTEP):
"""Derived class used for tomography scanning with EPICS at APS beamline 2-BM
Parameters
----------
pv_files : list of str
List of files containing EPICS pvNames to be used.
macros : dict
Dictionary of macro definitions to be substituted when
reading the pv_files
"""
def __init__(self, pv_files, macros):
super().__init__(pv_files, macros)
# set TomoScan xml files
self.epics_pvs['CamNDAttributesFile'].put('TomoScanDetectorAttributes.xml')
self.epics_pvs['FPXMLFileName'].put('TomoScanLayout.xml')
macro = 'DET=' + self.pv_prefixes['Camera'] + ',' + 'TS=' + self.epics_pvs['Testing'].__dict__['pvname'].replace('Testing', '', 1)
self.control_pvs['CamNDAttributesMacros'].put(macro)
# Enable auto-increment on file writer
self.epics_pvs['FPAutoIncrement'].put('Yes')
# # Set standard file template on file writer
# self.epics_pvs['FPFileTemplate'].put("%s%s_%3.3d.h5", wait=True)
# Disable over writing warning
self.epics_pvs['OverwriteWarning'].put('Yes')
# Set AD plugins
self.epics_pvs['PVANDArrayPort'].put('OVER1')
self.epics_pvs['PVAEnableCallbacks'].put('Enable')
self.epics_pvs['ROIEnableCallbacks'].put('Disable')
self.epics_pvs['CBEnableCallbacks'].put('Disable')
self.epics_pvs['FPEnableCallbacks'].put('Enable')
log.setup_custom_logger("./tomoscan.log")
[docs] def open_frontend_shutter(self):
"""Opens the shutters to collect flat fields or projections.
This does the following:
- Checks if we are in testing mode. If we are, do nothing else opens the 2-BM-A front-end shutter.
"""
if self.epics_pvs['Testing'].get():
log.warning('In testing mode, so not opening shutters.')
else:
# Open 2-BM-A front-end shutter
if not self.epics_pvs['OpenShutter'] is None:
pv = self.epics_pvs['OpenShutter']
value = self.epics_pvs['OpenShutterValue'].get(as_string=True)
status = self.epics_pvs['ShutterStatus'].get(as_string=True)
log.info('shutter status: %s', status)
log.info('open shutter: %s, value: %s', pv, value)
self.epics_pvs['OpenShutter'].put(value, wait=True)
self.wait_frontend_shutter_open()
# self.wait_pv(self.epics_pvs['ShutterStatus'], 1)
status = self.epics_pvs['ShutterStatus'].get(as_string=True)
log.info('shutter status: %s', status)
[docs] def open_shutter(self):
"""Opens the shutters to collect flat fields or projections.
This does the following:
- Opens the 2-BM-A fast shutter.
"""
# Open 2-BM-A fast shutter
if not self.epics_pvs['OpenFastShutter'] is None:
pv = self.epics_pvs['OpenFastShutter']
value = self.epics_pvs['OpenFastShutterValue'].get(as_string=True)
log.info('open fast shutter: %s, value: %s', pv, value)
self.epics_pvs['OpenFastShutter'].put(value, wait=True)
[docs] def close_frontend_shutter(self):
"""Closes the shutters to collect dark fields.
This does the following:
- Closes the 2-BM-A front-end shutter.
"""
if self.epics_pvs['Testing'].get():
log.warning('In testing mode, so not opening shutters.')
else:
# Close 2-BM-A front-end shutter
if not self.epics_pvs['CloseShutter'] is None:
pv = self.epics_pvs['CloseShutter']
value = self.epics_pvs['CloseShutterValue'].get(as_string=True)
status = self.epics_pvs['ShutterStatus'].get(as_string=True)
log.info('shutter status: %s', status)
log.info('close shutter: %s, value: %s', pv, value)
self.epics_pvs['CloseShutter'].put(value, wait=True)
self.wait_pv(self.epics_pvs['ShutterStatus'], 0)
status = self.epics_pvs['ShutterStatus'].get(as_string=True)
log.info('shutter status: %s', status)
[docs] def close_shutter(self):
"""Closes the shutters to collect dark fields.
This does the following:
- Closes the 2-BM-A fast shutter.
"""
# Close 2-BM-A fast shutter
if not self.epics_pvs['CloseFastShutter'] is None:
pv = self.epics_pvs['CloseFastShutter']
value = self.epics_pvs['CloseFastShutterValue'].get(as_string=True)
log.info('close fast shutter: %s, value: %s', pv, value)
self.epics_pvs['CloseFastShutter'].put(value, wait=True)
[docs] def set_trigger_mode(self, trigger_mode, num_images):
"""Sets the trigger mode SIS3820 and the camera.
Parameters
----------
trigger_mode : str
Choices are: "FreeRun", "Internal", or "PSOExternal"
num_images : int
Number of images to collect. Ignored if trigger_mode="FreeRun".
This is used to set the ``NumImages`` PV of the camera.
"""
self.epics_pvs['CamAcquire'].put('Done') ###
self.wait_pv(self.epics_pvs['CamAcquire'], 0) ###
log.info('set trigger mode: %s', trigger_mode)
if trigger_mode == 'FreeRun':
self.epics_pvs['CamImageMode'].put('Continuous', wait=True)
self.epics_pvs['CamTriggerMode'].put('Off', wait=True)
self.wait_pv(self.epics_pvs['CamTriggerMode'], 0)
# self.epics_pvs['CamAcquire'].put('Acquire')
elif trigger_mode == 'Internal':
self.epics_pvs['CamTriggerMode'].put('Off', wait=True)
self.wait_pv(self.epics_pvs['CamTriggerMode'], 0)
self.epics_pvs['CamImageMode'].put('Multiple')
self.epics_pvs['CamNumImages'].put(num_images, wait=True)
else: # set camera to internal triggering
# These are just in case the scan aborted with the camera in another state
camera_model = self.epics_pvs['CamModel'].get(as_string=True)
if(camera_model=='Oryx ORX-10G-51S5M' or camera_model=='Oryx ORX-10G-310S9M'):
self.epics_pvs['CamTriggerMode'].put('Off', wait=True) # VN: For FLIR we first switch to Off and then change overlap. any reason of that?
self.epics_pvs['CamTriggerSource'].put('Line2', wait=True)
elif(camera_model=='Grasshopper3 GS3-U3-23S6M'):# 2bmb
self.epics_pvs['CamTriggerMode'].put('On', wait=True) # VN: For PG we need to switch to On to be able to switch to readout overlap mode
self.epics_pvs['CamTriggerSource'].put('Software', wait=True)
self.epics_pvs['CamTriggerOverlap'].put('ReadOut', wait=True)
self.epics_pvs['CamExposureMode'].put('Timed', wait=True)
self.epics_pvs['CamImageMode'].put('Multiple')
self.epics_pvs['CamArrayCallbacks'].put('Enable')
self.epics_pvs['CamFrameRateEnable'].put(0)
self.epics_pvs['CamNumImages'].put(num_images, wait=True)
self.epics_pvs['CamTriggerMode'].put('On', wait=True)
self.wait_pv(self.epics_pvs['CamTriggerMode'], 1)
[docs] def begin_scan(self):
"""Performs the operations needed at the very start of a scan.
This does the following:
- Set data directory.
- Set the TomoScan xml files
- Calls the base class method.
- Opens the front-end shutter.
"""
log.info('begin scan')
# Set data directory
file_path = self.epics_pvs['DetectorTopDir'].get(as_string=True) + self.epics_pvs['ExperimentYearMonth'].get(as_string=True) + os.path.sep + self.epics_pvs['UserLastName'].get(as_string=True) + os.path.sep
self.epics_pvs['FilePath'].put(file_path, wait=True)
# Call the base class method
super().begin_scan()
# Opens the front-end shutter
self.open_frontend_shutter()
[docs] def end_scan(self):
"""Performs the operations needed at the very end of a scan.
This does the following:
- Reset rotation position by mod 360.
- Calls the base class method.
- Stop the file plugin.
- Closes shutter.
- Add theta to the raw data file.
- Copy raw data to data analysis computer.
"""
if self.return_rotation == 'Yes':
# Reset rotation position by mod 360 , the actual return
# to start position is handled by super().end_scan()
current_angle = self.epics_pvs['Rotation'].get() %360
self.epics_pvs['RotationSet'].put('Set', wait=True)
self.epics_pvs['Rotation'].put(current_angle, wait=True)
self.epics_pvs['RotationSet'].put('Use', wait=True)
# Call the base class method
super().end_scan()
# Close shutter
self.close_shutter()
# Stop the file plugin
self.epics_pvs['FPCapture'].put('Done')
self.wait_pv(self.epics_pvs['FPCaptureRBV'], 0)
# Add theta in the hdf file
self.add_theta()
# Copy raw data to data analysis computer
if self.epics_pvs['CopyToAnalysisDir'].get():
log.info('Automatic data trasfer to data analysis computer is enabled.')
full_file_name = self.epics_pvs['FPFullFileName'].get(as_string=True)
remote_analysis_dir = self.epics_pvs['RemoteAnalysisDir'].get(as_string=True)
dm.scp(full_file_name, remote_analysis_dir)
else:
log.warning('Automatic data trasfer to data analysis computer is disabled.')
[docs] def add_theta(self):
"""Add theta at the end of a scan.
"""
log.info('add theta')
full_file_name = self.epics_pvs['FPFullFileName'].get(as_string=True)
if os.path.exists(full_file_name):
try:
f = h5py.File(full_file_name, "a")
with f:
try:
if self.theta is not None:
# theta_ds = f.create_dataset('/exchange/theta', (len(self.theta),))
# theta_ds[:] = self.theta[:]
unique_ids = f['/defaults/NDArrayUniqueId']
shift_start = int(self.num_dark_fields > 0 and (self.dark_field_mode in ('Start', 'Both')))+ \
int(self.num_flat_fields > 0 and (self.flat_field_mode in ('Start', 'Both')))
# find beginnings of sorted subarrays
# for [1,2,1,3,1,2,3,4,1,2] returns 0,2,4,8
ids_list = [0,*np.where(unique_ids[1:]-unique_ids[:-1]<0)[0]+1]
# extract projection ids
if(len(ids_list[shift_start:])==1):
proj_ids = unique_ids[ids_list[shift_start]:]
else:
proj_ids = unique_ids[ids_list[shift_start]:ids_list[shift_start+1]]
# subtract first id
proj_ids -= proj_ids[0]
# create theta dataset in hdf5 file
theta_ds = f.create_dataset('/exchange/theta', (len(proj_ids),))
theta_ds[:] = self.theta[proj_ids]
if(len(proj_ids) != len(self.theta)):
log.warning('There are %d missing frames',len(self.theta)-len(proj_ids))
missed_ids = [ele for ele in range(len(self.theta)) if ele not in proj_ids]
missed_theta = self.theta[missed_ids]
log.warning(f'Missed ids: {list(missed_ids)}')
log.warning(f'Missed theta: {list(missed_theta)}')
except:
log.error('Add theta: Failed accessing: %s', full_file_name)
traceback.print_exc(file=sys.stdout)
except OSError:
log.error('Add theta aborted: %s not closed', full_file_name)
else:
log.error('Failed adding theta. %s file does not exist', full_file_name)
[docs] def wait_pv(self, epics_pv, wait_val, timeout=-1):
"""Wait on a pv to be a value until max_timeout (default forever)
delay for pv to change
"""
time.sleep(.01)
start_time = time.time()
while True:
pv_val = epics_pv.get()
if isinstance(pv_val, float):
if abs(pv_val - wait_val) < EPSILON:
return True
if pv_val != wait_val:
if timeout > -1:
current_time = time.time()
diff_time = current_time - start_time
if diff_time >= timeout:
log.error(' *** ERROR: DROPPED IMAGES ***')
log.error(' *** wait_pv(%s, %d, %5.2f reached max timeout. Return False',
epics_pv.pvname, wait_val, timeout)
return False
time.sleep(.01)
else:
return True
[docs] def wait_frontend_shutter_open(self, timeout=-1):
"""Waits for the front end shutter to open, or for ``abort_scan()`` to be called.
While waiting this method periodically tries to open the shutter..
Parameters
----------
timeout : float
The maximum number of seconds to wait before raising a ShutterTimeoutError exception.
Raises
------
ScanAbortError
If ``abort_scan()`` is called
ShutterTimeoutError
If the open shutter has not completed within timeout value.
"""
start_time = time.time()
pv = self.epics_pvs['OpenShutter']
value = self.epics_pvs['OpenShutterValue'].get(as_string = True)
log.info('open shutter: %s, value: %s', pv, value)
elapsed_time = 0
while True:
if self.epics_pvs['ShutterStatus'].get() == int(value):
log.warning("Shutter is open in %f s", elapsed_time)
return
if not self.scan_is_running:
exit()
value = self.epics_pvs['OpenShutterValue'].get()
time.sleep(1.0)
current_time = time.time()
elapsed_time = current_time - start_time
log.warning("Waiting on shutter to open: %f s", elapsed_time)
self.epics_pvs['OpenShutter'].put(value, wait=True)
if timeout > 0:
if elapsed_time >= timeout:
exit()
[docs] def abort_scan(self):
super().abort_scan()
self.add_theta()