Source code for tomoscan.tomoscan

"""Software for tomography scanning with EPICS

   Classes
   -------
   TomoScan
     Base class for tomography scanning with EPICS.
"""

import json
import time
import threading
import signal
import sys
import os
from datetime import timedelta
import pymsgbox
from epics import PV
from tomoscan import log

[docs]class ScanAbortError(Exception): '''Exception raised when user wants to abort a scan. '''
[docs]class CameraTimeoutError(Exception): '''Exception raised when the camera times out during a scan. '''
[docs]class FileOverwriteError(Exception): '''Exception raised when a file would be overwritten. '''
[docs]class TomoScan(): """ Base class used for tomography scanning with EPICS Parameters ---------- pv_files : list of str List of files containing EPICS pvNames to be used. macros : dict Dictionary of macro definitions to be substituted when reading the pv_files """ def __init__(self, pv_files, macros): self.scan_is_running = False self.config_pvs = {} self.control_pvs = {} self.pv_prefixes = {} # These variables are set in begin_scan(). # They are used to prevent reading PVs repeatedly, and so that if the users changes # a PV during the scan it won't mess things up. self.exposure_time = None self.rotation_start = None self.rotation_step = None self.rotation_stop = None self.rotation_save = None self.rotation_resolution = None self.max_rotation_speed = None self.return_rotation = None self.num_angles = None self.num_dark_fields = None self.dark_field_mode = None self.num_flat_fields = None self.flat_field_mode = None self.total_images = None self.file_path_rbv = None self.file_name_rbv = None self.file_number = None self.file_template = None if not isinstance(pv_files, list): pv_files = [pv_files] for pv_file in pv_files: self.read_pv_file(pv_file, macros) if 'Rotation' not in self.control_pvs: log.error('RotationPVName must be present in autoSettingsFile') sys.exit() if 'Camera' not in self.pv_prefixes: log.error('CameraPVPrefix must be present in autoSettingsFile') sys.exit() if 'FilePlugin' not in self.pv_prefixes: log.error('FilePluginPVPrefix must be present in autoSettingsFile') sys.exit() #Define PVs we will need from the rotation motor, which is on another IOC rotation_pv_name = self.control_pvs['Rotation'].pvname self.control_pvs['RotationSpeed'] = PV(rotation_pv_name + '.VELO') self.control_pvs['RotationMaxSpeed'] = PV(rotation_pv_name + '.VMAX') self.control_pvs['RotationResolution'] = PV(rotation_pv_name + '.MRES') self.control_pvs['RotationEResolution'] = PV(rotation_pv_name + '.ERES') self.control_pvs['RotationSet'] = PV(rotation_pv_name + '.SET') self.control_pvs['RotationStop'] = PV(rotation_pv_name + '.STOP') self.control_pvs['RotationDmov'] = PV(rotation_pv_name + '.DMOV') self.control_pvs['RotationDirection'] = PV(rotation_pv_name + '.DIR') self.control_pvs['RotationAccelTime'] = PV(rotation_pv_name + '.ACCL') self.control_pvs['RotationRBV'] = PV(rotation_pv_name + '.RBV') self.control_pvs['RotationJog'] = PV(rotation_pv_name + '.JOGF') self.control_pvs['RotationHomR'] = PV(rotation_pv_name + '.HOMR') self.control_pvs['RotationHomF'] = PV(rotation_pv_name + '.HOMF') self.control_pvs['RotationSpeedJog'] = PV(rotation_pv_name + '.JVEL') self.control_pvs['RotationAccelJog'] = PV(rotation_pv_name + '.JAR') self.control_pvs['RotationOFF'] = PV(rotation_pv_name + '.OFF') #Define PVs from the camera IOC that we will need prefix = self.pv_prefixes['Camera'] camera_prefix = prefix + 'cam1:' self.control_pvs['CamManufacturer'] = PV(camera_prefix + 'Manufacturer_RBV') self.control_pvs['CamModel'] = PV(camera_prefix + 'Model_RBV') self.control_pvs['CamAcquire'] = PV(camera_prefix + 'Acquire') self.control_pvs['CamAcquireBusy'] = PV(camera_prefix + 'AcquireBusy') self.control_pvs['CamImageMode'] = PV(camera_prefix + 'ImageMode') self.control_pvs['CamTriggerMode'] = PV(camera_prefix + 'TriggerMode') self.control_pvs['CamNumImages'] = PV(camera_prefix + 'NumImages') self.control_pvs['CamNumImagesCounter'] = PV(camera_prefix + 'NumImagesCounter_RBV') self.control_pvs['CamAcquireTime'] = PV(camera_prefix + 'AcquireTime') self.control_pvs['CamAcquireTimeRBV'] = PV(camera_prefix + 'AcquireTime_RBV') self.control_pvs['CamBinX'] = PV(camera_prefix + 'BinX') self.control_pvs['CamBinY'] = PV(camera_prefix + 'BinY') self.control_pvs['CamWaitForPlugins'] = PV(camera_prefix + 'WaitForPlugins') self.control_pvs['PortNameRBV'] = PV(camera_prefix + 'PortName_RBV') self.control_pvs['CamNDAttributesFile'] = PV(camera_prefix + 'NDAttributesFile') self.control_pvs['CamNDAttributesMacros'] = PV(camera_prefix + 'NDAttributesMacros') self.control_pvs['CamArrayCounterRBV'] = PV(camera_prefix + 'ArrayCounter_RBV') self.control_pvs['CamUniqueIdMode'] = PV(camera_prefix + 'UniqueIdMode') # If this is a Point Grey camera then assume we are running ADSpinnaker # and create some PVs specific to that driver manufacturer = self.control_pvs['CamManufacturer'].get(as_string=True) model = self.control_pvs['CamModel'].get(as_string=True) if (manufacturer.find('Point Grey') != -1) or (manufacturer.find('FLIR') != -1): self.control_pvs['CamExposureMode'] = PV(camera_prefix + 'ExposureMode') self.control_pvs['CamTriggerOverlap'] = PV(camera_prefix + 'TriggerOverlap') self.control_pvs['CamPixelFormat'] = PV(camera_prefix + 'PixelFormat') self.control_pvs['CamArrayCallbacks'] = PV(camera_prefix + 'ArrayCallbacks') self.control_pvs['CamFrameRateEnable'] = PV(camera_prefix + 'FrameRateEnable') self.control_pvs['CamTriggerSource'] = PV(camera_prefix + 'TriggerSource') self.control_pvs['CamTriggerSoftware'] = PV(camera_prefix + 'TriggerSoftware') if model.find('Grasshopper3 GS3-U3-23S6M') != -1: self.control_pvs['CamVideoMode'] = PV(camera_prefix + 'GC_VideoMode_RBV') if model.find('Blackfly S BFS-PGE-161S7M') != -1: self.control_pvs['GC_ExposureAuto'] = PV(camera_prefix + 'GC_ExposureAuto') if (manufacturer.find('Adimec') != -1): self.control_pvs['CamExposureMode'] = PV(camera_prefix + 'ExposureMode') self.control_pvs['CamAcquisitionFrameRate'] = PV(camera_prefix + 'AcquisitionFrameRate') self.control_pvs['CamAcquisitionFramePeriod'] = PV(camera_prefix + 'AcquisitionFramePeriod') self.control_pvs['CamExposureTime+R'] = PV(camera_prefix + 'ExposureTime+R') # Set some initial PV values self.control_pvs['CamWaitForPlugins'].put('Yes') self.control_pvs['StartScan'].put(0) prefix = self.pv_prefixes['FilePlugin'] self.control_pvs['FPNDArrayPort'] = PV(prefix + 'NDArrayPort') self.control_pvs['FPFileWriteMode'] = PV(prefix + 'FileWriteMode') self.control_pvs['FPNumCapture'] = PV(prefix + 'NumCapture') self.control_pvs['FPNumCaptured'] = PV(prefix + 'NumCaptured_RBV') self.control_pvs['FPCapture'] = PV(prefix + 'Capture') self.control_pvs['FPCaptureRBV'] = PV(prefix + 'Capture_RBV') self.control_pvs['FPFilePath'] = PV(prefix + 'FilePath') self.control_pvs['FPFilePathRBV'] = PV(prefix + 'FilePath_RBV') self.control_pvs['FPFilePathExists'] = PV(prefix + 'FilePathExists_RBV') self.control_pvs['FPFileName'] = PV(prefix + 'FileName') self.control_pvs['FPFileNameRBV'] = PV(prefix + 'FileName_RBV') self.control_pvs['FPFileNumber'] = PV(prefix + 'FileNumber') self.control_pvs['FPAutoIncrement'] = PV(prefix + 'AutoIncrement') self.control_pvs['FPFileTemplate'] = PV(prefix + 'FileTemplate') self.control_pvs['FPFullFileName'] = PV(prefix + 'FullFileName_RBV') self.control_pvs['FPAutoSave'] = PV(prefix + 'AutoSave') self.control_pvs['FPEnableCallbacks'] = PV(prefix + 'EnableCallbacks') self.control_pvs['FPXMLFileName'] = PV(prefix + 'XMLFileName') self.control_pvs['FPWriteStatus'] = PV(prefix + 'WriteStatus') # Set some initial PV values file_path = self.config_pvs['FilePath'].get(as_string=True) self.control_pvs['FPFilePath'].put(file_path) file_name = self.config_pvs['FileName'].get(as_string=True) self.control_pvs['FPFileName'].put(file_name) self.control_pvs['FPAutoSave'].put('No') self.control_pvs['FPFileWriteMode'].put('Stream') self.control_pvs['FPEnableCallbacks'].put('Enable') #Define PVs from the MCS or PSO that live on another IOC if 'MCS' in self.pv_prefixes: prefix = self.pv_prefixes['MCS'] self.control_pvs['MCSEraseStart'] = PV(prefix + 'EraseStart') self.control_pvs['MCSStopAll'] = PV(prefix + 'StopAll') self.control_pvs['MCSPrescale'] = PV(prefix + 'Prescale') self.control_pvs['MCSDwell'] = PV(prefix + 'Dwell') self.control_pvs['MCSLNEOutputWidth'] = PV(prefix + 'LNEOutputWidth') self.control_pvs['MCSChannelAdvance'] = PV(prefix + 'ChannelAdvance') self.control_pvs['MCSMaxChannels'] = PV(prefix + 'MaxChannels') self.control_pvs['MCSNuseAll'] = PV(prefix + 'NuseAll') if 'PvaPlugin' in self.pv_prefixes: prefix = self.pv_prefixes['PvaPlugin'] self.control_pvs['PVANDArrayPort'] = PV(prefix + 'NDArrayPort') self.control_pvs['PVAEnableCallbacks'] = PV(prefix + 'EnableCallbacks') if 'RoiPlugin' in self.pv_prefixes: prefix = self.pv_prefixes['RoiPlugin'] self.control_pvs['ROINDArrayPort'] = PV(prefix + 'NDArrayPort') self.control_pvs['ROIScale'] = PV(prefix + 'Scale') self.control_pvs['ROIBinX'] = PV(prefix + 'BinX') self.control_pvs['ROIBinY'] = PV(prefix + 'BinY') self.control_pvs['ROIEnableCallbacks'] = PV(prefix + 'EnableCallbacks') if 'CbPlugin' in self.pv_prefixes: prefix = self.pv_prefixes['CbPlugin'] self.control_pvs['CBPortNameRBV'] = PV(prefix + 'PortName_RBV') self.control_pvs['CBNDArrayPort'] = PV(prefix + 'NDArrayPort') self.control_pvs['CBPreCount'] = PV(prefix + 'PreCount') self.control_pvs['CBPostCount'] = PV(prefix + 'PostCount') self.control_pvs['CBCapture'] = PV(prefix + 'Capture') self.control_pvs['CBCaptureRBV'] = PV(prefix + 'Capture_RBV') self.control_pvs['CBTrigger'] = PV(prefix + 'Trigger') self.control_pvs['CBTriggerRBV'] = PV(prefix + 'Trigger_RBV') self.control_pvs['CBCurrentQtyRBV'] = PV(prefix + 'CurrentQty_RBV') self.control_pvs['CBEnableCallbacks'] = PV(prefix + 'EnableCallbacks') self.control_pvs['CBStatusMessage'] = PV(prefix + 'StatusMessage') self.epics_pvs = {**self.config_pvs, **self.control_pvs} # Wait 1 second for all PVs to connect time.sleep(1) self.check_pvs_connected() # Configure callbacks on a few PVs for epics_pv in ('MoveSampleIn', 'MoveSampleOut', 'StartScan', 'AbortScan', 'ExposureTime', 'FilePath', 'FPFilePathExists', 'FPWriteStatus'): self.epics_pvs[epics_pv].add_callback(self.pv_callback) for epics_pv in ('MoveSampleIn', 'MoveSampleOut', 'StartScan', 'AbortScan'): self.epics_pvs[epics_pv].put(0) # Synchronize the FilePathExists PV self.copy_file_path_exists() # Set ^C interrupt to abort the scan signal.signal(signal.SIGINT, self.signal_handler) # Start the watchdog timer thread thread = threading.Thread(target=self.reset_watchdog, args=(), daemon=True) thread.start()
[docs] def signal_handler(self, sig, frame): """Calls abort_scan when ^C is typed""" if sig == signal.SIGINT: self.abort_scan()
[docs] def reset_watchdog(self): """Sets the watchdog timer to 5 every 3 seconds""" while True: self.epics_pvs['Watchdog'].put(5) time.sleep(3)
[docs] def copy_file_path(self): """Copies the FilePath PV to file plugin FilePath""" value = self.epics_pvs['FilePath'].get(as_string=True) self.epics_pvs['FPFilePath'].put(value, wait=True)
[docs] def copy_file_path_exists(self): """Copies the file plugin FilePathExists_RBV PV to FilePathExists""" value = self.epics_pvs['FPFilePathExists'].value self.epics_pvs['FilePathExists'].put(value)
[docs] def pv_callback(self, pvname=None, value=None, char_value=None, **kw): """Callback function that is called by pyEpics when certain EPICS PVs are changed The PVs that are handled are: - ``StartScan`` : Calls ``run_fly_scan()`` - ``AbortScan`` : Calls ``abort_scan()`` - ``MoveSampleIn`` : Runs ``MoveSampleIn()`` in a new thread. - ``MoveSampleOut`` : Runs ``MoveSampleOut()`` in a new thread. - ``ExposureTime`` : Runs ``set_exposure_time()`` in a new thread. - ``FilePath`` : Runs ``copy_file_path`` in a new thread. - ``FPFilePathExists`` : Runs ``copy_file_path_exists`` in a new thread. - ``FPWriteStatus``: Runs ``abort_scan()`` """ log.debug('pv_callback pvName=%s, value=%s, char_value=%s', pvname, value, char_value) if (pvname.find('MoveSampleIn') != -1) and (value == 1): thread = threading.Thread(target=self.move_sample_in, args=()) thread.start() elif (pvname.find('MoveSampleOut') != -1) and (value == 1): thread = threading.Thread(target=self.move_sample_out, args=()) thread.start() elif pvname.find('ExposureTime') != -1: thread = threading.Thread(target=self.set_exposure_time, args=(value,)) thread.start() elif pvname.find('FilePathExists') != -1: thread = threading.Thread(target=self.copy_file_path_exists, args=()) thread.start() elif pvname.find('FilePath') != -1: thread = threading.Thread(target=self.copy_file_path, args=()) thread.start() elif (pvname.find('StartScan') != -1) and (value == 1): self.run_fly_scan() elif (pvname.find('AbortScan') != -1) and (value == 1): self.abort_scan() elif (pvname.find('WriteStatus') != -1) and (value == 1): self.abort_scan()
[docs] def show_pvs(self): """Prints the current values of all EPICS PVs in use. The values are printed in three sections: - config_pvs : The PVs that are part of the scan configuration and are saved by save_configuration() - control_pvs : The PVs that are used for EPICS control and status, but are not saved by save_configuration() - pv_prefixes : The prefixes for PVs that are used for the areaDetector camera, file plugin, etc. """ print('configPVS:') for config_pv in self.config_pvs: print(config_pv, ':', self.config_pvs[config_pv].get(as_string=True)) print('') print('controlPVS:') for control_pv in self.control_pvs: print(control_pv, ':', self.control_pvs[control_pv].get(as_string=True)) print('') print('pv_prefixes:') for pv_prefix in self.pv_prefixes: print(pv_prefix, ':', self.pv_prefixes[pv_prefix])
[docs] def check_pvs_connected(self): """Checks whether all EPICS PVs are connected. Returns ------- bool True if all PVs are connected, otherwise False. """ all_connected = True for key in self.epics_pvs: if not self.epics_pvs[key].connected: log.error('PV %s is not connected', self.epics_pvs[key].pvname) all_connected = False return all_connected
[docs] def read_pv_file(self, pv_file_name, macros): """Reads a file containing a list of EPICS PVs to be used by TomoScan. Parameters ---------- pv_file_name : str Name of the file to read macros: dict Dictionary of macro substitution to perform when reading the file """ pv_file = open(pv_file_name) lines = pv_file.read() pv_file.close() lines = lines.splitlines() for line in lines: is_config_pv = True if line.find('#controlPV') != -1: line = line.replace('#controlPV', '') is_config_pv = False line = line.lstrip() # Skip lines starting with # if line.startswith('#'): continue # Skip blank lines if line == '': continue pvname = line # Do macro substitution on the pvName for key in macros: pvname = pvname.replace(key, macros[key]) # Replace macros in dictionary key with nothing dictentry = line for key in macros: dictentry = dictentry.replace(key, '') epics_pv = PV(pvname) if is_config_pv: self.config_pvs[dictentry] = epics_pv else: self.control_pvs[dictentry] = epics_pv if dictentry.find('PVName') != -1: pvname = epics_pv.value key = dictentry.replace('PVName', '') self.control_pvs[key] = PV(pvname) if dictentry.find('PVPrefix') != -1: pvprefix = epics_pv.value key = dictentry.replace('PVPrefix', '') self.pv_prefixes[key] = pvprefix
[docs] def move_sample_in(self): """Moves the sample to the in beam position for collecting projections. The in-beam position is defined by the ``SampleInX`` and ``SampleInY`` PVs. Which axis to move is defined by the ``FlatFieldAxis`` PV, which can be ``X``, ``Y``, or ``Both``. """ axis = self.epics_pvs['FlatFieldAxis'].get(as_string=True) log.info('move_sample_in axis: %s', axis) if axis in ('X', 'Both'): position = self.epics_pvs['SampleInX'].value self.epics_pvs['SampleX'].put(position, wait=True, timeout=600) if axis in ('Y', 'Both'): position = self.epics_pvs['SampleInY'].value self.epics_pvs['SampleY'].put(position, wait=True, timeout=600) if 'SampleOutAngleEnable' in self.epics_pvs: if self.epics_pvs['SampleOutAngleEnable'].get() and self.rotation_save != None: if self.max_rotation_speed != None:# max_rotation_speed is not initialized when the scan has not been started cur_speed = self.epics_pvs['RotationSpeed'].get() self.epics_pvs['RotationSpeed'].put(self.max_rotation_speed) self.epics_pvs['Rotation'].put(self.rotation_save, wait=True) if self.max_rotation_speed != None: self.epics_pvs['RotationSpeed'].put(cur_speed) self.epics_pvs['MoveSampleIn'].put('Done')
[docs] def move_sample_out(self): """Moves the sample to the out of beam position for collecting flat fields. The out of beam position is defined by the ``SampleOutX`` and ``SampleOutY`` PVs. Which axis to move is defined by the ``FlatFieldAxis`` PV, which can be ``X``, ``Y``, or ``Both``. """ if 'SampleOutAngleEnable' in self.epics_pvs: if self.epics_pvs['SampleOutAngleEnable'].get(): if self.max_rotation_speed != None:# max_rotation_speed is not initialized when the scan has not been started cur_speed = self.epics_pvs['RotationSpeed'].get() self.epics_pvs['RotationSpeed'].put(self.max_rotation_speed) angle = self.epics_pvs['SampleOutAngle'].get() log.info('move_sample_out angle: %s', angle) self.rotation_save = self.epics_pvs['Rotation'].get() self.epics_pvs['Rotation'].put(angle, wait=True) if self.max_rotation_speed != None: self.epics_pvs['RotationSpeed'].put(cur_speed) axis = self.epics_pvs['FlatFieldAxis'].get(as_string=True) log.info('move_sample_out axis: %s', axis) if axis in ('X', 'Both'): position = self.epics_pvs['SampleOutX'].value self.epics_pvs['SampleX'].put(position, wait=True, timeout=600) if axis in ('Y', 'Both'): position = self.epics_pvs['SampleOutY'].value self.epics_pvs['SampleY'].put(position, wait=True, timeout=600) self.epics_pvs['MoveSampleOut'].put('Done')
[docs] def save_configuration(self, file_name): """Saves the current configuration PVs to a file. A new dictionary is created, containing the key for each PV in the ``config_pvs`` dictionary and the current value of that PV. This dictionary is written to the file in JSON format. Parameters ---------- file_name : str The name of the file to save to. """ config = {} for key in self.config_pvs: config[key] = self.config_pvs[key].get(as_string=True) try: out_file = open(file_name, 'w') json.dump(config, out_file, indent=2) out_file.close() except (PermissionError, FileNotFoundError) as error: self.epics_pvs['ScanStatus'].put('Error writing configuration')
[docs] def load_configuration(self, file_name): """Loads a configuration from a file into the EPICS PVs. Parameters ---------- file_name : str The name of the file to save to. """ in_file = open(file_name, 'r') config = json.load(in_file) in_file.close() for key in config: self.config_pvs[key].put(config[key])
[docs] def open_shutter(self): """Opens the shutter to collect flat fields or projections. The value in the ``OpenShutterValue`` PV is written to the ``OpenShutter`` PV. """ if not self.epics_pvs['OpenShutter'] is None: pv = self.epics_pvs['OpenShutter'] value = self.epics_pvs['OpenShutterValue'].get(as_string=True) log.info('open shutter: %s, value: %s', pv, value) self.epics_pvs['OpenShutter'].put(value, wait=True)
[docs] def close_shutter(self): """Closes the shutter to collect dark fields. The value in the ``CloseShutterValue`` PV is written to the ``CloseShutter`` PV. """ if not self.epics_pvs['CloseShutter'] is None: pv = self.epics_pvs['CloseShutter'] value = self.epics_pvs['CloseShutterValue'].get(as_string=True) log.info('close shutter: %s, value: %s', pv, value) self.epics_pvs['CloseShutter'].put(value, wait=True)
[docs] def set_exposure_time(self, exposure_time=None): """Sets the camera exposure time. The exposure_time is written to the camera's ``AcquireTime`` PV. Parameters ---------- exposure_time : float, optional The exposure time to use. If None then the value of the ``ExposureTime`` PV is used. """ if not self.scan_is_running: if exposure_time is None: exposure_time = self.epics_pvs['ExposureTime'].value self.epics_pvs['CamAcquireTime'].put(exposure_time, wait=True, timeout = 10.0)
[docs] def set_scan_exposure_time(self, exposure_time=None): """Sets the camera exposure time during the scan. The exposure_time is written to the camera's ``AcquireTime`` PV. Parameters ---------- exposure_time : float, optional The exposure time to use. If None then the value of the ``ExposureTime`` PV is used. """ if exposure_time is None: exposure_time = self.epics_pvs['ExposureTime'].value log.warning('Setting exposure time: %f s', exposure_time) self.epics_pvs['CamAcquireTime'].put(exposure_time, wait=True, timeout = 10.)
[docs] def begin_scan(self): """Performs the operations needed at the very start of a scan. This base class method does the following: - Sets the status string in the ``ScanStatus`` PV. - Stops the camera acquisition. - Calls ``set_exposure_time()`` - Copies the ``FilePath`` and ``FileName`` PVs to the areaDetector file plugin. - Sets class variables with the important scan parameters - Checks whether the file that will be saved by the file plugin already exists. If it does, and if the OverwriteWarning PV is 'Yes' then it opens a dialog box asking the user if they want to overwrite the file. If they answer 'No' then a FileOverwriteError exception is raised. It is expected that most derived classes will override this method. In most cases they should first call this base class method, and then perform any beamline-specific operations. """ self.scan_is_running = True self.epics_pvs['ScanStatus'].put('Beginning scan') # Stop the camera since it could be in free-run mode self.epics_pvs['CamAcquire'].put(0, wait=True) # Set the exposure time self.set_scan_exposure_time() # Set the file path, file name and file number self.epics_pvs['FPFilePath'].put(self.epics_pvs['FilePath'].value, wait=True) self.epics_pvs['FPFileName'].put(self.epics_pvs['FileName'].value, wait=True) # Copy the current values of scan parameters into class variables self.exposure_time = self.epics_pvs['ExposureTime'].value self.rotation_start = self.epics_pvs['RotationStart'].value self.rotation_step = self.epics_pvs['RotationStep'].value self.num_angles = self.epics_pvs['NumAngles'].value self.rotation_stop = self.rotation_start + (self.num_angles * self.rotation_step) self.rotation_resolution = self.epics_pvs['RotationResolution'].value self.max_rotation_speed = self.epics_pvs['RotationMaxSpeed'].value self.return_rotation = self.epics_pvs['ReturnRotation'].get(as_string=True) self.num_dark_fields = self.epics_pvs['NumDarkFields'].value self.dark_field_mode = self.epics_pvs['DarkFieldMode'].get(as_string=True) self.num_flat_fields = self.epics_pvs['NumFlatFields'].value self.flat_field_mode = self.epics_pvs['FlatFieldMode'].get(as_string=True) self.file_path_rbv = self.epics_pvs['FPFilePathRBV'].get(as_string=True) self.file_name_rbv = self.epics_pvs['FPFileNameRBV'].get(as_string=True) self.file_number = self.epics_pvs['FPFileNumber'].value self.file_template = self.epics_pvs['FPFileTemplate'].get(as_string=True) self.total_images = self.num_angles if self.dark_field_mode != 'None': self.total_images += self.num_dark_fields if self.dark_field_mode == 'Both': self.total_images += self.num_dark_fields if self.flat_field_mode != 'None': self.total_images += self.num_flat_fields if self.flat_field_mode == 'Both': self.total_images += self.num_flat_fields if self.epics_pvs['OverwriteWarning'].get(as_string=True) == 'Yes': # Make sure there is not already a file by this name try: file_name = self.file_template % (self.file_path_rbv, self.file_name_rbv, self.file_number) except: try: file_name = self.file_template % (self.file_path_rbv, self.file_name_rbv) except: try: file_name = self.file_template % (self.file_path_rbv) except: log.error("File name template: %s not supported", self.file_template) raise TypeError if os.path.exists(file_name): self.epics_pvs['ScanStatus'].put('Waiting for overwrite confirmation') reply = pymsgbox.confirm('File ' + file_name + ' exists. Overwrite?', 'Overwrite file', ['Yes', 'No']) if reply == 'No': raise FileOverwriteError
[docs] def end_scan(self): """Performs the operations needed at the very end of a scan. This base class method does the following: - Sets the status string in the ``ScanStatus`` PV. - If the ``ReturnRotation`` PV is Yes then it moves the rotation motor back to the position defined by the ``RotationStart`` PV. It does not wait for the move to complete. - Sets the ``StartScan`` PV to 0. This PV is an EPICS ``busy`` record. Normally EPICS clients that start a scan with the ``StartScan`` PV will wait for ``StartScan`` to return to 0, often using the ``ca_put_callback()`` mechanism. It is expected that most derived classes will override this method. In most cases they should first perform any beamline-specific operations and then call this base class method. This ensures that the scan is really complete before ``StartScan`` is set to 0. """ if self.return_rotation == 'Yes': self.epics_pvs['Rotation'].put(self.rotation_start) elif self.return_rotation == "Home": self.epics_pvs['RotationHomF'].put(1) log.info('Scan complete') self.epics_pvs['ScanStatus'].put('Scan complete') self.epics_pvs['StartScan'].put(0) self.scan_is_running = False full_file_name = self.epics_pvs['FPFullFileName'].get(as_string=True) self.epics_pvs['FullFileName'].put(full_file_name)
[docs] def fly_scan(self): """Performs the operations for a tomography fly scan, i.e. with continuous rotation. This base class method does the following: - Moves the rotation motor to position defined by the ``RotationStart`` PV. - Calls ``begin_scan()`` - If the ``DarkFieldMode`` PV is 'Start' or 'Both' calls ``collect_dark_fields()`` - If the ``FlatFieldMode`` PV is 'Start' or 'Both' calls ``collect_flat_fields()`` - Calls ``collect_projections()`` - If the ``FlatFieldMode`` PV is 'End' or 'Both' calls ``collect_flat_fields()`` - If the ``DarkFieldMode`` PV is 'End' or 'Both' calls ``collect_dark_fields()`` - Calls ``end_scan`` If there is either CameraTimeoutError exception or ScanAbortError exception during the scan, it jumps immediate to calling ``end_scan()`` and returns. It is not expected that most derived classes will need to override this method, but they are free to do so if required. """ try: # Prepare for scan self.begin_scan() self.epics_pvs['ScanStatus'].put('Moving rotation axis to start') # Collect the pre-scan dark fields if required if (self.num_dark_fields > 0) and (self.dark_field_mode in ('Start', 'Both')): self.collect_dark_fields() # Collect the pre-scan flat fields if required if (self.num_flat_fields > 0) and (self.flat_field_mode in ('Start', 'Both')): self.collect_flat_fields() # Collect the projections self.collect_projections() # Collect the post-scan flat fields if required if (self.num_flat_fields > 0) and (self.flat_field_mode in ('End', 'Both')): self.collect_flat_fields() # Collect the post-scan dark fields if required if (self.num_dark_fields > 0) and (self.dark_field_mode in ('End', 'Both')): self.collect_dark_fields() except ScanAbortError: log.error('Scan aborted') except CameraTimeoutError: log.error('Camera timeout') except FileOverwriteError: log.error('File overwrite aborted') #Make sure we do cleanup tasks from the end of the scan finally: self.end_scan()
[docs] def run_fly_scan(self): """Runs ``fly_scan()`` in a new thread.""" thread = threading.Thread(target=self.fly_scan, args=()) thread.start()
[docs] def collect_dark_fields(self): """Collects dark field data This base class method does the following: - Sets the scan status message - Calls close_shutter() - Sets the HDF5 data location for dark fields - Sets the FrameType to "DarkField" Derived classes must override this method to actually collect the dark fields. In most cases they should call this base class method first and then perform the beamline-specific operations. """ self.epics_pvs['ScanStatus'].put('Collecting dark fields') self.set_scan_exposure_time() self.close_shutter() self.epics_pvs['HDF5Location'].put(self.epics_pvs['HDF5DarkLocation'].value) self.epics_pvs['FrameType'].put('DarkField')
[docs] def collect_flat_fields(self): """Collects flat field data This base class method does the following: - Sets the scan status message - Calls open_shutter() - Calls move_sample_out() - Sets the HDF5 data location for flat fields - Sets the FrameType to "FlatField" Derived classes must override this method to actually collect the flat fields. In most cases they should call this base class method first and then perform the beamline-specific operations. """ self.epics_pvs['ScanStatus'].put('Collecting flat fields') if self.epics_pvs['DifferentFlatExposure'].get(as_string=True) == 'Different': self.set_scan_exposure_time(self.epics_pvs['FlatExposureTime'].value) else: self.set_scan_exposure_time() self.open_shutter() self.move_sample_out() self.epics_pvs['HDF5Location'].put(self.epics_pvs['HDF5FlatLocation'].value) self.epics_pvs['FrameType'].put('FlatField')
[docs] def collect_projections(self): """Collects projection data This base class method does the following: - Sets the scan status message - Calls open_shutter() - Calls move_sample_in() - Sets the HDF5 data location for projection data - Sets the FrameType to "Projection" Derived classes must override this method to actually collect the projections. In most cases they should call this base class method first and then perform the beamline-specific operations. """ self.epics_pvs['ScanStatus'].put('Collecting projections') self.set_scan_exposure_time() self.open_shutter() self.move_sample_in() self.epics_pvs['HDF5Location'].put(self.epics_pvs['HDF5ProjectionLocation'].value) self.epics_pvs['FrameType'].put('Projection')
[docs] def abort_scan(self): """Aborts a scan that is running and performs the operations needed when a scan is aborted. This does the following: - Sets scan_is_running, a flag that is checked in ``wait_camera_done()``. If ``wait_camera_done()`` finds the flag set then it raises a ScanAbortError exception. - Stops the rotation motor. - Stops the file saving plugin. """ self.scan_is_running = False # Stop the rotation motor self.epics_pvs['RotationStop'].put(1) # Stop the file plugin self.epics_pvs['FPCapture'].put(0) # see https://github.com/tomography/tomoscan/issues/127
[docs] def compute_frame_time(self): """Computes the time to collect and readout an image from the camera. This method is used to compute the time between triggers to the camera. This can be used, for example, to configure a trigger generator or to compute the speed of the rotation stage. The calculation is camera specific. The result can depend on the actual exposure time of the camera, and on a variety of camera configuration settings (pixel binning, pixel bit depth, video mode, etc.) The current version only supports the Point Grey Grasshopper3 GS3-U3-23S6M. The logic for additional cameras should be added to this function in the future if the camera is expected to be used at more than one beamline. If the camera is only to be used at a single beamline then the code should be added to this method in the derived class Returns ------- float The frame time, which is the minimum time allowed between triggers for the value of the ``ExposureTime`` PV. """ # The readout time of the camera depends on the model, and things like the # PixelFormat, VideoMode, etc. # The measured times in ms with 100 microsecond exposure time and 1000 frames # without dropping camera_model = self.epics_pvs['CamModel'].get(as_string=True) readout = None video_mode = None # Adding 1% read out margin to the exposure time, and at least 1 ms seems to work for FLIR cameras # This is empirical and if needed should adjusted for each camera readout_margin = 1.01 if camera_model == 'Grasshopper3 GS3-U3-23S6M': pixel_format = self.epics_pvs['CamPixelFormat'].get(as_string=True) video_mode = self.epics_pvs['CamVideoMode'].get(as_string=True) readout_times = { 'Mono8': {'Mode0': 6.2, 'Mode1': 6.2, 'Mode5': 6.2, 'Mode7': 7.9}, 'Mono12Packed': {'Mode0': 9.2, 'Mode1': 6.2, 'Mode5': 6.2, 'Mode7': 11.5}, 'Mono16': {'Mode0': 12.2, 'Mode1': 6.2, 'Mode5': 6.2, 'Mode7': 12.2} } readout = readout_times[pixel_format][video_mode]/1000. if camera_model == 'Grasshopper3 GS3-U3-51S5M': pixel_format = self.epics_pvs['CamPixelFormat'].get(as_string=True) readout_times = { 'Mono8': 6.18, 'Mono12Packed': 8.20, 'Mono12p': 8.20, 'Mono16': 12.34 } readout = readout_times[pixel_format]/1000. if camera_model == 'Oryx ORX-10G-51S5M': pixel_format = self.epics_pvs['CamPixelFormat'].get(as_string=True) readout_margin = 1.05 readout_times = { 'Mono8': 6.18, 'Mono12Packed': 8.20, 'Mono16': 12.34 } readout = readout_times[pixel_format]/1000. if camera_model == 'Oryx ORX-10G-310S9M': pixel_format = self.epics_pvs['CamPixelFormat'].get(as_string=True) readout_times = { 'Mono8': 30.0, 'Mono12Packed': 30.0, 'Mono16': 30.0 } readout_margin = 1.2 readout = readout_times[pixel_format]/1000. if camera_model == 'Q-12A180-Fm/CXP-6': pixel_format = self.epics_pvs['CamPixelFormat'].get(as_string=True) readout_times = { 'Mono8': 5.35 } readout = readout_times[pixel_format]/1000. if camera_model == 'Blackfly S BFS-PGE-161S7M': pixel_format = self.epics_pvs['CamPixelFormat'].get(as_string=True) readout_times = { 'Mono8': 83.4, 'Mono12Packed': 100.0, 'Mono16': 142.86 } readout_margin = 1.035 readout = readout_times[pixel_format]/1000. if readout is None: log.error('Unsupported combination of camera model, pixel format and video mode: %s %s %s', camera_model, pixel_format, video_mode) return 0 # We need to use the actual exposure time that the camera is using, not the requested time exposure = self.epics_pvs['CamAcquireTimeRBV'].value # Add some extra time to exposure time for margin. frame_time = exposure * readout_margin # If the time is less than the readout time then use the readout time plus 1 ms. if frame_time < readout: frame_time = readout + .001 self.readout_margin = readout_margin return frame_time
[docs] def update_status(self, start_time): """ When called updates ``ImagesCollected``, ``ImagesSaved``, ``ElapsedTime``, and ``RemainingTime``. Parameters ---------- start_time : time Start time to calculate elapsed time. Returns ------- elapsed_time : float Elapsed time to be used for time out. """ num_collected = self.epics_pvs['CamNumImagesCounter'].value num_images = self.epics_pvs['CamNumImages'].value num_saved = self.epics_pvs['FPNumCaptured'].value num_to_save = self.epics_pvs['FPNumCapture'].value current_time = time.time() elapsed_time = current_time - start_time remaining_time = (elapsed_time * (num_images - num_collected) / max(float(num_collected), 1)) collect_progress = str(num_collected) + '/' + str(num_images) log.info('Collected %s', collect_progress) self.epics_pvs['ImagesCollected'].put(collect_progress) save_progress = str(num_saved) + '/' + str(num_to_save) log.info('Saved %s', save_progress) self.epics_pvs['ImagesSaved'].put(save_progress) self.epics_pvs['ElapsedTime'].put(str(timedelta(seconds=int(elapsed_time)))) self.epics_pvs['RemainingTime'].put(str(timedelta(seconds=int(remaining_time)))) return elapsed_time
[docs] def wait_camera_done(self, timeout): """Waits for the camera acquisition to complete, or for ``abort_scan()`` to be called. While waiting this method periodically updates the status PVs ``ImagesCollected``, ``ImagesSaved``, ``ElapsedTime``, and ``RemainingTime``. Parameters ---------- timeout : float The maximum number of seconds to wait before raising a CameraTimeoutError exception. Raises ------ ScanAbortError If ``abort_scan()`` is called CameraTimeoutError If acquisition has not completed within timeout value. """ start_time = time.time() while True: if self.epics_pvs['CamAcquireBusy'].value == 0: return if not self.scan_is_running: raise ScanAbortError time.sleep(0.2) elapsed_time = self.update_status(start_time) if timeout > 0: if elapsed_time >= timeout: raise CameraTimeoutError()