import asyncio
import inspect
import logging
import re
import time as ttime
from logging import Logger
from types import MethodType
from typing import Any, Dict, Iterator, Optional, Type
import bluesky.plan_stubs as bps
from bluesky.protocols import (
Flyable,
Locatable,
Location,
PartialEvent,
Reading,
Stoppable,
Subscribable,
Triggerable,
)
from frappy.datatypes import (
ArrayOf,
BLOBType,
BoolType,
CommandType,
FloatRange,
IntRange,
ScaledInteger,
StringType,
StructOf,
TupleOf,
)
from ophyd_async.core import (
DEFAULT_TIMEOUT,
AsyncStatus,
LazyMock,
SignalR,
SignalRW,
SignalX,
StandardReadable,
StandardReadableFormat,
observe_value,
)
from ophyd_async.core._utils import Callback
from secop_ophyd.AsyncFrappyClient import AsyncFrappyClient
from secop_ophyd.GenNodeCode import GenNodeCode, Method
from secop_ophyd.logs import LOG_LEVELS, setup_logging
from secop_ophyd.propertykeys import DATAINFO, EQUIPMENT_ID, INTERFACE_CLASSES
from secop_ophyd.SECoPSignal import (
LocalBackend,
PropertyBackend,
SECoPParamBackend,
SECoPXBackend,
)
from secop_ophyd.util import Path
# Predefined Status Codes
DISABLED = 0
IDLE = 100
STANDBY = 130
PREPARED = 150
WARN = 200
WARN_STANDBY = 230
WARN_PREPARED = 250
NSTABLE = 270 # not in SECoP standard (yet)
BUSY = 300
DISABLING = 310
INITIALIZING = 320
PREPARING = 340
STARTING = 360
RAMPING = 370
STABILIZING = 380
FINALIZING = 390
ERROR = 400
ERROR_STANDBY = 430
ERROR_PREPARED = 450
UNKNOWN = 401 # not in SECoP standard (yet)
[docs]
def clean_identifier(anystring):
return str(re.sub(r"\W+|^(?=\d)", "_", anystring))
[docs]
class SECoPCMDDevice(StandardReadable, Flyable, Triggerable):
"""
Command devices that have Signals for command args, return values and a signal
for triggering command execution (SignalX). They themselves are triggerable.
Once the CMD Device is triggered, the command args are retrieved from the 'argument'
Signal. The command message is sent to the SEC Node and the return value is written
to 'result' signal.
"""
def __init__(self, path: Path, secclient: AsyncFrappyClient):
"""Initialize the CMD Device
:param path: Path to the command in the secclient module dict
:type path: Path
:param secclient: SECoP client providing communication to the SEC Node
:type secclient: AsyncFrappyClient
"""
dev_name: str = path.get_signal_name() + "_CMD"
self._secclient: AsyncFrappyClient = secclient
cmd_props = secclient.modules[path._module_name]["commands"][
path._accessible_name
] # noqa: E501
cmd_datatype: CommandType = cmd_props["datatype"]
datainfo = cmd_props[DATAINFO]
self.description: str = cmd_props["description"]
self.arg_dtype = cmd_datatype.argument
self.res_dtype = cmd_datatype.result
self.argument: SignalRW | None
self.result: SignalR | None
# result signals
read = []
# argument signals
config = []
self._start_time: float
self.commandx: SignalX
self.wait_idle: bool = False
with self.add_children_as_readables(
format=StandardReadableFormat.CONFIG_SIGNAL
):
# Argument Signals (config Signals, can also be read)
arg_path = path.append("argument")
if self.arg_dtype is None:
self.argument = None
else:
arg_backend = LocalBackend(
path=arg_path,
secop_dtype_obj=self.arg_dtype,
sig_datainfo=datainfo["argument"],
)
self.argument = SignalRW(arg_backend)
config.append(self.argument)
# Result Signals (read Signals)
res_path = path.append("result")
if self.res_dtype is None:
self.result = None
else:
res_backend = LocalBackend(
path=res_path,
secop_dtype_obj=self.res_dtype,
sig_datainfo=datainfo["result"],
)
self.result = SignalRW(res_backend)
read.append(self.argument)
argument = None
result = None
if isinstance(self.argument, SignalR):
argument = self.argument._connector.backend
if isinstance(self.result, SignalR):
result = self.result._connector.backend
# SignalX (signal that triggers execution of the Command)
exec_backend = SECoPXBackend(
path=path,
secclient=secclient,
argument=argument, # type: ignore
result=result, # type: ignore
)
self.commandx = SignalX(exec_backend)
super().__init__(name=dev_name)
[docs]
def trigger(self) -> AsyncStatus:
"""Triggers the SECoPCMDDevice and sends command message to SEC Node.
Command argument is taken form 'argument' Signal, and return value is
written in the 'return' Signal
:return: A Status object, that is marked Done once the answer from the
SEC Node is received
:rtype: AsyncStatus
"""
coro = asyncio.wait_for(fut=self._exec_cmd(), timeout=None)
return AsyncStatus(awaitable=coro)
[docs]
def kickoff(self) -> AsyncStatus:
# trigger execution of secop command, wait until Device is Busy
self._start_time = ttime.time()
coro = asyncio.wait_for(fut=asyncio.sleep(1), timeout=None)
return AsyncStatus(coro)
async def _exec_cmd(self):
stat = self.commandx.trigger()
await stat
[docs]
def complete(self) -> AsyncStatus:
coro = asyncio.wait_for(fut=self._exec_cmd(), timeout=None)
return AsyncStatus(awaitable=coro)
[docs]
def collect(self) -> Iterator[PartialEvent]:
yield dict(
time=self._start_time, timestamps={self.name: []}, data={self.name: []}
)
[docs]
class SECoPBaseDevice(StandardReadable):
"""Base Class for generating Opyd devices from SEC Node modules,
objects of type SECoPBaseDevice are not supposed to be instanciated
"""
def __init__(
self,
secclient: AsyncFrappyClient,
module_name: str,
loglevel=logging.INFO,
logdir: str | None = None,
) -> None:
"""Initiate A SECoPBaseDevice
:param secclient: SECoP client providing communication to the SEC Node
:type secclient: AsyncFrappyClient
"""
# config_params is default
self._hinted_params: list[str] = ["value", "target"]
self._uncached_params: list[str] = []
self._hinted_uncached_params: list[str] = []
self._secclient: AsyncFrappyClient = secclient
self.impl: str | None = None
self._module = module_name
module_desc = secclient.modules[module_name]
self.plans: list[Method] = []
self.mod_prop_devices: Dict[str, SignalR] = {}
self.param_devices: Dict[str, Any] = {}
name = self._secclient.properties[EQUIPMENT_ID].replace(".", "-")
for parameter, properties in module_desc["parameters"].items():
match properties.get("_signal_format", None):
case "HINTED_SIGNAL":
if parameter not in self._hinted_params:
self._hinted_params.append(parameter)
case "HINTED_UNCACHED_SIGNAL":
if parameter not in self._hinted_uncached_params:
self._hinted_uncached_params.append(parameter)
case "UNCACHED_SIGNAL":
if parameter not in self._uncached_params:
self._uncached_params.append(parameter)
case _:
continue
self.logger: Logger = setup_logging(
name=f"secop-ophyd:{name}:{module_name}", level=loglevel, log_dir=logdir
)
self.logger.info(f"Initializing SECoPBaseDevice for module {module_name}")
# Add configuration Signals
with self.add_children_as_readables(
format=StandardReadableFormat.CONFIG_SIGNAL
):
# generate Signals from Module Properties
for property in module_desc["properties"]:
if property == "implementation":
self.impl = module_desc["properties"]["implementation"]
if property in ["meaning", "_plotly"]:
continue
propb = PropertyBackend(property, module_desc["properties"], secclient)
setattr(self, property, SignalR(backend=propb))
self.mod_prop_devices[property] = getattr(self, property)
# generate Signals from Module parameters eiter r or rw
for parameter, properties in module_desc["parameters"].items():
if (
parameter
in self._hinted_params
+ self._uncached_params
+ self._hinted_uncached_params
):
continue
# generate new root path
param_path = Path(parameter_name=parameter, module_name=module_name)
# readonly propertyns to plans and plan stubs.
readonly: bool = properties.get("readonly", None)
# Normal types + (struct and tuple as JSON object Strings)
self._signal_from_parameter(
path=param_path,
sig_name=parameter,
readonly=readonly,
)
self.param_devices[parameter] = getattr(self, parameter)
self.add_signals_by_format(
format=StandardReadableFormat.HINTED_SIGNAL,
format_params=self._hinted_params,
module_name=module_name,
module_desc=module_desc,
)
self.add_signals_by_format(
format=StandardReadableFormat.UNCACHED_SIGNAL,
format_params=self._uncached_params,
module_name=module_name,
module_desc=module_desc,
)
self.add_signals_by_format(
format=StandardReadableFormat.HINTED_UNCACHED_SIGNAL,
format_params=self._hinted_uncached_params,
module_name=module_name,
module_desc=module_desc,
)
# Initialize Command Devices
for command, properties in module_desc["commands"].items():
# generate new root path
cmd_path = Path(parameter_name=command, module_name=module_name)
cmd_dev_name = command + "_CMD"
setattr(
self,
cmd_dev_name,
SECoPCMDDevice(path=cmd_path, secclient=secclient),
)
cmd_dev: SECoPCMDDevice = getattr(self, cmd_dev_name)
# Add Bluesky Plan Methods
# Stop is already an ophyd native operation
if command == "stop":
continue
cmd_plan = self.generate_cmd_plan(
cmd_dev, cmd_dev.arg_dtype, cmd_dev.res_dtype
)
setattr(self, command, MethodType(cmd_plan, self))
description: str = ""
description += f"{cmd_dev.description}\n"
description += f" argument: {str(cmd_dev.arg_dtype)}\n"
description += f" result: {str(cmd_dev.res_dtype)}"
plan = Method(
cmd_name=command,
description=description,
cmd_sign=inspect.signature(getattr(self, command)),
)
self.plans.append(plan)
self.set_name(module_name)
# Add status Signal AFTER set_name() to avoid auto-registration as config/hinted
# This is only needed as long as Tiled can't handle structured numpy arrays
if "status" in module_desc["parameters"].keys():
properties = module_desc["parameters"]["status"]
param_path = Path(parameter_name="status", module_name=module_name)
readonly = properties.get("readonly", None)
# Create signal without adding to readables
self._signal_from_parameter(
path=param_path,
sig_name="status",
readonly=readonly,
)
self.param_devices["status"] = getattr(self, "status")
[docs]
def generate_cmd_plan(
self,
cmd_dev: SECoPCMDDevice,
argument_type: Type | None = None,
return_type: Type | None = None,
):
def command_plan_no_arg(self, wait_for_idle: bool = False):
# Trigger the Command device, meaning that the command gets sent to the
# SEC Node
yield from bps.trigger(cmd_dev, wait=True)
if wait_for_idle:
def wait_for_idle_factory():
return self.wait_for_idle()
yield from bps.wait_for([wait_for_idle_factory])
if (
return_type is not None
and isinstance(cmd_dev.result, SignalR)
and isinstance(cmd_dev.result._connector.backend, LocalBackend)
):
return cmd_dev.result._connector.backend.reading.get_value()
def command_plan(self, arg, wait_for_idle: bool = False):
# TODO Type checking
if arg is not None:
yield from bps.abs_set(cmd_dev.argument, arg)
# Trigger the Command device, meaning that the command gets sent to the
# SEC Node
yield from bps.trigger(cmd_dev, wait=True)
if wait_for_idle:
def wait_for_idle_factory():
return self.wait_for_idle()
yield from bps.wait_for([wait_for_idle_factory])
if (
return_type is not None
and isinstance(cmd_dev.result, SignalR)
and isinstance(cmd_dev.result._connector.backend, LocalBackend)
):
return cmd_dev.result._connector.backend.reading.get_value()
cmd_meth = command_plan_no_arg if argument_type is None else command_plan
anno_dict = cmd_meth.__annotations__
dtype_mapping = {
StructOf: dict[str, Any],
ArrayOf: list[Any],
TupleOf: tuple[Any],
BLOBType: str,
BoolType: bool,
FloatRange: float,
IntRange: int,
ScaledInteger: int,
StringType: str,
}
if return_type is not None:
anno_dict["return"] = dtype_mapping[return_type.__class__]
if argument_type is not None:
anno_dict["arg"] = dtype_mapping[argument_type.__class__]
return cmd_meth
def _signal_from_parameter(self, path: Path, sig_name: str, readonly: bool):
"""Generates an Ophyd Signal from a Module Parameter
:param path: Path to the Parameter in the secclient module dict
:type path: Path
:param sig_name: Name of the new Signal
:type sig_name: str
:param readonly: Signal is R or RW
:type readonly: bool
"""
# Normal types + (struct and tuple as JSON object Strings)
paramb = SECoPParamBackend(path=path, secclient=self._secclient)
# construct signal
if readonly:
setattr(self, sig_name, SignalR(paramb))
else:
setattr(self, sig_name, SignalRW(paramb))
[docs]
class SECoPCommunicatorDevice(SECoPBaseDevice):
def __init__(
self,
secclient: AsyncFrappyClient,
module_name: str,
loglevel=logging.INFO,
logdir: str | None = None,
):
"""Initializes the SECoPCommunicatorDevice
:param secclient: SECoP client providing communication to the SEC Node
:type secclient: AsyncFrappyClient
:param module_name: Name of the SEC Node module that is represented by
this device
:type module_name: str"""
super().__init__(
secclient=secclient,
module_name=module_name,
loglevel=loglevel,
logdir=logdir,
)
[docs]
class SECoPReadableDevice(SECoPCommunicatorDevice, Triggerable, Subscribable):
"""
Standard readable SECoP device, corresponding to a SECoP module with the
interface class "Readable"
"""
def __init__(
self,
secclient: AsyncFrappyClient,
module_name: str,
loglevel=logging.INFO,
logdir: str | None = None,
):
"""Initializes the SECoPReadableDevice
:param secclient: SECoP client providing communication to the SEC Node
:type secclient: AsyncFrappyClient
:param module_name: Name of the SEC Node module that is represented by
this device
:type module_name: str
"""
self.value: SignalR
self.status: SignalR
super().__init__(
secclient=secclient,
module_name=module_name,
loglevel=loglevel,
logdir=logdir,
)
if not hasattr(self, "value"):
raise AttributeError(
"Attribute 'value' has not been assigned,"
+ "but is needed for Readable interface class"
)
if not hasattr(self, "status"):
raise AttributeError(
"Attribute 'status' has not been assigned,"
+ "but is needed for Readable interface class"
)
[docs]
async def wait_for_idle(self):
"""asynchronously waits until module is IDLE again. this is helpful,
for running commands that are not done immediately
"""
self.logger.info(f"Waiting for {self.name} to be IDLE")
if self.status is None:
self.logger.error("Status Signal not initialized")
raise Exception("status Signal not initialized")
# force reading of fresh status from device
await self.status.read(False)
async for current_stat in observe_value(self.status):
# status is has type Tuple and is therefore transported as
# structured Numpy array ('f0':statuscode;'f1':status Message)
stat_code = current_stat["f0"]
# Module is in IDLE/WARN state
if IDLE <= stat_code < BUSY:
self.logger.info(f"Module {self.name} --> IDLE")
break
if hasattr(self, "_stopped"):
self.logger.info(f"Module {self.name} was stopped STOPPED")
if self._stopped is True:
break
# Error State or DISABLED
if hasattr(self, "_success"):
if stat_code >= ERROR or stat_code < IDLE:
self.logger.error(f"Module {self.name} --> ERROR/DISABLED")
self._success = False
break
# TODO add timeout
[docs]
def observe_status_change(self, monitored_status_code: int):
async def switch_from_status_inner():
async for current_stat in observe_value(self.status):
# status is has type Tuple and is therefore transported as
# structured Numpy array ('f0':statuscode;'f1':status Message)
stat_code = current_stat["f0"]
if monitored_status_code != stat_code:
break
def switch_from_status_factory():
return switch_from_status_inner()
yield from bps.wait_for([switch_from_status_factory])
[docs]
def trigger(self) -> AsyncStatus:
self.logger.info(f"Triggering {self.name}: read fresh data from device")
# get fresh reading of the value Parameter from the SEC Node
return AsyncStatus(
awaitable=self._secclient.get_parameter(
self._module, "value", trycache=False
)
)
[docs]
def subscribe(self, function: Callback[dict[str, Reading]]) -> None:
"""Subscribe to updates in the reading"""
self.value.subscribe(function=function)
[docs]
def clear_sub(self, function: Callback) -> None:
"""Remove a subscription."""
self.value.clear_sub(function=function)
[docs]
class SECoPTriggerableDevice(SECoPReadableDevice, Stoppable):
"""
Standard triggerable SECoP device, corresponding to a SECoP module with the0s
interface class "Triggerable"
"""
def __init__(
self,
secclient: AsyncFrappyClient,
module_name: str,
loglevel=logging.info,
logdir: str | None = None,
):
"""Initialize SECoPTriggerableDevice
:param secclient: SECoP client providing communication to the SEC Node
:type secclient: AsyncFrappyClient
:param module_name: ame of the SEC Node module that is represented by
this device
:type module_name: str
"""
self.go_CMD: SECoPCMDDevice
self._success = True
self._stopped = False
super().__init__(secclient, module_name, loglevel=loglevel, logdir=logdir)
async def __go_coro(self, wait_for_idle: bool):
await self._secclient.exec_command(module=self._module, command="go")
self._success = True
self._stopped = False
await asyncio.sleep(0.2)
if wait_for_idle:
await self.wait_for_idle()
[docs]
def wait_for_prepared(self):
yield from self.observe_status_change(IDLE)
yield from self.observe_status_change(PREPARING)
[docs]
def trigger(self) -> AsyncStatus:
self.logger.info(f"Triggering {self.name} go command")
async def go_or_read_on_busy():
module_status = await self.status.get_value(False)
stat_code = module_status["f0"]
if BUSY <= stat_code <= ERROR:
return
await self.__go_coro(True)
return AsyncStatus(awaitable=go_or_read_on_busy())
[docs]
async def stop(self, success=True):
"""Calls stop command on the SEC Node module
:param success:
True: device is stopped as planned
False: something has gone wrong
(defaults to True)
:type success: bool, optional
"""
self._success = success
self.logger.info(f"Stopping {self.name} success={success}")
await self._secclient.exec_command(self._module, "stop")
self._stopped = True
[docs]
class SECoPWritableDevice(SECoPReadableDevice):
"""Fast settable device target"""
pass
[docs]
class SECoPMoveableDevice(SECoPWritableDevice, Locatable, Stoppable):
"""
Standard movable SECoP device, corresponding to a SECoP module with the
interface class "Drivable"
"""
def __init__(
self,
secclient: AsyncFrappyClient,
module_name: str,
loglevel=logging.INFO,
logdir: str | None = None,
):
"""Initialize SECoPMovableDevice
:param secclient: SECoP client providing communication to the SEC Node
:type secclient: AsyncFrappyClient
:param module_name: ame of the SEC Node module that is represented by
this device
:type module_name: str
"""
self.target: SignalRW
super().__init__(secclient, module_name, loglevel=loglevel, logdir=logdir)
if not hasattr(self, "target"):
raise AttributeError(
"Attribute 'target' has not been assigned, "
+ "but is needed for 'Drivable' interface class!"
)
self._success = True
self._stopped = False
[docs]
def set(self, new_target, timeout: Optional[float] = None) -> AsyncStatus:
"""Sends new target to SEC Nonde and waits until module is IDLE again
:param new_target: new taget/setpoint for module
:type new_target: _type_
:param timeout: timeout for set operation, defaults to None
:type timeout: Optional[float], optional
:return: Asyncstatus that gets set to Done once module is IDLE again
:rtype: AsyncStatus
"""
coro = asyncio.wait_for(self._move(new_target), timeout=timeout)
return AsyncStatus(coro)
async def _move(self, new_target):
self._success = True
self._stopped = False
await self.target.set(new_target, wait=False)
self.logger.info(f"Moving {self.name} to {new_target}")
# force reading of status from device
await self.status.read(False)
# observe status and wait until dvice is IDLE again
async for current_stat in observe_value(self.status):
stat_code = current_stat["f0"]
if self._stopped is True:
break
# Error State or DISABLED
if stat_code >= ERROR or stat_code < IDLE:
self.logger.error(f"Module {self.name} --> ERROR/DISABLED")
self._success = False
break
# Module is in IDLE/WARN state
if IDLE <= stat_code < BUSY:
self.logger.info(f"Reached Target Module {self.name} --> IDLE")
break
# TODO other status transitions
if not self._success:
raise RuntimeError("Module was stopped")
[docs]
async def stop(self, success=True):
"""Calls stop command on the SEC Node module
:param success:
True: device is stopped as planned
False: something has gone wrong
(defaults to True)
:type success: bool, optional
"""
self._success = success
if not success:
self.logger.info(f"Stopping {self.name} success={success}")
await self._secclient.exec_command(self._module, "stop")
self._stopped = True
[docs]
async def locate(self) -> Location:
# return current location of the device (setpoint and readback).
# Only locally cached values are returned
setpoint = await self._secclient.get_parameter(self._module, "target", True)
readback = await self._secclient.get_parameter(self._module, "value", True)
location: Location = {
"setpoint": setpoint.value,
"readback": readback.value,
}
return location
[docs]
class SECoPNodeDevice(StandardReadable):
"""
Generates the root ophyd device from a Sec-node. Signals of this Device correspond
to the Sec-node properties
"""
name: str = ""
def __init__(
self,
sec_node_uri: str,
# `prefix` not used, it's just that the device connecter requires it for
# some reason.
prefix: str = "",
name: str = "",
loglevel: str = "INFO",
logdir: str | None = None,
):
"""Initializes the node device and generates all node signals and subdevices
corresponding to the SECoP-modules of the secnode
:param secclient: SECoP client providing communication to the SEC Node
:type secclient: AsyncFrappyClient
"""
self.host, self.port = sec_node_uri.rsplit(":", maxsplit=1)
self.logger: Logger = setup_logging(
name=f"frappy:{self.host}:{self.port}",
level=LOG_LEVELS[loglevel],
log_dir=logdir,
)
self.logdir = logdir
self.name = name
self.prefix = prefix
[docs]
async def connect(
self,
mock: bool | LazyMock = False,
timeout: float = DEFAULT_TIMEOUT,
force_reconnect: bool = False,
):
if not hasattr(self, "_secclient"):
secclient: AsyncFrappyClient
secclient = await AsyncFrappyClient.create(
host=self.host,
port=self.port,
loop=asyncio.get_running_loop(),
log=self.logger,
)
self.equipment_id: SignalR
self.description: SignalR
self.version: SignalR
self._secclient: AsyncFrappyClient = secclient
self._module_name: str = ""
self._node_cls_name: str = ""
self.mod_devices: Dict[str, SECoPReadableDevice] = {}
self.node_prop_devices: Dict[str, SignalR] = {}
self.genCode: GenNodeCode
if self.name == "":
self.name = self._secclient.properties[EQUIPMENT_ID].replace(".", "-")
self.name = self.prefix + self.name
config = []
self.logger.info(
"Initializing SECoPNodeDevice "
+ f"({self._secclient.host}:{self._secclient.port})"
)
with self.add_children_as_readables(
format=StandardReadableFormat.CONFIG_SIGNAL
):
for property in self._secclient.properties:
propb = PropertyBackend(
property, self._secclient.properties, secclient
)
setattr(self, property, SignalR(backend=propb))
config.append(getattr(self, property))
self.node_prop_devices[property] = getattr(self, property)
with self.add_children_as_readables(format=StandardReadableFormat.CHILD):
for module, module_desc in self._secclient.modules.items():
secop_dev_class = self.class_from_interface(
module_desc["properties"]
)
if secop_dev_class is not None:
setattr(
self,
module,
secop_dev_class(
self._secclient,
module,
loglevel=self.logger.level,
logdir=self.logdir,
),
)
self.mod_devices[module] = getattr(self, module)
# register secclient callbacks (these are useful if sec node description
# changes after a reconnect)
secclient.client.register_callback(
None, self.descriptiveDataChange, self.nodeStateChange
)
super().__init__(name=self.name)
elif force_reconnect or self._secclient.client.online is False:
await self._secclient.disconnect(True)
await self._secclient.connect(try_period=DEFAULT_TIMEOUT)
[docs]
def class_from_instance(self, path_to_module: str | None = None):
"""Dynamically generate python class file for the SECoP_Node_Device, this
allows autocompletion in IDEs and eases working with the generated Ophyd
devices
"""
# parse genClass file if already present
self.genCode = GenNodeCode(path=path_to_module, log=self._secclient.log)
self.genCode.add_import(self.__module__, self.__class__.__name__)
node_dict = self.__dict__
# NodeClass Name
self._node_cls_name = self.name.replace("-", "_").capitalize()
node_bases = [self.__class__.__name__, "ABC"]
node_class_attrs = []
for attr_name, attr_value in node_dict.items():
# Modules
if isinstance(
attr_value,
(
SECoPBaseDevice,
SECoPCommunicatorDevice,
SECoPReadableDevice,
SECoPWritableDevice,
SECoPMoveableDevice,
SECoPTriggerableDevice,
),
):
attr_type = type(attr_value)
module = str(getattr(attr_type, "__module__", None))
# add imports for module attributes
self.genCode.add_import(module, attr_type.__name__)
module_dict = attr_value.__dict__
# modclass is baseclass of derived class
mod_bases = [attr_value.__class__.__name__, "ABC"]
module_class_attrs = []
# Name for derived class
module_class_name = attr_name
if attr_value.impl is not None:
module_class_name = attr_value.impl.split(".").pop()
# Module:Acessibles
for module_attr_name, module_attr_value in module_dict.items():
if isinstance(
module_attr_value,
(SignalR, SignalX, SignalRW, SignalR, SECoPCMDDevice),
):
# add imports for module attributes
self.genCode.add_import(
module_attr_value.__module__,
type(module_attr_value).__name__,
)
module_class_attrs.append(
(module_attr_name, type(module_attr_value).__name__)
)
self.genCode.add_mod_class(
module_class_name, mod_bases, module_class_attrs, attr_value.plans
)
node_class_attrs.append((attr_name, module_class_name))
# Poperty Signals
if isinstance(attr_value, (SignalR)):
self.genCode.add_import(
attr_value.__module__, type(attr_value).__name__
)
node_class_attrs.append((attr_name, attr_value.__class__.__name__))
self.genCode.add_node_class(self._node_cls_name, node_bases, node_class_attrs)
self.genCode.write_gen_node_class_file()
[docs]
def descriptiveDataChange(self, module, description): # noqa: N802
"""called when the description has changed
this callback is called on the node with module=None
and on every changed module with module==<module name>
:param module: module name of the module that has changes
:type module: _type_
:param description: new Node description string
:type description: _type_
"""
# TODO this functionality is untested and will probably break the generated
# ophyd device since a changed module description would lead a newly
# instanciated module object while references to the old one are broken
# mitigation: alway call methods via:
#
# 'node_obj.module_obj.method()'
self._secclient.conn_timestamp = ttime.time()
if module is None:
# Refresh signals that correspond to Node Properties
config = []
for property in self._secclient.properties:
propb = PropertyBackend(
property, self._secclient.properties, self._secclient
)
setattr(self, property, SignalR(backend=propb))
config.append(getattr(self, property))
self.add_readables(config, format=StandardReadableFormat.CONFIG_SIGNAL)
else:
# Refresh changed modules
module_desc = self._secclient.modules[module]
secop_dev_class = self.class_from_interface(module_desc["properties"])
setattr(self, module, secop_dev_class(self._secclient, module))
# TODO what about removing Modules during disconn
[docs]
def nodeStateChange(self, online, state): # noqa: N802
"""called when the state of the connection changes
'online' is True when connected or reconnecting, False when disconnected
or connecting 'state' is the connection state as a string
"""
if state == "connected" and online is True:
self._secclient.conn_timestamp = ttime.time()
[docs]
def class_from_interface(self, mod_properties: dict):
ophyd_class = None
# infer highest level IF class
module_interface_classes: dict = mod_properties[INTERFACE_CLASSES]
for interface_class in IF_CLASSES.keys():
if interface_class in module_interface_classes:
ophyd_class = IF_CLASSES[interface_class]
break
# No predefined IF class was a match --> use base class (loose collection of
# accessibles)
if ophyd_class is None:
ophyd_class = SECoPBaseDevice # type: ignore
return ophyd_class
IF_CLASSES = {
"Triggerable": SECoPTriggerableDevice,
"Drivable": SECoPMoveableDevice,
"Writable": SECoPWritableDevice,
"Readable": SECoPReadableDevice,
"Communicator": SECoPCommunicatorDevice,
}
SECOP_TO_NEXUS_TYPE = {
"double": "NX_FLOAT64",
"int": "NX_INT64",
"scaled": "NX_FLOAT64",
}
ALL_IF_CLASSES = set(IF_CLASSES.values())
# TODO
# FEATURES = {
# 'HasLimits': SecopHasLimits,
# 'HasOffset': SecopHasOffset,
# }