"""Software for tomography scanning with EPICS at APS beamline 7-BM-B
Classes
-------
TomoScan7BM
Derived class for tomography scanning with EPICS at APS beamline 7-BM-B
"""
import time
import os
import math
import h5py
from pathlib import Path
import numpy as np
from epics import PV
from tomoscan import data_management as dm
from tomoscan.tomoscan import TomoScan
from tomoscan.tomoscan_helical import TomoScanHelical
from tomoscan import log
EPSILON = .001
[docs]class TomoScan7BM(TomoScanHelical):
"""Derived class used for tomography scanning with EPICS at APS beamline 7-BM-B
Parameters
----------
pv_files : list of str
List of files containing EPICS pvNames to be used.
macros : dict
Dictionary of macro definitions to be substituted when
reading the pv_files
"""
def __init__(self, pv_files, macros):
log.setup_custom_logger(lfname=Path.home().joinpath('logs','TomoScan_7BM.log'), stream_to_console=True)
super().__init__(pv_files, macros)
# set TomoScan xml files
self.epics_pvs['CamNDAttributesFile'].put('TomoScanDetectorAttributes.xml')
self.epics_pvs['FPXMLFileName'].put('TomoScanLayout.xml')
macro = 'DET=' + self.pv_prefixes['Camera'] + ',' + 'TC=' + self.epics_pvs['Testing'].__dict__['pvname'].replace('Testing', '', 1)
self.control_pvs['CamNDAttributesMacros'].put(macro)
# Set the detector running in FreeRun mode
self.set_trigger_mode('FreeRun', 1)
# Set data directory
file_path = Path(self.epics_pvs['DetectorTopDir'].get(as_string=True))
#file_path = file_path.joinpath(self.epics_pvs['ExperimentYearMonth'].get(as_string=True))
file_path = file_path.joinpath(self.epics_pvs['ExperimentYearMonth'].get(as_string=True) + '-'
+ self.epics_pvs['UserLastName'].get(as_string=True) + '-'
+ self.epics_pvs['ProposalNumber'].get(as_string=True))
self.epics_pvs['FilePath'].put(str(file_path), wait=True)
# Enable auto-increment on file writer
self.epics_pvs['FPAutoIncrement'].put('Yes')
# Enable over-writing warning
self.epics_pvs['OverwriteWarning'].put('Yes')
[docs] def fly_scan(self):
"""Overrides fly_scan in super class to catch a bad file path.
"""
if self.epics_pvs['FilePathExists'].get() == 1:
log.info('file path for file writer exists')
super(TomoScanHelical, self).fly_scan()
else:
log.info('file path for file writer not found')
self.epics_pvs['ScanStatus'].put('Abort: Bad File Path')
self.epics_pvs['StartScan'].put(0)
self.scan_is_running = False
[docs] def collect_static_frame(self):
"""Collects num_frames images in "InternalSingle" trigger mode for gains
"""
# This is called when collecting dark fields or flat fields
log.info('collect static frame')
self.set_trigger_mode('InternalSingle', 1)
self.epics_pvs['CamAcquire'].put('Acquire')
# Wait for detector and file plugin to be ready
time.sleep(0.5)
frame_time = self.compute_frame_time()
self.wait_camera_done(frame_time + 5.0)
[docs] def open_shutter(self):
"""Opens the shutter to collect flat fields or projections.
This does the following:
- Checks if we are in testing mode. If we are, do nothing.
- Opens the front end shutter, waiting for it to indicate it is open.
This is copied from the 2-BM implementation 9/2020
- Opens the 7-BM-B fast shutter.
"""
if self.epics_pvs['Testing'].get():
log.warning('In testing mode, so not opening shutters.')
return
# Open the front end shutter
if not self.epics_pvs['OpenShutter'] is None:
pv = self.epics_pvs['OpenShutter']
value = self.epics_pvs['OpenShutterValue'].get(as_string=True)
status = self.epics_pvs['ShutterStatus'].get(as_string=True)
log.info('shutter status: %s', status)
log.info('open shutter: %s, value: %s', pv, value)
self.epics_pvs['OpenShutter'].put(value, wait=True)
self.wait_pv(self.epics_pvs['ShutterStatus'], 0)
status = self.epics_pvs['ShutterStatus'].get(as_string=True)
log.info('shutter status: %s', status)
# Open 7-BM-B fast shutter
if not self.epics_pvs['OpenFastShutter'] is None:
pv = self.epics_pvs['OpenFastShutter']
value = self.epics_pvs['OpenFastShutterValue'].get(as_string=True)
log.info('open fast shutter: %s, value: %s', pv, value)
self.epics_pvs['OpenFastShutter'].put(value, wait=True)
[docs] def close_shutter(self):
"""Closes the shutter to collect dark fields and at the end of a scan
This does the following:
- Checks if we are in testing mode. If we are, do nothing
- Closes the 7-BM-B fast shutter.
- Closes the beamline shutter.
"""
if self.epics_pvs['Testing'].get():
log.warning('In testing mode, so not closing shutters.')
return
# Close 7-BM-B fast shutter; don't wait for it
if not self.epics_pvs['CloseFastShutter'] is None:
pv = self.epics_pvs['CloseFastShutter']
value = self.epics_pvs['CloseFastShutterValue'].get(as_string=True)
log.info('close fast shutter: %s, value: %s', pv, value)
self.epics_pvs['CloseFastShutter'].put(value, wait=False)
# Call the base class method
if not self.epics_pvs['CloseShutter'] is None:
pv = self.epics_pvs['CloseShutter']
value = self.epics_pvs['CloseShutterValue'].get(as_string=True)
status = self.epics_pvs['ShutterStatus'].get(as_string=True)
log.info('shutter status: %s', status)
log.info('close shutter: %s, value: %s', pv, value)
self.epics_pvs['CloseShutter'].put(value, wait=True)
self.wait_pv(self.epics_pvs['ShutterStatus'], 1)
status = self.epics_pvs['ShutterStatus'].get(as_string=True)
log.info('shutter status: %s', status)
[docs] def set_trigger_mode(self, trigger_mode, num_images):
"""Sets the trigger mode for the camera.
Parameters
----------
trigger_mode : str
Choices are: "FreeRun", "Internal", or "PSOExternal"
num_images : int
Number of images to collect. Ignored if trigger_mode="FreeRun".
This is used to set the ``NumImages`` PV of the camera.
"""
if trigger_mode == 'FreeRun':
self.epics_pvs['CamImageMode'].put('Continuous', wait=True)
self.epics_pvs['CamTriggerMode'].put('Off', wait=True)
self.epics_pvs['CamAcquire'].put('Acquire')
elif trigger_mode == 'Internal':
self.epics_pvs['CamTriggerMode'].put('Off', wait=True)
self.epics_pvs['CamImageMode'].put('Multiple')
self.epics_pvs['CamNumImages'].put(num_images, wait=True)
elif trigger_mode == 'InternalSingle':
self.epics_pvs['CamTriggerMode'].put('Off', wait=True)
self.epics_pvs['CamImageMode'].put('Single')
else: # set camera to external triggering
self.epics_pvs['CamTriggerMode'].put('On', wait=True)
ext_source = str(self.epics_pvs['ExternalTriggerSource'].get())
self.epics_pvs['CamTriggerSource'].put(ext_source, wait=True)
self.epics_pvs['CamTriggerOverlap'].put('ReadOut', wait=True)
self.epics_pvs['CamExposureMode'].put('Timed', wait=True)
self.epics_pvs['CamImageMode'].put('Multiple')
self.epics_pvs['CamArrayCallbacks'].put('Enable')
self.epics_pvs['CamFrameRateEnable'].put(0)
self.epics_pvs['CamNumImages'].put(self.num_angles, wait=True)
self.epics_pvs['CamTriggerMode'].put('On', wait=True)
self.wait_pv(self.epics_pvs['CamTriggerMode'], 1)
[docs] def collect_flat_fields(self):
"""Collects flat field data
This overrides the super class's method to allow for camera gain
images to be acquired.
- Performs the normal flat corrections
- Set the FrameType to the appropriate value
- For exposure times from 0 to the flat exposure time:
- Change exposure time to the appropriate value
- Take a single frame
"""
super().collect_flat_fields()
if self.num_gain_fields:
self.epics_pvs['ScanStatus'].put('Collecting camera gains')
log.info('collecting camera gains')
self.epics_pvs['HDF5Location'].put(self.epics_pvs['HDF5GainsLocation'].value)
self.epics_pvs['FrameType'].put('Gains')
self.gain_exp_times = np.linspace(0, self.epics_pvs['FlatExposureTime'].value, self.num_gain_fields)
for gain_time in self.gain_exp_times:
log.info('image at exposure time = {0:6.4f}'.format(gain_time))
self.control_pvs['CamAcquireTime'].put(gain_time, wait=True)
self.collect_static_frame()
[docs] def begin_scan(self):
"""Performs the operations needed at the very start of a scan.
This is an override of the begin_scan in TomoScanPSO to account for
the extra frames due to saving camera gains.
This does the following:
- Calls the base class method.
- Sets the speed of the rotation motor
- Computes the delta theta, start and stop motor positions for the scan
- Programs the Aerotech driver to provide pulses at the right positions
"""
log.info('begin scan')
# Call the base class method from TomoScan
TomoScan.begin_scan(self)
time.sleep(0.1)
# Program the stage driver to provide PSO pulses
self.compute_positions_PSO()
self.program_PSO()
# Insert the number of gain images
self.num_gain_fields = self.epics_pvs['NumGainFields'].value
if self.num_gain_fields:
if self.flat_field_mode != 'None':
self.total_images += self.num_gain_fields
if self.flat_field_mode == 'Both':
self.total_images += self.num_gain_fields
self.epics_pvs['FPNumCapture'].put(self.total_images, wait=True)
self.epics_pvs['FPCapture'].put('Capture')
[docs] def end_scan(self):
"""Performs the operations needed at the very end of a scan.
This does the following:
- Add theta to the raw data file.
- Close the shutter
- Calls the base class method.
"""
log.info('end scan')
# Close the shutter
self.close_shutter()
# Stop the file plugin, though it should be done already
log.info('stop the file plugin')
self.epics_pvs['FPCapture'].put('Done')
log.info('Check the status of the plugin')
self.wait_pv(self.epics_pvs['FPCaptureRBV'], 0)
# Add theta in the hdf file
self.add_theta()
# Add the exposure times for the gain images
self.add_gain_exp_times()
# Copy file to the analysis computer, if desired
self.auto_copy_data()
# Call the base class method
super().end_scan()
[docs] def add_theta(self):
"""Add theta at the end of a scan.
Taken from tomoscan_2BM.py function. This gives the correct theta for scans with missing frames
"""
log.info('add theta')
time.sleep(1.0)
if self.theta is None:
log.warning('no theta to add')
return
full_file_name = self.epics_pvs['FPFullFileName'].get(as_string=True)
if os.path.exists(full_file_name):
try:
with h5py.File(full_file_name, "a") as f:
unique_ids = f['/defaults/NDArrayUniqueId']
hdf_location = f['/defaults/HDF5FrameLocation']
total_dark_fields = self.num_dark_fields * ((self.dark_field_mode in ('Start', 'Both')) + (self.dark_field_mode in ('End', 'Both')))
total_flat_fields = self.num_flat_fields * ((self.flat_field_mode in ('Start', 'Both')) + (self.flat_field_mode in ('End', 'Both')))
proj_ids = unique_ids[hdf_location[:] == b'/exchange/data']
flat_ids = unique_ids[hdf_location[:] == b'/exchange/data_white']
dark_ids = unique_ids[hdf_location[:] == b'/exchange/data_dark']
# create theta dataset in hdf5 file
if len(proj_ids) == 0 or len(self.theta) == 0:
log.error('No value theta values')
log.error('Abort adding theta')
return
if len(proj_ids) > 0:
theta_ds = f.create_dataset('/exchange/theta', (len(proj_ids),))
theta_ds[:] = self.theta[proj_ids - proj_ids[0]]
theta_ds.attrs['units'] = 'degrees'
# warnings that data is missing
if len(proj_ids) != len(self.theta):
log.warning(f'There are {len(self.theta) - len(proj_ids)} missing data frames')
missed_ids = [ele for ele in range(len(self.theta)) if ele not in proj_ids-proj_ids[0]]
missed_theta = self.theta[missed_ids]
log.warning(f'Missed theta: {list(missed_theta)}')
if len(flat_ids) != total_flat_fields:
log.warning(f'There are {total_flat_fields - len(flat_ids)} missing flat field frames')
if (len(dark_ids) != total_dark_fields):
log.warning(f'There are {total_dark_fields - len(dark_ids)} missing dark field frames')
except:
log.error('Add theta: Failed accessing: %s', full_file_name)
traceback.print_exc(file=sys.stdout)
else:
log.error('Failed adding theta. %s file does not exist', full_file_name)
[docs] def add_gain_exp_times(self):
"""Add gain exposure times to the output file.
"""
log.info('add gain exposure times')
time.sleep(1.0)
if self.gain_exp_times is None:
log.warning('no gain exposure times to add')
return
full_file_name = self.epics_pvs['FPFullFileName'].get(as_string=True)
if os.path.exists(full_file_name):
try:
with h5py.File(full_file_name, "a") as f:
gain_exp_times_ds = f.create_dataset(
'/exchange/gain_exp_times',
data = self.gain_exp_times,
)
gain_exp_times_ds.attrs['units'] = 's'
except:
log.error('Add gain exposure times: Failed accessing: %s', full_file_name)
traceback.print_exc(file=sys.stdout)
else:
log.error('Failed adding gain exposure times. %s file does not exist', full_file_name)
[docs] def wait_pv(self, epics_pv, wait_val, timeout=np.inf, delta_t=0.01):
"""Wait on a pv to be a value until max_timeout (default forever)
delay for pv to change
"""
log.info('wait_pv')
time.sleep(delta_t)
start_time = time.time()
while time.time() - start_time < timeout:
pv_val = epics_pv.get()
if isinstance(pv_val, float):
if abs(pv_val - wait_val) < EPSILON:
return True
if pv_val == wait_val:
return True
time.sleep(delta_t)
else:
log.error(' *** ERROR: PV TIMEOUT ***')
log.error(' *** wait_pv(%s, %d, %5.2f reached max timeout. Return False',
epics_pv.pvname, wait_val, timeout)
return False
[docs] def auto_copy_data(self):
'''Copies data from detector computer to analysis computer.
'''
# Copy raw data to data analysis computer
if self.epics_pvs['CopyToAnalysisDir'].get():
log.info('Automatic data trasfer to data analysis computer is enabled.')
self.epics_pvs['ScanStatus'].put('Auto File Transfer')
full_file_name = self.epics_pvs['FPFullFileName'].get(as_string=True)
remote_analysis_dir = self.epics_pvs['RemoteAnalysisDir'].get(as_string=True)
dm.fdt_scp(full_file_name, remote_analysis_dir, Path(self.epics_pvs['DetectorTopDir'].get()))
self.epics_pvs['ScanStatus'].put('File Transfer Complete')
#dm.scp(full_file_name, remote_analysis_dir)
else:
log.warning('Automatic data trasfer to data analysis computer is disabled.')