Tutorial: Gas Dosing Demo#

This tutorial demonstrates the full capabilities of SECoP-Ophyd integration using a simulated gas dosing system. We’ll walk through connecting to SECoP nodes, generating device classes, and building complex experimental plans.

Demo Setup#

The demo uses Docker containers running SECoP nodes for a gas dosing system and reactor cell. If you’re using the secop-sim demo repository, you can start the simulation with:

make sim      # Start Docker containers with SEC nodes
make frappy   # Start containers and Frappy GUI

The demo creates two SECoP nodes:

  • gas_dosing on port 10801 - Controls mass flow controllers for different gases

  • reactor_cell on port 10802 - Controls temperature regulation and monitoring

Setting Up the Run Engine#

First, we set up the Bluesky Run Engine with standard callbacks:

from bluesky import RunEngine
import bluesky.plan_stubs as bps
from bluesky.plans import scan
from bluesky.preprocessors import run_decorator
from bluesky.callbacks.best_effort import BestEffortCallback
from bluesky.utils import ProgressBarManager
from bluesky.log import config_bluesky_logging

from ophyd_async.core import init_devices
from secop_ophyd.SECoPDevices import SECoPNodeDevice

# Configure logging
config_bluesky_logging(level='WARNING')

# Create Run Engine
RE = RunEngine({}, call_returns_result=True)

# Add Best Effort Callback for live plotting and table display
bec = BestEffortCallback()
RE.subscribe(bec)

# Add progress bar for long-running plans
RE.waiting_hook = ProgressBarManager()
RE.ignore_callback_exceptions = False

# Add metadata to all runs
RE.md["investigation_id"] = "demo_experiment_001"

Connecting to SECoP Nodes#

The SECoPNodeDevice automatically builds the device tree from the node’s description upon connection:

with init_devices():
    gas_dosing_introspected = SECoPNodeDevice('localhost:10801', loglevel="DEBUG")
    reactor_cell_introspected = SECoPNodeDevice('localhost:10802', loglevel="INFO")

Log Levels:

  • "DEBUG": Logs all messages sent/received by the Frappy client

  • "INFO": Standard operational messages (default)

  • "WARNING": Only warnings and errors

  • "ERROR": Only errors

The init_devices() context manager is part of ophyd-async and ensures proper connection and cleanup. Learn more about device connection strategies.

Generating Class Files for Type Hints#

While SECoP-Ophyd devices work immediately after connection, generating class files provides:

  • Type hints for IDE autocompletion

  • Documentation of device structure

  • Better development experience when writing plans

Generate class files with:

gas_dosing_introspected.class_from_instance()
reactor_cell_introspected.class_from_instance()

This creates genNodeClass.py in your current working directory. You can specify a custom path:

gas_dosing_introspected.class_from_instance('/path/to/output/directory')
The generated class file contains declarative device definitions that match the

structure of the SECoP node. Signals are annotated with their types, and commands are exposed as method stubs that get overwritten on .connect().

Warning

Regenerate class files whenever the SECoP node structure changes (e.g., after firmware updates or configuration modifications).

Using the Generated Classes#

Import the generated classes and instantiate your devices from the generated class definitions:

from genNodeClass import Gas_dosing, Reactor_cell

# once the class files are generated, instantiate your devices using the generated classes
with init_devices():
    gas_dosing = Gas_dosing('localhost:10801', loglevel="DEBUG")
    reactor_cell = Reactor_cell('localhost:10802', loglevel="INFO")

Now your IDE will provide autocompletion and type checking for all device attributes.

Basic Device Usage#

SECoP-Ophyd devices work like any other ophyd-async device in Bluesky plans:

Setting Parameters#

# Set the temperature ramp rate to 50 K/min
RE(bps.abs_set(reactor_cell.temperature_reg.ramp, 50))

Scanning#

# Scan sample temperature from 25 to 100°C in 10 steps
RE(scan([reactor_cell.temperature_sam],
        reactor_cell.temperature_reg,
        25, 100, 10))

Reading Values#

from bluesky.plan_stubs import rd

def read_temperature():
    temp = yield from rd(reactor_cell.temperature_sam.value)
    print(f"Current temperature: {temp}°C")
    return temp

RE(read_temperature())

SECoP Commands as Bluesky Plans#

SECoP commands are special operations implemented on the SEC node. In SECoP-Ophyd, they’re exposed as instance methods that return Bluesky plans.

Understanding Command Signatures#

Commands can have arguments and return values. From the generated genNodeClass.py, you can see the signature:

@abstractmethod
def test_cmd(self, arg: dict[str, Any], wait_for_idle: bool = False) -> int:
    """Testing with ophyd secop integration

    argument: StructOf(name=StringType(), id=IntRange(0, 1000), sort=BoolType())
    result: IntRange()
    """

This command:

  • Takes a dict argument with ‘name’ (str), ‘id’ (int 0-1000), and ‘sort’ (bool)

  • Returns an integer

  • Optionally waits for the device to return to IDLE state

Using Commands#

def run_command_example():
    # Command with wait_for_idle=False (returns immediately)
    result = yield from gas_dosing.massflow_contr1.test_cmd(
        arg={'name': 'test', 'id': 245, 'sort': True},
        wait_for_idle=False
    )
    print(f"Command returned: {result}")

RE(run_command_example())

The wait_for_idle Parameter#

SECoP commands should return quickly. If an operation takes time, the module communicates this via a BUSY state. Use wait_for_idle=True to wait for completion:

def wait_for_completion():
    # This will wait until the device returns to IDLE
    result = yield from gas_dosing.massflow_contr1.test_cmd(
        arg={'name': 'test', 'id': 245, 'sort': True},
        wait_for_idle=True
    )
    print("Operation completed!")

RE(wait_for_completion())

Error Handling#

Commands validate their arguments and raise errors for invalid inputs:

try:
    # This will fail due to invalid name
    RE(gas_dosing.massflow_contr1.test_cmd(
        arg={'name': 'bad_name', 'id': 245, 'sort': True},
        wait_for_idle=True
    ))
except Exception as e:
    print(f"Command failed: {e}")

Advanced Example: Catalysis Experiment#

This example demonstrates a fictional catalysis experiment with temperature ramping and gas switching.

Defining Gas Mixtures#

# Gas flows in ml/min for each mass flow controller
# Format: (MFC1, MFC2, MFC3) = (H2, He, CO)
gas_flows = {
    "N2": (30, 0, 0),
    "H2": (0, 15, 0),
    "CO": (0, 0, 5),
    "Off": (0, 0, 0)
}

Gas Setting Plan#

def set_gas(mfc_tuple):
    """Set mass flow controllers to specified values."""
    yield from bps.mv(
        gas_dosing.massflow_contr1, mfc_tuple[0],
        gas_dosing.massflow_contr2, mfc_tuple[1],
        gas_dosing.massflow_contr3, mfc_tuple[2],
        group='mfc_setting'
    )
    yield from bps.wait(group='mfc_setting', timeout=10)
    print(f"Gas set to {mfc_tuple}")

Temperature Ramping with Data Collection#

def ramp(gas_sel, temp_setpoint, temp_ramp, interval=10):
    """
    Ramp to target temperature while continuously acquiring data.

    Parameters
    ----------
    gas_sel : str
        Gas selection ('N2', 'H2', 'CO', 'Off')
    temp_setpoint : float
        Target temperature in Kelvin
    temp_ramp : float
        Ramp rate in K/min
    interval : int
        Seconds between data acquisitions
    """
    # Set ramp rate
    yield from bps.abs_set(reactor_cell.temperature_reg.ramp, float(temp_ramp), wait=True)

    # Set gas atmosphere
    print(f"Setting gas to {gas_sel}")
    yield from set_gas(gas_flows[gas_sel])

    # Start temperature change
    print(f"Ramping to {temp_setpoint} K at {temp_ramp} K/min")
    from bluesky.utils import Msg
    complete_status = yield Msg('set', reactor_cell.temperature_reg, temp_setpoint)

    # Acquire data while ramping
    while not complete_status.done:
        yield from bps.one_shot([
            reactor_cell.temperature_reg,
            reactor_cell.temperature_sam
        ])
        yield from bps.checkpoint()
        yield from bps.sleep(interval)

    # Confirm target reached
    temp = yield from rd(reactor_cell.temperature_reg.value)
    print(f"Temperature achieved: {temp} K")

Dwell Plan#

def dwell(gas_sel, dwell_time, interval=10):
    """
    Hold at current temperature with specified gas for given time.

    Parameters
    ----------
    gas_sel : str
        Gas selection ('N2', 'H2', 'CO', 'Off')
    dwell_time : int
        Dwell time in seconds
    interval : int
        Seconds between data acquisitions
    """
    from ophyd.status import Status

    # Set gas
    print(f"Setting gas to {gas_sel}")
    yield from set_gas(gas_flows[gas_sel])

    # Dwell with continuous acquisition
    dwell_status = Status(timeout=dwell_time)
    while not dwell_status.done:
        yield from bps.one_shot([
            reactor_cell.temperature_reg,
            reactor_cell.temperature_sam
        ])
        yield from bps.checkpoint()
        yield from bps.sleep(interval)

Complete Catalysis Experiment#

def catalysis_experiment():
    """Complete catalysis experiment with multiple ramps and dwells."""

    md = {
        'sample': 'sample_5',
        'operator': 'HZB',
        'experiment_type': 'catalysis'
    }

    @run_decorator(md=md)
    def inner_plan():
        # N2 purge at low temperature
        yield from ramp(gas_sel='N2', temp_setpoint=308.15, temp_ramp=60)
        yield from dwell(gas_sel='N2', dwell_time=60)

        # H2 reduction at elevated temperature
        yield from ramp(gas_sel='H2', temp_setpoint=398.15, temp_ramp=60)
        yield from dwell(gas_sel='H2', dwell_time=60)

        # CO exposure while cooling
        yield from ramp(gas_sel='CO', temp_setpoint=308.15, temp_ramp=60)
        yield from dwell(gas_sel='Off', dwell_time=60)

    return (yield from inner_plan())

# Execute the experiment
RE(catalysis_experiment())