Source code for secop_ophyd.SECoPSignal

import asyncio
import warnings
from functools import wraps
from typing import Any, Callable, Dict

from bluesky.protocols import DataKey, Reading
from frappy.client import CacheItem
from frappy.datatypes import (
    ArrayOf,
    BLOBType,
    BoolType,
    DataType,
    FloatRange,
    IntRange,
    ScaledInteger,
    StringType,
    StructOf,
    TupleOf,
)
from ophyd_async.core import Callback, SignalBackend, SignalDatatypeT

from secop_ophyd.AsyncFrappyClient import AsyncFrappyClient
from secop_ophyd.util import Path, SECoPDataKey, SECoPdtype, SECoPReading, deep_get

atomic_dtypes = (
    StringType,
    ScaledInteger,
    IntRange,
    FloatRange,
    BoolType,
    BLOBType,
    ArrayOf,
)


# max depth for datatypes supported by tiled/databroker
MAX_DEPTH = 1


[docs] class LocalBackend(SignalBackend): """Class for the 'argument' and 'result' Signal backends of a SECoP_CMD_Device. These Signals act as a local cache for storing the command argument and result. """ def __init__( self, path: Path, secop_dtype_obj: DataType, sig_datainfo: dict ) -> None: """Initialize SECoP_CMD_IO_Backend :param path: Path to the command in the secclient module dict :type path: Path :param SECoPdtype_obj: detailed SECoP datatype object for bidirectional conversion between JSON to and numpy arrays :type SECoPdtype_obj: DataType :param sig_datainfo: SECoP datainfo string of the value represented by the signal :type sig_datainfo: dict """ self.SECoP_type_info: SECoPdtype = SECoPdtype(secop_dtype_obj) self.reading: SECoPReading = SECoPReading( secop_dt=self.SECoP_type_info, entry=None ) # module:acessible Path for reading/writing (module,accessible) self.path: Path = path # Root datainfo or memberinfo for nested datatypes # TODO check if this is really needed self.datainfo: dict = sig_datainfo self.callback: Callback | None = None self.SECoPdtype_obj: DataType = secop_dtype_obj self.describe_dict: dict self.source_name = self.path._module_name + ":" + self.path._accessible_name self.describe_dict = {} self.describe_dict["source"] = self.source("", True) self.describe_dict.update(self.SECoP_type_info.get_datakey()) for property_name, prop_val in self.datainfo.items(): if property_name == "type": property_name = "SECoP_dtype" self.describe_dict[property_name] = prop_val super().__init__(datatype=self.SECoP_type_info.np_datatype)
[docs] def source(self, name: str, read: bool) -> str: return self.source_name
[docs] async def connect(self, timeout: float): pass
[docs] async def put(self, value: Any | None, wait=True): self.reading.set_reading(self.SECoP_type_info.val2secop(value)) if self.callback is not None: self.callback(self.reading.get_reading())
[docs] async def get_datakey(self, source: str) -> DataKey: """Metadata like source, dtype, shape, precision, units""" return describedict_to_datakey(self.describe_dict)
[docs] async def get_reading(self) -> Reading[SignalDatatypeT]: return self.reading.get_reading()
[docs] async def get_value(self) -> SignalDatatypeT: return self.reading.get_value()
[docs] async def get_setpoint(self) -> SignalDatatypeT: return await self.get_value()
[docs] def set_callback(self, callback: Callback[Reading[SignalDatatypeT]] | None) -> None: self.callback = callback # type: ignore[assignment]
# TODO add return of Asyncstatus
[docs] class SECoPXBackend(SignalBackend): """ Signal backend for SignalX of a SECoP_CMD_Device, that handles command execution """ def __init__( self, path: Path, secclient: AsyncFrappyClient, argument: LocalBackend | None, result: LocalBackend | None, ) -> None: """Initializes SECoP_CMD_X_Backend :param path: Path to the command in the secclient module dict :type path: Path :param secclient: SECoP client providing communication to the SEC Node :type secclient: AsyncFrappyClient :param argument: Refence to Argument Signal :type argument: SECoP_CMD_IO_Backend | None :param result: Reference to Result Signal :type result: SECoP_CMD_IO_Backend | None """ self._secclient: AsyncFrappyClient = secclient # module:acessible Path for reading/writing (module,accessible) self.path: Path = path self.callback: Callable self.argument: LocalBackend | None = argument self.result: LocalBackend | None = result self.source_name = self.path._module_name + ":" + self.path._accessible_name super().__init__(datatype=None)
[docs] def source(self, name: str, read: bool) -> str: return self.source_name
[docs] async def connect(self, timeout: float): pass
[docs] async def put(self, value: Any | None, wait=True): if self.argument is None: argument = None else: argument = await self.argument.get_value() res, qualifiers = await asyncio.wait_for( fut=self._secclient.exec_command( module=self.path._module_name, command=self.path._accessible_name, argument=argument, ), timeout=None, ) # write return Value to corresponding Backend if self.result is None: return else: val = self.result.SECoP_type_info.secop2val(res) await self.result.put(val)
[docs] async def get_datakey(self, source: str) -> DataKey: """Metadata like source, dtype, shape, precision, units""" return DataKey(shape=[], dtype="string", source=self.source("", True))
[docs] async def get_reading(self) -> Reading[SignalDatatypeT]: raise Exception( "Cannot read _x Signal, it has no value and is only" + " used to trigger Command execution" )
[docs] async def get_value(self) -> SignalDatatypeT: raise Exception( "Cannot read _x Signal, it has no value and is only" + " used to trigger Command execution" )
[docs] def set_callback(self, callback: Callback[Reading[SignalDatatypeT]] | None) -> None: pass
[docs] async def get_setpoint(self) -> SignalDatatypeT: raise Exception( "Cannot read _x Signal, it has no value and is only" + " used to trigger Command execution" )
[docs] class SECoPParamBackend(SignalBackend): """Standard backend for a Signal that represents SECoP Parameter""" def __init__(self, path: Path, secclient: AsyncFrappyClient) -> None: """_summary_ :param path: Path to the parameter in the secclient module dict :type path: Path :param secclient: SECoP client providing communication to the SEC Node :type secclient: AsyncFrappyClient """ # secclient self._secclient: AsyncFrappyClient = secclient # module:acessible Path for reading/writing (module,accessible) self.path: Path = path self._param_description: dict = self._get_param_desc() # Root datainfo or memberinfo for nested datatypes self.datainfo: dict = deep_get( self._param_description["datainfo"], self.path.get_memberinfo_path() ) self.readonly = self._param_description.get("readonly") self.SECoPdtype_str: str self.SECoPdtype_obj: DataType = self._param_description["datatype"] self.SECoP_type_info: SECoPdtype = SECoPdtype(self.SECoPdtype_obj) if self.SECoP_type_info.max_depth > MAX_DEPTH: warnings.warn( f"The datatype of parameter '{path._accessible_name}' has a maximum " f"depth of {self.SECoP_type_info.max_depth}. Tiled & Databroker only " f"support a Depth upto {MAX_DEPTH} " f"dtype_descr: {self.SECoP_type_info.dtype_descr}" ) self.describe_dict: dict = {} self.source_name = ( secclient.uri + ":" + secclient.nodename + ":" + self.path._module_name + ":" + self.path._accessible_name ) # SECoP metadata is static and can only change when connection is reset self.describe_dict = {} self.describe_dict["source"] = self.source_name # add gathered keys from SECoPdtype: self.describe_dict.update(self.SECoP_type_info.get_datakey()) for property_name, prop_val in self._param_description.items(): # skip datainfo (treated seperately) if property_name == "datainfo" or property_name == "datatype": continue self.describe_dict[property_name] = prop_val for property_name, prop_val in self.datainfo.items(): if property_name == "type": property_name = "SECoP_dtype" if property_name == "unit": property_name = "units" self.describe_dict[property_name] = prop_val super().__init__(datatype=self.SECoP_type_info.np_datatype)
[docs] def source(self, name: str, read: bool) -> str: return self.source_name
[docs] async def connect(self, timeout: float): pass
[docs] async def put(self, value: Any | None, wait=True): # convert to frappy compatible Format secop_val = self.SECoP_type_info.val2secop(value) # frappy client has no ability to just send a secop message without # waiting for a reply await asyncio.wait_for( self._secclient.set_parameter(**self.get_param_path(), value=secop_val), timeout=None, )
[docs] async def get_datakey(self, source: str) -> DataKey: """Metadata like source, dtype, shape, precision, units""" if self.SECoP_type_info._is_composite or isinstance( self.SECoPdtype_obj, ArrayOf ): # getlast cached value dataset = await self._secclient.get_parameter( **self.get_param_path(), trycache=True ) # this ensures the datakey is updated to the latest cached value SECoPReading(entry=dataset, secop_dt=self.SECoP_type_info) self.describe_dict.update(self.SECoP_type_info.get_datakey()) return describedict_to_datakey(self.describe_dict)
[docs] async def get_reading(self) -> Reading[SignalDatatypeT]: dataset = await self._secclient.get_parameter( **self.get_param_path(), trycache=True ) sec_reading = SECoPReading(entry=dataset, secop_dt=self.SECoP_type_info) return sec_reading.get_reading()
[docs] async def get_value(self) -> SignalDatatypeT: dataset: Reading = await self.get_reading() return dataset["value"] # type: ignore
[docs] async def get_setpoint(self) -> SignalDatatypeT: return await self.get_value()
[docs] def set_callback(self, callback: Callback[Reading[SignalDatatypeT]] | None) -> None: def awaitify(sync_func): """Wrap a synchronous callable to allow ``await``'ing it""" @wraps(sync_func) async def async_func(*args, **kwargs): return sync_func(*args, **kwargs) return async_func def updateItem(module, parameter, entry: CacheItem): # noqa: N802 data = SECoPReading(secop_dt=self.SECoP_type_info, entry=entry) async_callback = awaitify(callback) asyncio.run_coroutine_threadsafe( async_callback(reading=data.get_reading()), self._secclient.loop, ) if callback is not None: self._secclient.register_callback(self.get_path_tuple(), updateItem) else: self._secclient.unregister_callback(self.get_path_tuple(), updateItem)
def _get_param_desc(self) -> dict: return deep_get(self._secclient.modules, self.path.get_param_desc_path())
[docs] def get_param_path(self): return self.path.get_param_path()
[docs] def get_path_tuple(self): return self.path.get_path_tuple()
[docs] def get_unit(self): return self.describe_dict.get("units", None)
[docs] def is_number(self) -> bool: if ( self.describe_dict["dtype"] == "number" or self.describe_dict["dtype"] == "integer" ): return True return False
[docs] class PropertyBackend(SignalBackend): """Readonly backend for static SECoP Properties of Nodes/Modules""" def __init__( self, prop_key: str, property_dict: Dict[str, Any], secclient: AsyncFrappyClient ) -> None: """Initializes PropertyBackend :param prop_key: Name of Property :type prop_key: str :param propertyDict: Dicitonary containing all properties of Node/Module :type propertyDict: Dict[str, T] :param secclient: SECoP client providing communication to the SEC Node :type secclient: AsyncFrappyClient """ # secclient self._property_dict = property_dict self._prop_key = prop_key self._prop_value = self._property_dict[self._prop_key] self.SECoPdtype_obj: DataType = secop_dtype_obj_from_json(self._prop_value) self.SECoP_type_info: SECoPdtype = SECoPdtype(self.SECoPdtype_obj) if self.SECoP_type_info.max_depth > MAX_DEPTH: warnings.warn( f"The datatype of parameter '{prop_key}' has a maximum" f"depth of {self.SECoP_type_info.max_depth}. Tiled & Databroker only" f"support a Depth upto {MAX_DEPTH}" f"dtype_descr: {self.SECoP_type_info.dtype_descr}" ) # SECoP metadata is static and can only change when connection is reset self.describe_dict = {} self.source_name = prop_key self.describe_dict["source"] = self.source_name # add gathered keys from SECoPdtype: self.describe_dict.update(self.SECoP_type_info.get_datakey()) self._secclient: AsyncFrappyClient = secclient # TODO full property path super().__init__(datatype=self.SECoP_type_info.np_datatype)
[docs] def source(self, name: str, read: bool) -> str: return str(self.source_name)
[docs] async def connect(self, timeout: float): """Connect to underlying hardware""" pass
[docs] async def put(self, value: SignalDatatypeT | None, wait=True): """Put a value to the PV, if wait then wait for completion for up to timeout""" # Properties are readonly pass
[docs] async def get_datakey(self, source: str) -> DataKey: """Metadata like source, dtype, shape, precision, units""" return describedict_to_datakey(self.describe_dict)
[docs] async def get_reading(self) -> Reading[SignalDatatypeT]: dataset = CacheItem( value=self._prop_value, timestamp=self._secclient.conn_timestamp ) sec_reading = SECoPReading(entry=dataset, secop_dt=self.SECoP_type_info) return sec_reading.get_reading()
[docs] async def get_value(self) -> SignalDatatypeT: dataset: Reading = await self.get_reading() return dataset["value"] # type: ignore
[docs] async def get_setpoint(self) -> SignalDatatypeT: return await self.get_value()
[docs] def set_callback(self, callback: Callback[Reading[SignalDatatypeT]] | None) -> None: pass
[docs] def secop_dtype_obj_from_json(prop_val): if isinstance(prop_val, str): return StringType() if isinstance(prop_val, (int, float)): return FloatRange() if isinstance(prop_val, bool): return BoolType() if isinstance(prop_val, dict): # SECoP Structs/tuples --> numpy ndarray members = {} for key, elem in prop_val.items(): members[key] = secop_dtype_obj_from_json(elem) return StructOf(**members) if isinstance(prop_val, list): # empty list, cannot infer proper type if not prop_val: return ArrayOf(FloatRange()) # check if all elements have same Type: if all(isinstance(elem, type(prop_val[0])) for elem in prop_val): members = secop_dtype_obj_from_json(prop_val[0]) return ArrayOf(members) else: members = [] # type: ignore for elem in prop_val: members.append(secop_dtype_obj_from_json(elem)) # type: ignore return TupleOf(*members) raise Exception( f"""unsupported datatype in Property: {str(prop_val.__class__.__name__)}\n propval: {prop_val}""" )
[docs] def describedict_to_datakey(describe_dict: dict) -> SECoPDataKey: """Convert a DataKey to a SECoPDataKey""" datakey = SECoPDataKey( dtype=describe_dict["dtype"], shape=describe_dict["shape"], source=describe_dict["source"], SECOP_datainfo=describe_dict["SECOP_datainfo"], ) if "units" in describe_dict: datakey["units"] = describe_dict["units"] if "dtype_str" in describe_dict: datakey["dtype_str"] = describe_dict["dtype_str"] if "dtype_descr" in describe_dict: datakey["dtype_descr"] = describe_dict["dtype_descr"] if "dtype_numpy" in describe_dict: datakey["dtype_numpy"] = describe_dict["dtype_numpy"] return datakey