Source code for secop_ophyd.SECoPDevices

import asyncio
import logging
import re
import time as ttime
import warnings
from abc import abstractmethod
from logging import Logger
from types import MethodType
from typing import Any, Dict, Iterator, Optional, Type

import bluesky.plan_stubs as bps
from bluesky.protocols import (
    Flyable,
    Locatable,
    Location,
    PartialEvent,
    Reading,
    Stoppable,
    Subscribable,
    Triggerable,
)
from frappy.datatypes import (
    ArrayOf,
    BLOBType,
    BoolType,
    CommandType,
    FloatRange,
    IntRange,
    ScaledInteger,
    StringType,
    StructOf,
    TupleOf,
)
from ophyd_async.core import (
    DEFAULT_TIMEOUT,
    AsyncStatus,
    Device,
    DeviceConnector,
    DeviceFiller,
    LazyMock,
    Signal,
    SignalR,
    SignalRW,
    SignalX,
    StandardReadable,
    StandardReadableFormat,
    observe_value,
)
from ophyd_async.core._utils import Callback

from secop_ophyd.AsyncFrappyClient import AsyncFrappyClient
from secop_ophyd.logs import setup_logging
from secop_ophyd.propertykeys import DATAINFO, EQUIPMENT_ID, INTERFACE_CLASSES
from secop_ophyd.SECoPSignal import (
    AttributeType,
    LocalBackend,
    SECoPBackend,
    SECoPXBackend,
)
from secop_ophyd.util import Path

# Predefined Status Codes
# These integers are compared against the status code (field "f0") of a
# module's status value. The devices in this file treat
# ``IDLE <= code < BUSY`` as idle/warn and ``code >= ERROR or code < IDLE``
# as error/disabled.
DISABLED = 0
IDLE = 100
STANDBY = 130
PREPARED = 150
WARN = 200
WARN_STANDBY = 230
WARN_PREPARED = 250
NSTABLE = 270  # not in SECoP standard (yet)
BUSY = 300
DISABLING = 310
INITIALIZING = 320
PREPARING = 340
STARTING = 360
RAMPING = 370
STABILIZING = 380
FINALIZING = 390
ERROR = 400
ERROR_STANDBY = 430
ERROR_PREPARED = 450
UNKNOWN = 401  # not in SECoP standard (yet)


# Module property names that are skipped when module property signals are
# filled in SECoPDeviceConnector.connect_real()
IGNORED_PROPS = ["meaning", "_plotly"]


def clean_identifier(anystring):
    """Turn an arbitrary string into an identifier-like string.

    Every run of non-word characters is replaced by a single underscore and
    an underscore is inserted in front of a leading digit.

    :param anystring: text to sanitize
    :return: sanitized text
    """
    sanitizer = re.compile(r"\W+|^(?=\d)")
    sanitized = sanitizer.sub("_", anystring)
    return str(sanitized)
def secop_enum_name_to_python(member_name: str) -> str:
    """Convert a SECoP enum member name into an UPPER_CASE Python identifier.

    Examples:
        'Low Energy' -> 'LOW_ENERGY'
        'high-power' -> 'HIGH_POWER'
        'Mode 1' -> 'MODE_1'

    :param member_name: Original SECoP enum member name
    :return: Python-compatible identifier in UPPER_CASE
    """
    # Runs of whitespace and hyphens collapse into one underscore
    underscored = re.sub(r"[\s-]+", "_", member_name)

    # Everything that is not an ASCII letter, digit or underscore is dropped
    identifier = re.sub(r"[^a-zA-Z0-9_]", "", underscored).upper()

    # Identifiers must not begin with a digit; slicing keeps "" safe
    prefix = "_" if identifier[:1].isdigit() else ""
    return prefix + identifier
def format_assigned(device: StandardReadable, signal: SignalR) -> bool:
    """Tell whether ``signal`` was already registered as a readable of ``device``.

    :param device: device whose registered describe functions are inspected
    :param signal: signal to look for
    :return: True if ``signal.describe`` is among the read or the config
        describe functions of ``device``, False otherwise
    """
    is_read = signal.describe in device._describe_funcs
    # Standard readable format already assigned if either lookup succeeds
    return is_read or signal.describe in device._describe_config_funcs
def is_read_signal(device: StandardReadable, signal: SignalR | SignalRW) -> bool:
    """Tell whether ``signal`` is registered as a read signal of ``device``.

    :param device: device whose read describe functions are inspected
    :param signal: signal to look for
    :return: True if ``signal.describe`` is among ``device._describe_funcs``
    """
    return signal.describe in device._describe_funcs
def is_config_signal(device: StandardReadable, signal: SignalR | SignalRW) -> bool:
    """Tell whether ``signal`` is registered as a config signal of ``device``.

    :param device: device whose config describe functions are inspected
    :param signal: signal to look for
    :return: True if ``signal.describe`` is among
        ``device._describe_config_funcs``
    """
    return signal.describe in device._describe_config_funcs
class ParameterType:
    """Annotation marking a child Signal as a SECoP parameter.

    When called with the parent device and the child, it tags the child's
    ``SECoPBackend`` as ``AttributeType.PARAMETER`` and hands it the parent's
    client. Children that are not Signals, or whose backend is not a
    ``SECoPBackend``, are left untouched.
    """

    def __repr__(self) -> str:
        """Return repr suitable for code generation in annotations."""
        return "ParamT()"

    def __call__(self, parent: Device, child: Device):
        # Only Signals carry a backend that can be configured
        backend = child._connector.backend if isinstance(child, Signal) else None

        if isinstance(backend, SECoPBackend):
            backend.attribute_type = AttributeType.PARAMETER
            backend._secclient = parent._client
class PropertyType:
    """Annotation marking a child Signal as a SECoP module property.

    When called with the parent device and the child, it tags the child's
    ``SECoPBackend`` as ``AttributeType.PROPERTY`` and hands it the parent's
    client. Children that are not Signals, or whose backend is not a
    ``SECoPBackend``, are left untouched.
    """

    def __repr__(self) -> str:
        """Return repr suitable for code generation in annotations."""
        return "PropT()"

    def __call__(self, parent: Device, child: Device):
        # Only Signals carry a backend that can be configured
        backend = child._connector.backend if isinstance(child, Signal) else None

        if isinstance(backend, SECoPBackend):
            backend.attribute_type = AttributeType.PROPERTY
            backend._secclient = parent._client
class SECoPDeviceConnector(DeviceConnector):
    """Connector that fills a SECoP device with signals and child devices.

    The connector is addressed by a SECoP resource identifier (SRI) of the
    form ``host:port`` (SEC node) or ``host:port:module`` (single module).
    It uses the ``AsyncFrappyClient`` registered for the node in
    ``SECoPDevice._clients`` and, on connect, creates the children of the
    device from the description held by that client.
    """

    sri: str
    module: str | None
    node_id: str
    _auto_fill_signals: bool

    def __init__(
        self,
        sri: str,
        auto_fill_signals: bool = True,
        loglevel=logging.INFO,
        logdir: str | None = None,
    ) -> None:
        """Initialize the connector.

        :param sri: SECoP resource identifier ``host:port[:module]``
        :param auto_fill_signals: if True, all accessibles found in the
            description are filled; otherwise only children that already
            exist on the device
        :param loglevel: log level handed on to child device connectors
        :param logdir: log directory handed on to child device connectors
        :raises RuntimeError: if ``sri`` is malformed or no client has been
            registered for the node
        """
        # Validate before indexing into the parts, so that a malformed SRI
        # is reported as RuntimeError rather than as a bare IndexError
        sri_parts = sri.split(":")
        if len(sri_parts) not in (2, 3):
            raise RuntimeError(f"Invalid SECoP resource identifier: {sri}")

        self.sri = sri
        self.node_id = sri_parts[0] + ":" + sri_parts[1]
        self._auto_fill_signals = auto_fill_signals
        self.loglevel = loglevel
        self.logdir = logdir
        self.module = sri_parts[2] if len(sri_parts) == 3 else None

        if SECoPDevice._clients.get(self.node_id) is None:
            raise RuntimeError(f"No AsyncFrappyClient for URI {sri} exists")

        self.client: AsyncFrappyClient = SECoPDevice._clients[self.node_id]

    def set_module(self, module_name: str):
        """Turn a node level connector into a module level connector.

        :param module_name: name of the module to append to the SRI
        :raises RuntimeError: if the SRI already contains a module
        """
        if self.sri.count(":") != 1:
            raise RuntimeError(
                "Module can only be set if SRI does not already contain module"
            )

        self.module = module_name
        self.sri = self.sri + ":" + module_name

    def create_children_from_annotations(self, device: Device):
        """Create the annotated signals and child devices of ``device``.

        The ``DeviceFiller`` is only built on the first call.
        """
        if not hasattr(self, "filler"):
            self.filler = DeviceFiller(
                device=device,
                signal_backend_factory=SECoPBackend,
                # evaluated lazily, so children see the current SRI
                device_connector_factory=lambda: SECoPDeviceConnector(
                    self.sri, self._auto_fill_signals, self.loglevel, self.logdir
                ),
            )
            list(self.filler.create_signals_from_annotations())
            list(self.filler.create_devices_from_annotations(filled=False))

            self.filler.check_created()

    def fill_backend_with_path(self, backend: SECoPBackend, annotations: list[Any]):
        """Consume ``StandardReadableFormat`` annotations for ``backend``.

        Format annotations are stored on ``backend.format`` and removed from
        ``annotations``; all other annotations stay in the list (which is
        modified in place) in their original order.
        """
        unhandled = []
        for annotation in annotations:
            if isinstance(annotation, StandardReadableFormat):
                backend.format = annotation
            else:
                unhandled.append(annotation)

        # These leftover annotations will now be handled by the iterator
        annotations[:] = unhandled

    async def connect_mock(self, device: Device, mock: LazyMock):
        """Connect ``device`` in mock mode."""
        # Make 2 entries for each DeviceVector
        self.filler.create_device_vector_entries_to_mock(2)
        # Set the name of the device to name all children
        device.set_name(device.name)
        await super().connect_mock(device, mock)

    async def connect_real(self, device: Device, timeout: float, force_reconnect: bool):
        """Connect to the SEC node and fill ``device`` from its description.

        For a module connector, parameter and module property signals are
        filled; for a node connector, module child devices and node property
        signals are filled. Afterwards the signals of a ``SECoPDevice`` are
        assigned to their ``StandardReadableFormat``.

        :raises RuntimeError: if the connector has no SRI
        """
        # Imported here and not at module level; presumably to avoid a
        # circular import -- TODO confirm
        from secop_ophyd.GenNodeCode import get_type_param, get_type_prop

        if not self.sri:
            raise RuntimeError(f"Could not connect to SEC node: {self.sri}")

        # Establish connection to SEC Node
        await self.client.connect(3)

        ignored_signals = self.filler.ignored_signals

        # Module Device: fill Parameters & Properties
        # (commands are done via annotated plans)
        if self.module:
            # Fill Parameters
            parameter_dict = self.client.modules[self.module]["parameters"]
            # remove ignored signals
            parameters = [
                child for child in parameter_dict.keys() if child not in ignored_signals
            ]

            # Determine the children that already exist on the device
            not_filled = {unfilled for unfilled, _ in device.children()}

            for param_name in parameters:
                if self._auto_fill_signals or param_name in not_filled:
                    signal_type = (
                        SignalR if parameter_dict[param_name]["readonly"] else SignalRW
                    )

                    backend = self.filler.fill_child_signal(param_name, signal_type)

                    datatype = get_type_param(parameter_dict[param_name]["datatype"])

                    backend.init_parameter_from_introspection(
                        datatype=datatype,
                        path=self.module + ":" + param_name,
                        secclient=self.client,
                    )

            # Fill Properties
            module_property_dict = self.client.modules[self.module]["properties"]

            # remove ignored signals
            module_properties = [
                child
                for child in module_property_dict.keys()
                if child not in ignored_signals and child not in IGNORED_PROPS
            ]

            for mod_property_name in module_properties:
                if self._auto_fill_signals or mod_property_name in not_filled:
                    # properties are always read only
                    backend = self.filler.fill_child_signal(mod_property_name, SignalR)

                    datatype = get_type_prop(module_property_dict[mod_property_name])

                    backend.init_property_from_introspection(
                        datatype=datatype,
                        path=self.module + ":" + mod_property_name,
                        secclient=self.client,
                    )

        # Node Device: fill child devices (modules)
        else:
            # Fill Module devices
            modules = self.client.modules

            not_filled = {unfilled for unfilled, _ in device.children()}

            for module_name in modules.keys():
                if self._auto_fill_signals or module_name in not_filled:
                    module_properties = modules[module_name]["properties"]
                    device_sub_class = class_from_interface(module_properties)

                    self.filler.fill_child_device(module_name, device_sub_class)

                    mod_dev: SECoPDevice = getattr(device, module_name)
                    mod_dev.set_module(module_name)

            # Fill Node properties
            node_property_dict = self.client.properties

            # remove ignored signals
            node_properties = [
                child
                for child in node_property_dict.keys()
                if child not in ignored_signals
            ]

            for node_property_name in node_properties:
                if self._auto_fill_signals or node_property_name in not_filled:
                    # properties are always read only
                    backend = self.filler.fill_child_signal(node_property_name, SignalR)

                    datatype = get_type_prop(node_property_dict[node_property_name])

                    backend.init_property_from_introspection(
                        datatype=datatype,
                        path=node_property_name,
                        secclient=self.client,
                    )

        self.filler.check_filled(f"{self.node_id}")

        # Set the name of the device to name all children
        device.set_name(device.name)
        await super().connect_real(device, timeout, force_reconnect)

        # All Signals and child devs should be filled and connected now. Next,
        # all signals and child devices need to be added to the according
        # StandardReadableFormat with the hierarchy:
        # 1. Format given in Annotation
        #    --> these will already have been set by the DeviceFiller
        # 2. Module Interface Class definition (value, target,...)
        #    --> these are set at the end of a .connect() method of the according
        #        SECoPDevice subclass skipping any signals that have already been
        #        set by annotations (should emit warning if there is a conflict
        #        config vs read sig)
        # 3. & 4. Definition in Parameter property "_signal_format" + Defaults
        #    - _signal_format property + default CONFIG_SIGNAL for all other Signals
        #      --> these are set here at the end of
        #          SECoPDeviceConnector.connect_real() for all Signals that have
        #          not yet been set to a format
        #    - CHILD format for all child devices (SECoPDevice instances)
        #      --> these are set at the end of SECoPNodeDevice.connect() method
        #          the device tree has only a depth of 2 levels (Node -> Modules)
        #
        # device has to be standard readable for this to make sense
        if not isinstance(device, SECoPDevice):
            return

        # 2. Module Interface Class definition (value, target,...)
        await device._assign_interface_formats()

        # 3. & 4. Definition in Parameter property "_signal_format" + Defaults
        await device._assign_default_formats()
class SECoPCMDDevice(StandardReadable, Flyable, Triggerable):
    """
    Command devices that have Signals for command args, return values and a signal
    for triggering command execution (SignalX). They themselves are triggerable.

    Once the CMD Device is triggered, the command args are retrieved from the
    'argument' Signal. The command message is sent to the SEC Node and the return
    value is written to 'result' signal.

    """

    def __init__(self, path: Path, secclient: AsyncFrappyClient):
        """Initialize the CMD Device

        :param path: Path to the command in the secclient module dict
        :type path: Path
        :param secclient: SECoP client providing communication to the SEC Node
        :type secclient: AsyncFrappyClient
        """

        dev_name: str = path.get_signal_name() + "_CMD"

        self._secclient: AsyncFrappyClient = secclient

        cmd_props = secclient.modules[path._module_name]["commands"][
            path._accessible_name
        ]  # noqa: E501
        cmd_datatype: CommandType = cmd_props["datatype"]
        datainfo = cmd_props[DATAINFO]

        self.description: str = cmd_props["description"]
        self.arg_dtype = cmd_datatype.argument
        self.res_dtype = cmd_datatype.result

        self.argument: SignalRW | None
        self.result: SignalR | None

        self._start_time: float
        self.commandx: SignalX

        self.wait_idle: bool = False

        # Only the argument and result signals are created inside this
        # context, so only they are registered as config signals
        with self.add_children_as_readables(
            format=StandardReadableFormat.CONFIG_SIGNAL
        ):
            # Argument Signal; stays None for commands without argument
            arg_path = path.append("argument")
            if self.arg_dtype is None:
                self.argument = None
            else:
                arg_backend = LocalBackend(
                    path=arg_path,
                    secop_dtype_obj=self.arg_dtype,
                    sig_datainfo=datainfo["argument"],
                )
                self.argument = SignalRW(arg_backend)

            # Result Signal; stays None for commands without result
            res_path = path.append("result")
            if self.res_dtype is None:
                self.result = None
            else:
                res_backend = LocalBackend(
                    path=res_path,
                    secop_dtype_obj=self.res_dtype,
                    sig_datainfo=datainfo["result"],
                )
                self.result = SignalRW(res_backend)

        # The execution backend works on the local backends of the argument
        # and result signals (None if the signal does not exist)
        argument = None
        result = None
        if isinstance(self.argument, SignalR):
            argument = self.argument._connector.backend

        if isinstance(self.result, SignalR):
            result = self.result._connector.backend

        # SignalX (signal that triggers execution of the Command)
        exec_backend = SECoPXBackend(
            path=path,
            secclient=secclient,
            argument=argument,  # type: ignore
            result=result,  # type: ignore
        )

        self.commandx = SignalX(exec_backend)

        super().__init__(name=dev_name)

    def trigger(self) -> AsyncStatus:
        """Triggers the SECoPCMDDevice and sends command message to SEC Node.
        Command argument is taken from 'argument' Signal, and return value is
        written in the 'result' Signal

        :return: A Status object, that is marked Done once the answer from the
            SEC Node is received
        :rtype: AsyncStatus
        """
        return AsyncStatus(awaitable=self._exec_cmd())

    def kickoff(self) -> AsyncStatus:
        """Record the start time; the returned status finishes after 1 s.

        :return: Status wrapping a one second sleep
        :rtype: AsyncStatus
        """
        self._start_time = ttime.time()
        return AsyncStatus(asyncio.sleep(1))

    async def _exec_cmd(self):
        """Trigger the command execution signal and wait for it to finish."""
        await self.commandx.trigger()

    def complete(self) -> AsyncStatus:
        """Execute the command.

        :return: Status that is done once the command execution has finished
        :rtype: AsyncStatus
        """
        return AsyncStatus(awaitable=self._exec_cmd())

    def collect(self) -> Iterator[PartialEvent]:
        """Yield one event document stamped with the start time set by kickoff().

        The event carries empty lists as data and timestamps.
        """
        yield dict(
            time=self._start_time, timestamps={self.name: []}, data={self.name: []}
        )
[docs] class SECoPDevice(StandardReadable): _clients: Dict[str, AsyncFrappyClient] = {} _node_id: str _sri: str _host: str _port: str _module: str | None _mod_prop_devices: Dict[str, SignalR] _param_devices: Dict[str, Any] _logger: Logger hinted_signals: list[str] = [] def __init__( self, sri: str = "", # SECoP resource identifier host:port:optional[module] name: str = "", connector: SECoPDeviceConnector | None = None, loglevel=logging.INFO, logdir: str | None = None, ) -> None: if connector and sri: raise RuntimeError("Provide either sri or connector, not both") if connector: sri = connector.sri loglevel = connector.loglevel logdir = connector.logdir self._sri = sri self._host = sri.split(":")[0] self._port = sri.split(":")[1] self._mod_prop_devices = {} self._param_devices = {} self._node_id = sri.split(":")[0] + ":" + sri.split(":")[1] self._logger = setup_logging( name=f"frappy:{self._host}:{self._port}", level=loglevel, log_dir=logdir, ) self._module = None if len(sri.split(":")) > 2: self._module = sri.split(":")[2] if SECoPDevice._clients.get(self._node_id) is None: SECoPDevice._clients[self._node_id] = AsyncFrappyClient( host=self._host, port=self._port, log=self._logger ) connector = connector or SECoPDeviceConnector(sri=sri) self._client: AsyncFrappyClient = SECoPDevice._clients[self._node_id] super().__init__(name=name, connector=connector)
[docs] def set_module(self, module_name: str): if self._module is not None: raise RuntimeError("Module can only be set if it was not already set") self._module = module_name self._sri = self._sri + ":" + module_name self._connector.set_module(module_name)
[docs] async def connect( self, mock: bool | LazyMock = False, timeout: float = DEFAULT_TIMEOUT, force_reconnect: bool = False, ): if not self._client.online or force_reconnect: # Establish connection to SEC Node await self._client.connect(3) if self._module: module_desc = self._client.modules[self._module] # Initialize Command Devices for command, _ in module_desc["commands"].items(): # generate new root path cmd_path = Path(parameter_name=command, module_name=self._module) cmd_dev_name = command + "_CMD" setattr( self, cmd_dev_name, SECoPCMDDevice(path=cmd_path, secclient=self._client), ) cmd_dev: SECoPCMDDevice = getattr(self, cmd_dev_name) # Add Bluesky Plan Methods # Stop is already an ophyd native operation if command == "stop": continue cmd_plan = self.generate_cmd_plan( cmd_dev, cmd_dev.arg_dtype, cmd_dev.res_dtype ) setattr(self, command, MethodType(cmd_plan, self)) await super().connect(mock, timeout, force_reconnect) if self._module is None: # set device name from equipment id property self.set_name(self._client.properties[EQUIPMENT_ID].replace(".", "-")) else: self.set_name(self._module)
[docs] def generate_cmd_plan( self, cmd_dev: SECoPCMDDevice, argument_type: Type | None = None, return_type: Type | None = None, ): def command_plan_no_arg(self, wait_for_idle: bool = False): # Trigger the Command device, meaning that the command gets sent to the # SEC Node yield from bps.trigger(cmd_dev, wait=True) if wait_for_idle: def wait_for_idle_factory(): return self.wait_for_idle() yield from bps.wait_for([wait_for_idle_factory]) if ( return_type is not None and isinstance(cmd_dev.result, SignalR) and isinstance(cmd_dev.result._connector.backend, LocalBackend) ): return cmd_dev.result._connector.backend.reading.get_value() def command_plan(self, arg, wait_for_idle: bool = False): # TODO Type checking if arg is not None: yield from bps.abs_set(cmd_dev.argument, arg) # Trigger the Command device, meaning that the command gets sent to the # SEC Node yield from bps.trigger(cmd_dev, wait=True) if wait_for_idle: def wait_for_idle_factory(): return self.wait_for_idle() yield from bps.wait_for([wait_for_idle_factory]) if ( return_type is not None and isinstance(cmd_dev.result, SignalR) and isinstance(cmd_dev.result._connector.backend, LocalBackend) ): return cmd_dev.result._connector.backend.reading.get_value() cmd_meth = command_plan_no_arg if argument_type is None else command_plan anno_dict = cmd_meth.__annotations__ dtype_mapping = { StructOf: dict[str, Any], ArrayOf: list[Any], TupleOf: tuple[Any], BLOBType: str, BoolType: bool, FloatRange: float, IntRange: int, ScaledInteger: int, StringType: str, } if return_type is not None: anno_dict["return"] = dtype_mapping[return_type.__class__] if argument_type is not None: anno_dict["arg"] = dtype_mapping[argument_type.__class__] return cmd_meth
@abstractmethod async def _assign_interface_formats(self): """Assign signal formats specific to this device's interface class. Subclasses override this to assign formats before default fallback.""" async def _assign_default_formats(self): config_signals = [] hinted_signals = [] uncached_signals = [] hinted_uncached_signals = [] def assert_device_is_signalr(device: Device) -> SignalR: if not isinstance(device, SignalR): raise TypeError(f"{device} is not a SignalR") return device for _, child in self.children(): if not isinstance(child, Signal): continue backend = child._connector.backend if not isinstance(backend, SECoPBackend): continue param_name = backend.path_str.split(":")[-1] if param_name == "status": # status signals should not be assigned a format, # but a SignalR children (this can be removed once tiled can # hanlde composite dtypes) continue # child is a Signal with SECoPParamBackend # check if signal already has a format assigned signalr_device = assert_device_is_signalr(child) if format_assigned(self, signalr_device): # format already assigned by annotation or module IF class continue match backend.format: case StandardReadableFormat.CHILD: raise RuntimeError("Signal cannot have CHILD format") case StandardReadableFormat.CONFIG_SIGNAL: config_signals.append(signalr_device) case StandardReadableFormat.HINTED_SIGNAL: hinted_signals.append(signalr_device) case StandardReadableFormat.UNCACHED_SIGNAL: uncached_signals.append(signalr_device) case StandardReadableFormat.HINTED_UNCACHED_SIGNAL: hinted_uncached_signals.append(signalr_device) # add signals to device in the order of their priority self.add_readables(config_signals, StandardReadableFormat.CONFIG_SIGNAL) self.add_readables(hinted_signals, StandardReadableFormat.HINTED_SIGNAL) self.add_readables(uncached_signals, StandardReadableFormat.UNCACHED_SIGNAL) self.add_readables( hinted_uncached_signals, StandardReadableFormat.HINTED_UNCACHED_SIGNAL )
class SECoPNodeDevice(SECoPDevice):
    """Device representing a whole SEC node; its children are the modules."""

    hinted_signals: list[str] = []

    def __init__(
        self,
        sec_node_uri: str = "",  # SECoP resource identifier host:port:optional[module]
        name: str = "",
        loglevel=logging.INFO,
        logdir: str | None = None,
    ):
        """Initialize the node device.

        :param sec_node_uri: address of the SEC node in the form ``host:port``
        :param name: name of the device
        :param loglevel: log level of the device logger
        :param logdir: log directory of the device logger
        :raises RuntimeError: if ``sec_node_uri`` is not of the form
            ``host:port``
        """
        # ensure sec_node_uri only contains host:port
        if sec_node_uri.count(":") != 1:
            raise RuntimeError(
                f"SECoPNodeDevice SRI must only contain host:port {sec_node_uri}"
            )

        super().__init__(sri=sec_node_uri, name=name, loglevel=loglevel, logdir=logdir)

    async def connect(self, mock=False, timeout=DEFAULT_TIMEOUT, force_reconnect=False):
        """Connect the node, then add all module devices as CHILD readables."""
        await super().connect(mock, timeout, force_reconnect)

        moddevs = [
            moddev for _, moddev in self.children() if isinstance(moddev, SECoPDevice)
        ]
        self.add_readables(moddevs, StandardReadableFormat.CHILD)

        # register secclient callbacks (these are useful if sec node description
        # changes after a reconnect)
        self._client.register_callback(
            None, self.descriptiveDataChange, self.nodeStateChange
        )

    def descriptiveDataChange(self, module, description):  # noqa: N802
        """Client callback for a changed node description; always raises.

        :raises RuntimeError: always
        """
        raise RuntimeError(
            "The descriptive data has changed upon reconnect. Descriptive data changes "
            "are not supported: reinstantiate device"
        )

    def nodeStateChange(self, online, state):  # noqa: N802
        """called when the state of the connection changes

        'online' is True when connected or reconnecting, False when disconnected
        or connecting
        'state' is the connection state as a string
        """
        if state == "connected" and online is True:
            self._client.conn_timestamp = ttime.time()

    async def _assign_interface_formats(self):
        # Node device has no specific interface class formats
        pass

    def class_from_instance(self, path_to_module: str | None = None):
        """Generate a node class file from the description of this node.

        The description is requested from the node with ``describe`` and
        handed to ``GenNodeCode``, which writes the class file.

        :param path_to_module: passed to ``GenNodeCode`` as ``path``
        """
        from secop_ophyd.GenNodeCode import GenNodeCode

        description = self._client.client.request("describe")[2]

        # parse genClass file if already present
        genCode = GenNodeCode(path=path_to_module, log=self._logger)

        genCode.from_json_describe(description)

        genCode.write_gen_node_class_file()
class SECoPCommunicatorDevice(SECoPDevice):
    """Device for SECoP modules with the interface class "Communicator".

    It adds no signals or formats of its own on top of ``SECoPDevice``.
    """

    hinted_signals: list[str] = []

    def __init__(
        self,
        sri: str = "",  # SECoP resource identifier host:port:optional[module]
        name: str = "",
        connector: SECoPDeviceConnector | None = None,
        loglevel=logging.INFO,
        logdir: str | None = None,
    ) -> None:
        """Initialize the device; all arguments go unchanged to ``SECoPDevice``."""
        base_kwargs = dict(
            sri=sri,
            name=name,
            connector=connector,
            loglevel=loglevel,
            logdir=logdir,
        )
        super().__init__(**base_kwargs)

    async def _assign_interface_formats(self):
        # Communicator has no specific interface class formats
        return None
class SECoPReadableDevice(SECoPDevice, Triggerable, Subscribable):
    """
    Standard readable SECoP device, corresponding to a SECoP module with the
    interface class "Readable"
    """

    hinted_signals: list[str] = ["value"]

    def __init__(
        self,
        sri: str = "",  # SECoP resource identifier host:port:optional[module]
        name: str = "",
        connector: SECoPDeviceConnector | None = None,
        loglevel=logging.INFO,
        logdir: str | None = None,
    ):
        """Initializes the SECoPReadableDevice

        :param sri: SECoP resource identifier ``host:port[:module]``
        :param name: name of the device
        :param connector: connector to use instead of ``sri``
        :param loglevel: log level of the device logger
        :param logdir: log directory of the device logger
        """

        self.value: SignalR
        self.status: SignalR

        super().__init__(
            sri=sri, name=name, connector=connector, loglevel=loglevel, logdir=logdir
        )

    async def connect(self, mock=False, timeout=DEFAULT_TIMEOUT, force_reconnect=False):
        """Connect the device and check that 'value' and 'status' exist.

        :raises AttributeError: if one of the two signals was not created
        """
        await super().connect(mock, timeout, force_reconnect)

        if not hasattr(self, "value"):
            raise AttributeError(
                "Attribute 'value' has not been assigned, "
                + "but is needed for Readable interface class"
            )

        if not hasattr(self, "status"):
            raise AttributeError(
                "Attribute 'status' has not been assigned, "
                + "but is needed for Readable interface class"
            )

    async def _assign_interface_formats(self):
        """Make 'value' a hinted signal unless it already has a format.

        A warning is emitted if 'value' has a format assigned but is not a
        read signal.
        """
        if format_assigned(self, self.value):
            if not is_read_signal(self, self.value):
                warnings.warn(
                    f"Signal 'value' of device {self.name} has format assigned "
                    + "that is not compatible with Readable interface class"
                )
        else:
            self.add_readables([self.value], StandardReadableFormat.HINTED_SIGNAL)

        # TODO ensure status signal must be neither config nor read format

    async def wait_for_idle(self):
        """asynchronously waits until module is IDLE again. this is helpful,
        for running commands that are not done immediately
        """

        self._logger.info(f"Waiting for {self.name} to be IDLE")

        if self.status is None:
            self._logger.error("Status Signal not initialized")
            raise Exception("status Signal not initialized")

        # force reading of fresh status from device
        await self.status.read(False)

        async for current_stat in observe_value(self.status):
            # status has type Tuple and is therefore transported as
            # structured Numpy array ('f0':statuscode;'f1':status Message)

            stat_code = current_stat["f0"]

            # Module is in IDLE/WARN state
            if IDLE <= stat_code < BUSY:
                self._logger.info(f"Module {self.name} --> IDLE")
                break

            # Only subclasses that define a stop flag can be interrupted
            if hasattr(self, "_stopped"):
                if self._stopped is True:
                    break

            # Error State or DISABLED
            # NOTE(review): without a '_success' attribute an error state does
            # not end the loop -- confirm that this is intended
            if hasattr(self, "_success"):
                if stat_code >= ERROR or stat_code < IDLE:
                    self._logger.error(f"Module {self.name} --> ERROR/DISABLED")
                    self._success = False
                    break

        # TODO add timeout

    def observe_status_change(self, monitored_status_code: int):
        """Plan that waits until the status code differs from the given one.

        :param monitored_status_code: status code the module is expected to
            leave
        """

        async def switch_from_status_inner():
            async for current_stat in observe_value(self.status):
                # status has type Tuple and is therefore transported as
                # structured Numpy array ('f0':statuscode;'f1':status Message)

                stat_code = current_stat["f0"]

                if monitored_status_code != stat_code:
                    break

        def switch_from_status_factory():
            return switch_from_status_inner()

        yield from bps.wait_for([switch_from_status_factory])

    def trigger(self) -> AsyncStatus:
        """Read a fresh 'value' from the SEC Node, bypassing the cache.

        :return: Status that is done once the value has been read
        :rtype: AsyncStatus
        """
        self._logger.info(f"Triggering {self.name}: read fresh data from device")
        # get fresh reading of the value Parameter from the SEC Node
        return AsyncStatus(
            awaitable=self._client.get_parameter(self._module, "value", trycache=False)
        )

    def subscribe(self, function: Callback[dict[str, Reading]]) -> None:
        """Subscribe to updates in the reading"""
        self.value.subscribe(function=function)

    def clear_sub(self, function: Callback) -> None:
        """Remove a subscription."""
        self.value.clear_sub(function=function)
class SECoPTriggerableDevice(SECoPReadableDevice, Stoppable):
    """
    Standard triggerable SECoP device, corresponding to a SECoP module with the
    interface class "Triggerable"
    """

    # NOTE(review): the class declares Stoppable, but no stop() is visible
    # in this class or its bases here -- confirm where it is defined

    hinted_signals: list[str] = ["value"]

    def __init__(
        self,
        sri: str = "",  # SECoP resource identifier host:port:optional[module]
        name: str = "",
        connector: SECoPDeviceConnector | None = None,
        loglevel=logging.INFO,
        logdir: str | None = None,
    ):
        """Initialize SECoPTriggerableDevice

        :param sri: SECoP resource identifier ``host:port[:module]``
        :param name: name of the device
        :param connector: connector to use instead of ``sri``
        :param loglevel: log level of the device logger
        :param logdir: log directory of the device logger
        """

        self.go_CMD: SECoPCMDDevice

        # Flags evaluated by wait_for_idle(); set before the base class
        # initializer runs
        self._success, self._stopped = True, False

        base_kwargs = dict(
            sri=sri,
            name=name,
            connector=connector,
            loglevel=loglevel,
            logdir=logdir,
        )
        super().__init__(**base_kwargs)
class SECoPWritableDevice(SECoPReadableDevice):
    """SECoP device for modules with the interface class "Writable".

    Behaves like ``SECoPReadableDevice``; only the list of hinted signal
    names differs.
    """

    hinted_signals: list[str] = ["target", "value"]
class SECoPMoveableDevice(SECoPReadableDevice, Locatable, Stoppable):
    """
    Standard movable SECoP device, corresponding to a SECoP module with the
    interface class "Drivable"
    """

    hinted_signals: list[str] = ["target", "value"]

    def __init__(
        self,
        sri: str = "",  # SECoP resource identifier host:port:optional[module]
        name: str = "",
        connector: SECoPDeviceConnector | None = None,
        loglevel=logging.INFO,
        logdir: str | None = None,
    ):
        """Initialize SECoPMovableDevice

        :param sri: SECoP resource identifier ``host:port[:module]``
        :param name: name of the device
        :param connector: connector to use instead of ``sri``
        :param loglevel: log level of the device logger
        :param logdir: log directory of the device logger
        """

        self.target: SignalRW

        base_kwargs = dict(
            sri=sri,
            name=name,
            connector=connector,
            loglevel=loglevel,
            logdir=logdir,
        )
        super().__init__(**base_kwargs)

        # Bookkeeping of the current move, updated by _move() and stop()
        self._success = True
        self._stopped = False

    async def connect(self, mock=False, timeout=DEFAULT_TIMEOUT, force_reconnect=False):
        """Connect the device and check that the 'target' signal exists.

        :raises AttributeError: if 'target' was not created
        """
        await super().connect(mock, timeout, force_reconnect)

        if hasattr(self, "target"):
            return

        raise AttributeError(
            "Attribute 'target' has not been assigned, "
            + "but is needed for 'Drivable' interface class!"
        )

    def set(self, new_target, timeout: Optional[float] = None) -> AsyncStatus:
        """Sends new target to SEC Node and waits until module is IDLE again

        :param new_target: new target/setpoint for module
        :param timeout: timeout for set operation, defaults to None
        :type timeout: Optional[float], optional
        :return: AsyncStatus that gets set to Done once the move has ended
        :rtype: AsyncStatus
        """
        move = self._move(new_target)
        return AsyncStatus(asyncio.wait_for(move, timeout=timeout))

    async def _move(self, new_target):
        """Write the target, then watch the status until the move has ended.

        The move ends when stop() was called, when the status code signals
        error/disabled (``_success`` becomes False) or when the module is
        idle/warn.
        """
        self._success = True
        self._stopped = False

        await self.target.set(new_target)

        self._logger.info(f"Moving {self.name} to {new_target}")

        # force reading of status from device
        await self.status.read(False)

        # observe status and wait until device is IDLE again
        async for status_update in observe_value(self.status):
            code = status_update["f0"]

            if self._stopped is True:
                self._logger.info(
                    f"Move of {self.name} to {new_target} was stopped STOPPED"
                )
                break
            elif code >= ERROR or code < IDLE:
                # Error State or DISABLED
                self._logger.error(f"Module {self.name} --> ERROR/DISABLED")
                self._success = False
                break
            elif IDLE <= code < BUSY:
                # Module is in IDLE/WARN state
                self._logger.info(f"Reached Target Module {self.name} --> IDLE")
                break

            # TODO other status transitions

        if not self._success:
            self._logger.error(
                f"Move of {self.name} to {new_target} was not successful"
            )

    async def stop(self, success=True):
        """Calls stop command on the SEC Node module

        :param success:
            True: device is stopped as planned
            False: something has gone wrong
            (defaults to True)
        :type success: bool, optional
        """
        self._success = success

        if not success:
            self._logger.info(f"Stopping {self.name} success={success}")

        await self._client.exec_command(self._module, "stop")
        self._stopped = True

    async def locate(self) -> Location:
        """Return setpoint ('target') and readback ('value') of the device.

        Both parameters are requested from the client with the cache flag
        set to True.
        """
        target_reading = await self._client.get_parameter(self._module, "target", True)
        value_reading = await self._client.get_parameter(self._module, "value", True)

        location: Location = {
            "setpoint": target_reading.value,
            "readback": value_reading.value,
        }
        return location

    async def _assign_interface_formats(self):
        """Assign the Readable formats, then make 'target' a hinted signal.

        'target' is only added if it has no format yet; a warning is emitted
        if it has a format assigned but is not a read signal.
        """
        await super()._assign_interface_formats()

        if not format_assigned(self, self.target):
            self.add_readables([self.target], StandardReadableFormat.HINTED_SIGNAL)
        elif not is_read_signal(self, self.target):
            warnings.warn(
                f"Signal 'target' of device {self.name} has format assigned "
                + "that is not compatible with Movable interface class"
            )
def class_from_interface(mod_properties: dict):
    """Select the device class for a module from its interface classes.

    ``IF_CLASSES`` is searched in its definition order and the first entry
    whose name is listed in the module's interface classes wins.

    :param mod_properties: properties of the module, including the
        interface classes entry
    :return: matching device class, or ``SECoPDevice`` (loose collection of
        accessibles) if no predefined interface class matches
    """
    declared_classes: dict = mod_properties[INTERFACE_CLASSES]

    # infer highest level IF class
    for if_name, device_class in IF_CLASSES.items():
        if if_name in declared_classes:
            return device_class

    # No predefined IF class was a match --> use base class
    return SECoPDevice  # type: ignore
# Maps SECoP interface class names to the device class representing them.
# class_from_interface() walks this dict in definition order and returns the
# first match, so the order of the entries defines their priority.
IF_CLASSES = { "Triggerable": SECoPTriggerableDevice, "Drivable": SECoPMoveableDevice, "Writable": SECoPWritableDevice, "Readable": SECoPReadableDevice, "Communicator": SECoPCommunicatorDevice, }