"""Software for tomography scanning with EPICS at APS beamline 32-ID
Classes
-------
TomoScan32ID
Derived class for tomography scanning with EPICS at APS beamline 32-ID
"""
import time
import os
import h5py
import sys
import traceback
import numpy as np
from epics import PV
import pvaccess as pva
import threading
from pathlib import Path
from tomoscan import data_management as dm
from tomoscan.tomoscan_pso import TomoScanPSO
from tomoscan import log
EPSILON = .001
[docs]class SampleXError(Exception):
'''Exception raised when SampleX is not equal to SampleInX
'''
[docs]class TomoScan32ID(TomoScanPSO):
"""Derived class used for tomography scanning with EPICS at APS beamline 32-ID
Parameters
----------
pv_files : list of str
List of files containing EPICS pvNames to be used.
macros : dict
Dictionary of macro definitions to be substituted when
reading the pv_files
"""
def __init__(self, pv_files, macros):
super().__init__(pv_files, macros)
# Set the detector running in FreeRun mode
# self.set_trigger_mode('FreeRun', 1)
# self.epics_pvs['CamAcquire'].put('Acquire') ###
# self.wait_pv(self.epics_pvs['CamAcquire'], 1) ###
# set TomoScan xml files
self.epics_pvs['CamNDAttributesFile'].put('TomoScanDetectorAttributes.xml')
self.epics_pvs['FPXMLFileName'].put('TomoScanLayout.xml')
macro = 'DET=' + self.pv_prefixes['Camera'] + ',' + 'TS=' + self.epics_pvs['Testing'].__dict__['pvname'].replace('Testing', '', 1)
self.control_pvs['CamNDAttributesMacros'].put(macro)
# Enable auto-increment on file writer
self.epics_pvs['FPAutoIncrement'].put('Yes')
# Set standard file template on file writer
self.epics_pvs['FPFileTemplate'].put("%s%s_%3.3d.h5", wait=True)
# Disable over writing warning
self.epics_pvs['OverwriteWarning'].put('Yes')
# TXMOptics
prefix = self.pv_prefixes['TxmOptics']
self.epics_pvs['TXMEnergySet'] = PV(prefix+'EnergySet')
self.epics_pvs['TXMEnergy'] = PV(prefix+'Energy')
self.epics_pvs['TXMMoveAllOut'] = PV(prefix+'MoveAllOut')
self.epics_pvs['TXMMoveAllIn'] = PV(prefix+'MoveAllIn')
# pva type channel that contains projection and metadata
prefix = self.pv_prefixes['Image']
self.epics_pvs['PvaPImage'] = pva.Channel(prefix + 'Image')
self.epics_pvs['PvaPDataType_RBV'] = pva.Channel(prefix + 'DataType_RBV')
# energy scan
self.epics_pvs['EnergySet'].put(0)
self.epics_pvs['EnergySet'].add_callback(self.pv_callback_32id)
self.epics_pvs['SampleXSet'] = PV(self.control_pvs['SampleX'].pvname + '.SET')
self.epics_pvs['SampleYSet'] = PV(self.control_pvs['SampleY'].pvname + '.SET')
log.setup_custom_logger("./tomoscan.log")
[docs] def pv_callback_32id(self, pvname=None, value=None, char_value=None, **kw):
"""Callback function that is called by pyEpics when certain EPICS PVs are changed
"""
log.debug('pv_callback_32id pvName=%s, value=%s, char_value=%s', pvname, value, char_value)
if (pvname.find('EnergySet') != -1) and value==1:
thread = threading.Thread(target=self.energy_change, args=())
thread.start()
[docs] def open_frontend_shutter(self):
"""Opens the shutters to collect flat fields or projections.
This does the following:
- Checks if we are in testing mode. If we are, do nothing else opens the 2-BM-A front-end shutter.
"""
if self.epics_pvs['Testing'].get():
log.warning('In testing mode, so not opening shutters.')
else:
# Open front-end shutter
if not self.epics_pvs['OpenShutter'] is None:
pv = self.epics_pvs['OpenShutter']
value = self.epics_pvs['OpenShutterValue'].get(as_string=True)
status = self.epics_pvs['ShutterStatus'].get(as_string=True)
log.info('shutter status: %s', status)
log.info('open shutter: %s, value: %s', pv, value)
self.epics_pvs['OpenShutter'].put(value, wait=True)
self.wait_frontend_shutter_open()
# self.wait_pv(self.epics_pvs['ShutterStatus'], 1)
status = self.epics_pvs['ShutterStatus'].get(as_string=True)
log.info('shutter status: %s', status)
[docs] def open_shutter(self):
"""Opens the shutters to collect flat fields or projections.
This does the following:
- Opens the 32-ID-C fast shutter.
"""
# Open 32-ID-C fast shutter
if not self.epics_pvs['OpenFastShutter'] is None:
pv = self.epics_pvs['OpenFastShutter']
value = self.epics_pvs['OpenFastShutterValue'].get(as_string=True)
log.info('open fast shutter: %s, value: %s', pv, value)
self.epics_pvs['OpenFastShutter'].put(value, wait=True)
[docs] def close_frontend_shutter(self):
"""Closes the shutters to collect dark fields.
This does the following:
- Closes the 32-ID-C front-end shutter.
"""
if self.epics_pvs['Testing'].get():
log.warning('In testing mode, so not opening shutters.')
else:
# Close front-end shutter
if not self.epics_pvs['CloseShutter'] is None:
pv = self.epics_pvs['CloseShutter']
value = self.epics_pvs['CloseShutterValue'].get(as_string=True)
status = self.epics_pvs['ShutterStatus'].get(as_string=True)
log.info('shutter status: %s', status)
log.info('close shutter: %s, value: %s', pv, value)
self.epics_pvs['CloseShutter'].put(value, wait=True)
self.wait_pv(self.epics_pvs['ShutterStatus'], 0)
status = self.epics_pvs['ShutterStatus'].get(as_string=True)
log.info('shutter status: %s', status)
[docs] def close_shutter(self):
"""Closes the shutters to collect dark fields.
This does the following:
- Closes the 32-ID-C fast shutter.
"""
# Close 32-ID-C fast shutter
if not self.epics_pvs['CloseFastShutter'] is None:
pv = self.epics_pvs['CloseFastShutter']
value = self.epics_pvs['CloseFastShutterValue'].get(as_string=True)
log.info('close fast shutter: %s, value: %s', pv, value)
self.epics_pvs['CloseFastShutter'].put(value, wait=True)
[docs] def fly_scan(self):
"""Control of Sample X position
"""
super().fly_scan()
[docs] def set_trigger_mode(self, trigger_mode, num_images):
"""Sets the trigger mode SIS3820 and the camera.
Parameters
----------
trigger_mode : str
Choices are: "FreeRun", "Internal", or "PSOExternal"
num_images : int
Number of images to collect. Ignored if trigger_mode="FreeRun".
This is used to set the ``NumImages`` PV of the camera.
"""
camera_model = self.epics_pvs['CamModel'].get(as_string=True)
if(camera_model=='Grasshopper3 GS3-U3-51S5M'):
self.set_trigger_mode_grasshopper(trigger_mode, num_images)
elif(camera_model=='Blackfly S BFS-PGE-161S7M'):
self.set_trigger_mode_grasshopper(trigger_mode, num_images)
else:
log.error('Camera is not supported')
exit(1)
[docs] def set_trigger_mode_grasshopper(self, trigger_mode, num_images):
self.epics_pvs['CamAcquire'].put('Done') ###
self.wait_pv(self.epics_pvs['CamAcquire'], 0) ###
log.info('set trigger mode: %s', trigger_mode)
if trigger_mode == 'FreeRun':
self.epics_pvs['CamImageMode'].put('Continuous', wait=True)
self.epics_pvs['CamTriggerMode'].put('Off', wait=True)
self.wait_pv(self.epics_pvs['CamTriggerMode'], 0)
# self.epics_pvs['CamAcquire'].put('Acquire')
elif trigger_mode == 'Internal':
self.epics_pvs['CamTriggerMode'].put('Off', wait=True)
self.wait_pv(self.epics_pvs['CamTriggerMode'], 0)
self.epics_pvs['CamImageMode'].put('Multiple')
self.epics_pvs['CamNumImages'].put(num_images, wait=True)
else: # set camera to external triggering
# These are just in case the scan aborted with the camera in another state
self.epics_pvs['CamTriggerMode'].put('On', wait=True) # VN: For PG we need to switch to On to be able to switch to readout overlap mode
self.epics_pvs['CamTriggerSource'].put('Line2', wait=True)
self.epics_pvs['CamTriggerOverlap'].put('ReadOut', wait=True)
self.epics_pvs['CamExposureMode'].put('Timed', wait=True)
self.epics_pvs['CamImageMode'].put('Multiple')
self.epics_pvs['CamArrayCallbacks'].put('Enable')
self.epics_pvs['CamFrameRateEnable'].put(0)
self.epics_pvs['CamNumImages'].put(self.num_angles, wait=True)
self.epics_pvs['CamTriggerMode'].put('On', wait=True)
self.wait_pv(self.epics_pvs['CamTriggerMode'], 1)
[docs] def begin_scan(self):
"""Performs the operations needed at the very start of a scan.
This does the following:
- Set data directory.
- Set the TomoScan xml files
- Calls the base class method.
- Opens the front-end shutter.
- Sets the PSO controller.
- Creates theta array using list from PSO.
- Turns on data capture.
"""
log.info('begin scan')
self.epics_pvs['SampleXSet'].put(1, wait=True)
self.epics_pvs['SampleYSet'].put(1, wait=True)
self.epics_pvs['SampleX'].put(0, wait=True)
self.epics_pvs['SampleY'].put(0, wait=True)
self.epics_pvs['SampleXSet'].put(0, wait=True)
self.epics_pvs['SampleYSet'].put(0, wait=True)
# if(abs(self.epics_pvs['SampleInX'].value-self.epics_pvs['SampleX'].value)>1e-4) or abs(self.epics_pvs['SampleInY'].value-self.epics_pvs['SampleY'].value)>1e-4:
# log.error('SampleInX/SampleInZ is not the same as current SampleTopX/SampleTopZ')
# self.epics_pvs['ScanStatus'].put('Sample position error')
# self.epics_pvs['StartScan'].put(0)
# return
# Set data directory
file_path = self.epics_pvs['DetectorTopDir'].get(as_string=True) + self.epics_pvs['ExperimentYearMonth'].get(as_string=True) + os.path.sep + self.epics_pvs['UserLastName'].get(as_string=True) + os.path.sep
self.epics_pvs['FilePath'].put(file_path, wait=True)
# Call the base class method
super().begin_scan()
# Opens the front-end shutter
self.open_frontend_shutter()
[docs] def energy_change(self):
"""Change energy trhough TXMOptics"""
energy = self.epics_pvs['Energy'].get()
self.epics_pvs['TXMEnergy'].put(energy,wait=True)
self.epics_pvs['TXMEnergySet'].put(1,wait=True)
time.sleep(1)
self.epics_pvs['EnergySet'].put(0)
[docs] def end_scan(self):
"""Performs the operations needed at the very end of a scan.
This does the following:
- Calls ``save_configuration()``.
- Put the camera back in "FreeRun" mode and acquiring so the user sees live images.
- Sets the speed of the rotation stage back to the maximum value.
- Calls ``move_sample_in()``.
- Calls the base class method.
- Closes shutter.
- Add theta to the raw data file.
- Copy raw data to data analysis computer.
"""
if self.return_rotation == 'Yes':
# Reset rotation position by mod 360 , the actual return
# to start position is handled by super().end_scan()
log.info('wait until the stage is stopped')
time.sleep(self.epics_pvs['RotationAccelTime'].get()*1.2)
ang = self.epics_pvs['RotationRBV'].get()
current_angle = np.sign(ang)*(np.abs(ang)%360)
self.epics_pvs['RotationSet'].put('Set', wait=True)
self.epics_pvs['Rotation'].put(current_angle, wait=True)
self.epics_pvs['RotationSet'].put('Use', wait=True)
# Call the base class method
super().end_scan()
# Stop the file plugin
self.epics_pvs['FPCapture'].put('Done')
self.wait_pv(self.epics_pvs['FPCaptureRBV'], 0)
# Add theta in the hdf file
self.add_theta()
if self.epics_pvs['CollectMicroCTdata'].get(as_string=True)=='Yes':
try:
log.warning('take microCT projection')
self.epics_pvs['TXMMoveAllOut'].put(1,wait=True)
self.epics_pvs['CamAcquire'].put(1)
log.info(f"write to {self.epics_pvs['FPFullFileName'].get(as_string=True)}")
time.sleep(0.5)
with h5py.File(self.epics_pvs['FPFullFileName'].get(as_string=True),'r+') as fid:
pva_image_data = self.epics_pvs['PvaPImage'].get('')
width = pva_image_data['dimension'][0]['size']
height = pva_image_data['dimension'][1]['size']
datatype_list = self.epics_pvs['PvaPDataType_RBV'].get()['value']
type_dict = {
'uint8': 'ubyteValue',
'float32': 'floatValue',
'uint16' : 'ushortValue'
}
datatype = type_dict[datatype_list['choices'][datatype_list['index']].lower()]
data = pva_image_data['value'][0][datatype]
fid.create_dataset("exchange/data2", data = data.reshape([height,width]))
self.epics_pvs['CamAcquire'].put(0)
except:
log.warning('microCT projection was not taken')
#self.epics_pvs['TXMMoveAllIn'].put(1,wait=True)
# Close shutter
self.close_shutter()
# Copy raw data to data analysis computer
log.info('Automatic data trasfer to data analysis computer is enabled.')
full_file_name = self.epics_pvs['FPFullFileName'].get(as_string=True)
remote_analysis_dir = self.epics_pvs['RemoteAnalysisDir'].get(as_string=True)
copy_to_analysis_dir = self.epics_pvs['CopyToAnalysisDir'].get()
if copy_to_analysis_dir == 1:
log.info('Using FDT')
dm.fdt_scp(full_file_name, remote_analysis_dir, Path(self.epics_pvs['DetectorTopDir'].get()))
self.epics_pvs['ScanStatus'].put('fdt file transfer complete')
elif copy_to_analysis_dir == 2:
log.info('Using scp')
dm.scp(full_file_name, remote_analysis_dir)
self.epics_pvs['ScanStatus'].put('scp file transfer complete')
else:
log.warning('Automatic data trasfer to data analysis computer is disabled.')
[docs] def add_theta(self):
"""Add theta at the end of a scan.
"""
log.info('add theta')
full_file_name = self.epics_pvs['FPFullFileName'].get(as_string=True)
if os.path.exists(full_file_name):
try:
with h5py.File(full_file_name, "a") as f:
if self.theta is not None:
unique_ids = f['/defaults/NDArrayUniqueId']
hdf_location = f['/defaults/HDF5FrameLocation']
total_dark_fields = self.num_dark_fields * ((self.dark_field_mode in ('Start', 'Both')) + (self.dark_field_mode in ('End', 'Both')))
total_flat_fields = self.num_flat_fields * ((self.flat_field_mode in ('Start', 'Both')) + (self.flat_field_mode in ('End', 'Both')))
proj_ids = unique_ids[hdf_location[:] == b'/exchange/data']
flat_ids = unique_ids[hdf_location[:] == b'/exchange/data_white']
dark_ids = unique_ids[hdf_location[:] == b'/exchange/data_dark']
# create theta dataset in hdf5 file
if len(proj_ids) > 0:
theta_ds = f.create_dataset('/exchange/theta', (len(proj_ids),))
theta_ds[:] = self.theta[proj_ids - proj_ids[0]]
# warnings that data is missing
if len(proj_ids) != len(self.theta):
log.warning(f'There are {len(self.theta) - len(proj_ids)} missing data frames')
missed_ids = [ele for ele in range(len(self.theta)) if ele not in proj_ids-proj_ids[0]]
missed_theta = self.theta[missed_ids]
# log.warning(f'Missed ids: {list(missed_ids)}')
log.warning(f'Missed theta: {list(missed_theta)}')
if len(flat_ids) != total_flat_fields:
log.warning(f'There are {total_flat_fields - len(flat_ids)} missing flat field frames')
if (len(dark_ids) != total_dark_fields):
log.warning(f'There are {total_dark_fields - len(dark_ids)} missing dark field frames')
except:
log.error('Add theta: Failed accessing: %s', full_file_name)
traceback.print_exc(file=sys.stdout)
else:
log.error('Failed adding theta. %s file does not exist', full_file_name)
[docs] def wait_pv(self, epics_pv, wait_val, timeout=-1):
"""Wait on a pv to be a value until max_timeout (default forever)
delay for pv to change
"""
time.sleep(.01)
start_time = time.time()
while True:
pv_val = epics_pv.get()
if isinstance(pv_val, float):
if abs(pv_val - wait_val) < EPSILON:
return True
if pv_val != wait_val:
if timeout > -1:
current_time = time.time()
diff_time = current_time - start_time
if diff_time >= timeout:
log.error(' *** ERROR: DROPPED IMAGES ***')
log.error(' *** wait_pv(%s, %d, %5.2f reached max timeout. Return False',
epics_pv.pvname, wait_val, timeout)
return False
time.sleep(.01)
else:
return True
[docs] def wait_frontend_shutter_open(self, timeout=-1):
"""Waits for the front end shutter to open, or for ``abort_scan()`` to be called.
While waiting this method periodically tries to open the shutter..
Parameters
----------
timeout : float
The maximum number of seconds to wait before raising a ShutterTimeoutError exception.
Raises
------
ScanAbortError
If ``abort_scan()`` is called
ShutterTimeoutError
If the open shutter has not completed within timeout value.
"""
start_time = time.time()
pv = self.epics_pvs['OpenShutter']
value = self.epics_pvs['OpenShutterValue'].get(as_string = True)
log.info('open shutter: %s, value: %s', pv, value)
elapsed_time = 0
while True:
if self.epics_pvs['ShutterStatus'].get() == int(value):
log.warning("Shutter is open in %f s", elapsed_time)
return
if not self.scan_is_running:
exit()
value = self.epics_pvs['OpenShutterValue'].get()
time.sleep(1.0)
current_time = time.time()
elapsed_time = current_time - start_time
log.warning("Waiting on shutter to open: %f s", elapsed_time)
self.epics_pvs['OpenShutter'].put(value, wait=True)
if timeout > 0:
if elapsed_time >= timeout:
exit()