from __future__ import annotations
import copy
import time
import warnings
from abc import ABC, abstractmethod
from functools import reduce
from itertools import chain
from typing import Any, List, Union
import numpy as np
from bluesky.protocols import Reading
from event_model import DataKey
from frappy.client import CacheItem
from frappy.datatypes import (
ArrayOf,
BLOBType,
BoolType,
DataType,
EnumType,
FloatRange,
IntRange,
ScaledInteger,
StringType,
StructOf,
TupleOf,
)
from ophyd_async.core._utils import StrictEnum
from typing_extensions import NotRequired
SCALAR_DATATYPES = (
IntRange,
BoolType,
FloatRange,
ScaledInteger,
StringType,
EnumType,
)
[docs]
class SECoPDataKey(DataKey):
"""
A DataKey that is used to describe the SECoP Datatype.
"""
dtype_str: NotRequired[str] # type: ignore
"""
The array-protocol typestring of the data-type object.
"""
dtype_descr: NotRequired[list] # type: ignore
"""
String representation of the numpy structured array dtype.
"""
SECOP_datainfo: str
"""
String representation of the original secop datatype.
"""
SECoP_dtype: NotRequired[str] # type: ignore
"""
The SECoP datatype of the data.
"""
[docs]
class NestedRaggedArray(Exception):
"""The Datatype contains nested ragged arrays"""
[docs]
def deep_get(dictionary, keys, default=None) -> dict:
def get_val(obj, key, default):
if isinstance(obj, dict):
return obj.get(key, default)
if isinstance(obj, list):
return obj[key]
if isinstance(obj, tuple):
return obj[key]
return default
return reduce(lambda d, key: get_val(d, key, default), keys, dictionary)
[docs]
class Path:
def __init__(self, parameter_name: str, module_name: str) -> None:
self._accessible_name = parameter_name
self._module_name = module_name
self._last_named_param: int | None = None
self._dev_path: List[Union[str, int]] = []
# Path is extended
[docs]
def append(self, elem: Union[str, int]) -> Path:
new_path = copy.deepcopy(self)
if isinstance(elem, str):
new_path._last_named_param = len(new_path._dev_path)
new_path._dev_path.append(elem)
return new_path
[docs]
def get_param_path(self):
return {"module": self._module_name, "parameter": self._accessible_name}
[docs]
def get_path_tuple(self):
return (self._module_name, self._accessible_name)
# TODO typing of return value
[docs]
def get_memberinfo_path(self):
# inserting K after every Nth number
# using itertool.chain()
# insert element
k = "members"
# insert after every other element
n = 1
# using itertool.chain()
# inserting K after every Nth number
return list(
chain(
*[
(
[k] + self._dev_path[i : i + n] # type: ignore
if len(self._dev_path[i : i + n]) == n
else self._dev_path[i : i + n]
)
for i in range(0, len(self._dev_path), n)
]
)
)
[docs]
def get_signal_name(self):
# top level: signal name == Parameter name
if self._dev_path == []:
return self._accessible_name
sig_postfix = self._dev_path[self._last_named_param :]
if self._last_named_param is None:
sig_postfix = [self._accessible_name] + sig_postfix # type: ignore
delim = "-"
return delim.join(map(str, sig_postfix))
[docs]
def get_param_desc_path(self):
return [self._module_name, "parameters", self._accessible_name]
[docs]
def get_cmd_desc_path(self):
return [self._module_name, "commands", self._accessible_name]
[docs]
def get_leaf(self):
if self._dev_path == []:
return None
return self._dev_path[-1]
[docs]
def insert_val(self, dic: dict, new_val):
if self._dev_path == []:
return dic
d = dic
for key in self._dev_path[:-1]:
if key in d:
d = d[key]
else:
# wrong path
raise Exception(
"path is incorrect " + str(key) + " is not in dict: " + str(dic)
)
# insert new value
if self._dev_path[-1] in d:
d[self._dev_path[-1]] = new_val
else:
# wrong path
raise Exception(
"path is incorrect " + str(key) + " is not in dict: " + str(dic)
)
return dic
[docs]
class DtypeNP(ABC):
secop_dtype: DataType
name: str | None
array_element: bool = False
max_depth: int = 0
[docs]
@abstractmethod
def make_numpy_dtype(self) -> tuple:
"""Create Numpy Compatible structured Datatype"""
[docs]
@abstractmethod
def make_concrete_numpy_dtype(self, value) -> tuple:
"""Create Numpy Compatible structured Datatype from a concrete data value"""
[docs]
@abstractmethod
def make_numpy_compatible_list(self, value) -> Any:
"""make a make a list that is ready for numpy array import"""
[docs]
@abstractmethod
def make_secop_compatible_object(self, value) -> Any:
"""make a make an SECoP Compatible Object"""
[docs]
def dt_factory(
secop_dt: DataType, name: str = "", array_element: bool = False
) -> DtypeNP:
dt_class = secop_dt.__class__
dt_converters = {
StructOf: StructNP,
TupleOf: TupleNP,
ArrayOf: ArrayNP,
IntRange: IntNP,
FloatRange: FloatNP,
ScaledInteger: ScaledIntNP,
BLOBType: BLOBNP,
BoolType: BoolNP,
EnumType: EnumNP,
StringType: StringNP,
}
return dt_converters[dt_class](secop_dt, name, array_element) # type: ignore
STR_LEN_DEFAULT = 100
[docs]
class BLOBNP(DtypeNP):
def __init__(
self, blob_dt: BLOBType, name: str = "", array_element: bool = False
) -> None:
self.name: str = name
self.secop_dtype: BLOBType = blob_dt
self.array_element = array_element
[docs]
def make_numpy_dtype(self) -> tuple:
return (self.name, "U" + str(self.secop_dtype.maxbytes))
[docs]
def make_concrete_numpy_dtype(self, value) -> tuple:
return (self.name, "U" + str(self.secop_dtype.maxbytes))
[docs]
def make_numpy_compatible_list(self, value: str):
return value
[docs]
def make_secop_compatible_object(self, value) -> Any:
return value
[docs]
class BoolNP(DtypeNP):
def __init__(
self, bool_dt: BoolType, name: str = "", array_element: bool = False
) -> None:
self.name: str = name
self.secop_dtype: BoolType = bool_dt
self.array_element = array_element
[docs]
def make_numpy_dtype(self) -> tuple:
return (self.name, "<b1")
[docs]
def make_concrete_numpy_dtype(self, value) -> tuple:
return self.make_numpy_dtype()
[docs]
def make_numpy_compatible_list(self, value: bool):
return value
[docs]
def make_secop_compatible_object(self, value) -> Any:
return value
[docs]
class EnumNP(DtypeNP):
def __init__(
self, enum_dt: EnumType, name: str = "", array_element: bool = False
) -> None:
self.name: str = name
self.secop_dtype: EnumType = enum_dt
self.array_element = array_element
[docs]
def make_numpy_dtype(self) -> tuple:
return (self.name, "<i8")
[docs]
def make_concrete_numpy_dtype(self, value) -> tuple:
return self.make_numpy_dtype()
[docs]
def make_numpy_compatible_list(self, value: int):
return value
[docs]
def make_secop_compatible_object(self, value) -> Any:
return value
[docs]
class FloatNP(DtypeNP):
def __init__(
self, float_dt: FloatRange, name: str = "", array_element: bool = False
) -> None:
self.name: str = name
self.secop_dtype: FloatRange = float_dt
self.array_element = array_element
[docs]
def make_numpy_dtype(self) -> tuple:
return (self.name, "<f8")
[docs]
def make_concrete_numpy_dtype(self, value) -> tuple:
return self.make_numpy_dtype()
[docs]
def make_numpy_compatible_list(self, value: float):
return value
[docs]
def make_secop_compatible_object(self, value) -> Any:
return value
[docs]
class IntNP(DtypeNP):
def __init__(
self, int_dt: IntRange, name: str = "", array_element: bool = False
) -> None:
self.name: str = name
self.secop_dtype: IntRange = int_dt
self.array_element = array_element
[docs]
def make_numpy_dtype(self) -> tuple:
return (self.name, "<i8")
[docs]
def make_concrete_numpy_dtype(self, value) -> tuple:
return self.make_numpy_dtype()
[docs]
def make_numpy_compatible_list(self, value: int):
return value
[docs]
def make_secop_compatible_object(self, value) -> Any:
return value
[docs]
class ScaledIntNP(DtypeNP):
def __init__(
self, scaled_int_dt: ScaledInteger, name: str = "", array_element: bool = False
) -> None:
self.name: str = name
self.secop_dtype: ScaledInteger = scaled_int_dt
self.array_element = array_element
[docs]
def make_numpy_dtype(self) -> tuple:
return (self.name, "<i8")
[docs]
def make_concrete_numpy_dtype(self, value) -> tuple:
return self.make_numpy_dtype()
[docs]
def make_numpy_compatible_list(self, value: int):
return value
[docs]
def make_secop_compatible_object(self, value) -> Any:
return value
[docs]
class StringNP(DtypeNP):
def __init__(
self, string_dt: StringType, name: str = "", array_element: bool = False
) -> None:
self.name: str = name
self.secop_dtype: StringType = string_dt
self.array_element = array_element
if string_dt.maxchars == 1 << 64:
# warnings.warn(
# "maxchars was not set, default max char lenght is set to: "
# + str(STR_LEN_DEFAULT)
# )
self.strlen = STR_LEN_DEFAULT
else:
self.strlen = string_dt.maxchars
[docs]
def make_numpy_dtype(self) -> tuple:
return (self.name, "<U" + str(self.strlen))
[docs]
def make_concrete_numpy_dtype(self, value) -> tuple:
return (self.name, "<U" + str(self.strlen))
[docs]
def make_numpy_compatible_list(self, value: str):
return value
[docs]
def make_secop_compatible_object(self, value) -> Any:
return value
[docs]
class StructNP(DtypeNP):
def __init__(
self, struct_dt: StructOf, name: str = "", array_element: bool = False
) -> None:
self.name: str = name
self.secop_dtype: StructOf = struct_dt
self.array_element = array_element
self.members: dict[str, DtypeNP] = {
name: dt_factory(member, name, self.array_element)
for (name, member) in struct_dt.members.items()
}
max_depth = [member.max_depth for member in self.members.values()]
self.max_depth = 1 + max(max_depth)
[docs]
def make_numpy_dtype(self) -> tuple:
dt_list = []
for member in self.members.values():
dt_list.append(member.make_numpy_dtype())
return (self.name, dt_list)
[docs]
def make_concrete_numpy_dtype(self, value) -> tuple:
dt_list = []
for key, member in self.members.items():
member_val = value[key]
dt_list.append(member.make_concrete_numpy_dtype(member_val))
return (self.name, dt_list)
[docs]
def make_numpy_compatible_list(self, value: dict):
return tuple(
[
self.members[name].make_numpy_compatible_list(value[name])
for name in self.members.keys()
]
)
[docs]
def make_secop_compatible_object(self, value) -> Any:
return {
member: np_dtype.make_secop_compatible_object(value=value[member])
for (member, np_dtype) in self.members.items()
}
[docs]
class TupleNP(DtypeNP):
def __init__(
self, tuple_dt: TupleOf, name: str = "", array_element: bool = False
) -> None:
self.name: str = name
self.secop_dtype = tuple_dt
self.array_element = array_element
self.members: list[DtypeNP] = [
dt_factory(
secop_dt=member, name="f" + str(idx), array_element=self.array_element
)
for idx, member in enumerate(tuple_dt.members)
]
max_depth = [member.max_depth for member in self.members]
self.max_depth = 1 + max(max_depth)
[docs]
def make_numpy_dtype(self) -> tuple:
dt_list = []
for member in self.members:
dt_list.append(member.make_numpy_dtype())
return (self.name, dt_list)
[docs]
def make_concrete_numpy_dtype(self, value) -> tuple:
dt_list = []
for index, member in enumerate(self.members):
member_val = value[index]
dt_list.append(member.make_concrete_numpy_dtype(member_val))
return (self.name, dt_list)
[docs]
def make_numpy_compatible_list(self, value: tuple):
return tuple(
[
self.members[idx].make_numpy_compatible_list(value[idx])
for idx, member in enumerate(self.members)
]
)
[docs]
def make_secop_compatible_object(self, value) -> Any:
return tuple(
[
np_dtype.make_secop_compatible_object(value=value["f" + str(idx)])
for idx, np_dtype in enumerate(self.members)
]
)
[docs]
class ArrayNP(DtypeNP):
def __init__(
self, array_dt: ArrayOf, name: str = "", array_element: bool = False
) -> None:
self.name: str = name
self.secop_dtype = array_dt
self.maxlen = array_dt.maxlen
self.minlen = array_dt.minlen
self.array_element = array_element
self.ragged: bool = self.minlen != self.maxlen
self.shape = [self.maxlen]
self.members: DtypeNP = dt_factory(array_dt.members, array_element=True)
self.root_type: DtypeNP
self.max_depth = self.members.max_depth
if isinstance(self.members, ArrayNP):
self.shape.extend(self.members.shape)
self.root_type = self.members.root_type
self.members.shape = []
if self.members.ragged:
pass
# raise NestedRaggedArray(
# "ragged arrays with more than a single dimension are not supported"
# )int
else:
self.root_type = self.members
if self.array_element and self.ragged:
warnings.warn(
"ragged arrays inside of arrays of copmposite datatypes (struct/tuple)"
"are not supported"
)
[docs]
def get_root_np_str(self) -> str:
dtype_list = self.root_type.make_numpy_dtype()
return dtype_list[1]
[docs]
def make_numpy_dtype(self) -> tuple:
if self.shape == []:
return self.members.make_numpy_dtype()
else:
return (self.name, list(self.members.make_numpy_dtype()).pop(), self.shape)
[docs]
def make_concrete_numpy_dtype(self, value) -> tuple:
if self.ragged is False:
return (
self.name,
list(self.members.make_concrete_numpy_dtype(value)).pop(),
self.shape,
)
else:
if value == []:
member_np = self.members.make_concrete_numpy_dtype(None)
val_shape = [0]
elif value is None:
member_np = self.members.make_concrete_numpy_dtype(None)
val_shape = []
else:
member_np = self.members.make_concrete_numpy_dtype(value[0])
val_shape = [len(value)]
if isinstance(self.members, ArrayNP):
val_shape = val_shape + member_np[2]
return (self.name, member_np[1], val_shape)
[docs]
def make_numpy_compatible_list(self, value: list):
return [self.members.make_numpy_compatible_list(elem) for elem in value]
[docs]
def make_secop_compatible_object(self, value: np.ndarray) -> Any:
return [
self.members.make_secop_compatible_object(value[idx])
for idx in range(0, self.maxlen)
]
[docs]
class SECoPdtype:
def __init__(self, datatype: DataType) -> None:
self.raw_dtype: DataType = datatype
# Describe Fields ------------------------
self.dtype: str
# The array-protocol typestring of the data-type object.
self.dtype_str: str
# String representation of the numpy structured array dtype
self.dtype_descr: list
# string representation of the original secop datatype
self.secop_dtype_str = str(datatype.export_datatype())
# Shape of Data
self.shape = []
self.np_datatype: Any
# Describe Fields ------------------------
self.numpy_dtype: np.dtype
self.dtype_tree: DtypeNP
self._is_composite: bool = False
self._is_array: bool = False
self.dtype_tree = dt_factory(datatype)
self.max_depth: int = self.dtype_tree.max_depth
if isinstance(self.dtype_tree, ArrayNP):
self.shape = self.dtype_tree.shape
self._is_array = True
self._is_composite = (
True
if isinstance(self.dtype_tree.root_type, (StructNP, TupleNP))
else False
)
if isinstance(self.dtype_tree, (TupleNP, StructNP)):
self._is_composite = True
# Composite Datatypes & Arrays of COmposite Datatypes
if self._is_composite:
dt = self.dtype_tree.make_numpy_dtype()
# Top level elements are not named and shape is
# already covered by the shape var
dt = dt[1]
self.numpy_dtype = np.dtype(dt)
# all composite Dtypes are transported as numpy arrays
self.dtype = "array"
self.dtype_str = self.numpy_dtype.str
self.dtype_descr = self.numpy_dtype.descr
self.np_datatype = np.ndarray
# Scalar atomic Datatypes and arrays of atomic dataypes
else:
if self._is_array:
# root secop datatype that is contained in the array
self.dtype = "array"
self.np_datatype = np.ndarray
# Primitive datatypes
else:
self.np_datatype = SECOP2DTYPE[datatype.__class__][0]
self.dtype = SECOP2DTYPE[datatype.__class__][1]
[docs]
def get_datakey(self) -> dict:
describe_dict: dict = {}
# Composite Datatypes & Arrays of COmposite Datatypes
if self._is_composite:
describe_dict["dtype_str"] = self.dtype_str
# describe_dict["dtype_numpy"] = self.dtype_descr
describe_dict["dtype_descr"] = self.dtype_descr
if isinstance(self.dtype_tree, ArrayNP):
describe_dict["dtype_numpy"] = self.dtype_tree.get_root_np_str()
describe_dict["dtype"] = self.dtype
describe_dict["shape"] = self.shape
describe_dict["SECOP_datainfo"] = self.secop_dtype_str
return describe_dict
def _secop2numpy_array(self, value) -> np.ndarray:
np_list = self.dtype_tree.make_numpy_compatible_list(value)
return np.array(np_list, dtype=self.numpy_dtype)
[docs]
def secop2val(self, reading_val) -> Any:
if self._is_composite:
return self._secop2numpy_array(reading_val)
else:
return reading_val
[docs]
def val2secop(self, input_val) -> Any:
# TODO check input_Val for conformity with datatype
# TODO check if it is already in SECoP Format
if self._is_composite and isinstance(input_val, np.ndarray):
return self.dtype_tree.make_secop_compatible_object(input_val)
else:
return self.raw_dtype.validate(input_val)
[docs]
def update_dtype(self, input_val):
if self._is_composite:
# Composite Datatypes & Arrays of Composite Datatypes
dt = self.dtype_tree.make_concrete_numpy_dtype(input_val)
# Top level elements are not named and shape is
# already covered by the shape var
inner_dt = dt[1]
if isinstance(self.raw_dtype, ArrayOf):
self.shape = dt[2]
self.numpy_dtype = np.dtype(inner_dt)
self.dtype_str = self.numpy_dtype.str
self.dtype_descr = self.numpy_dtype.descr
return
if isinstance(self.raw_dtype, ArrayOf):
dt = self.dtype_tree.make_concrete_numpy_dtype(input_val)
self.shape = dt[2]
[docs]
class SECoPReading:
def __init__(
self,
secop_dt: SECoPdtype,
entry: CacheItem | None = None,
) -> None:
self.secop_dt: SECoPdtype = secop_dt
if entry is None:
self.timestamp: float = time.time()
self.value = None
self.readerror = None
return
if entry.readerror is not None:
raise entry.readerror
exported_val = secop_dt.raw_dtype.export_value(entry.value)
self.secop_dt.update_dtype(exported_val)
self.value = secop_dt.secop2val(exported_val)
self.secop_val = exported_val
self.timestamp = entry.timestamp
[docs]
def get_reading(self) -> Reading:
return {"value": self.value, "timestamp": self.timestamp}
[docs]
def get_value(self):
return self.value
[docs]
def get_secop_value(self):
return self.secop_val
[docs]
def set_reading(self, value) -> None:
self.value = value
self.secop_val = self.secop_dt.val2secop(value)
self.timestamp = time.time()
SECOP2DTYPE = {
FloatRange: (float, "number"),
IntRange: (int, "integer"),
ScaledInteger: (int, "integer"),
BoolType: (bool, "boolean"),
EnumType: (StrictEnum, "string"),
StringType: (str, "string"),
BLOBType: (str, "string"),
}