Add support for a different AWG¶
Prerequisites: Experiment Manager.
If your AWG is not supported by qFabric, you can add support for it
by writing device-compatible subclasses of Segmenter
and Device.
It is important to read the AWG manual carefully to understand the operation of the AWG device. Here, we consider an example AWG device with the following specifications:
2 analog channels, 2 digital channels
Fixed sample rate of 100 Msamples/s
AWG supports the sequence mode, where we can define data blocks to stored in the AWG memory, executed with defined orders and repeats.
Each block can contain up to 1 million samples, with maximum 100 blocks available.
Minimum block length is 1000 samples, with valid block length a multiple of 8.
Segmenter¶
Each AWG should have its own Segmenter class, inheriting Segmenter.
Before defining the segmenter class, we define a ExampleBlock class representing
a single block of memory to be programmed on the AWG.
import numpy.typing as npt
class ExampleBlock:
"""Represent AWG memory data blocks."""
def __init__(self, analog_data: npt.NDArray[np.float64], digital_data: npt.NDArray[np.bool]):
self.analog_data = analog_data
self.digital_data = digital_data
We also define an ExampleSegment class, representing AWG data for a single step of the sequence.
The segment is split into ExampleBlock objects.
import numpy as np
from qfabric.sequence.function import AnalogEmpty, DigitalEmpty
from qfabric.sequence.step import DeviceStep
from qfabric.planner.segmenter import Segment
SAMPLE_RATE = int(100e6)
MIN_BLOCK_LENGTH = 1000
MAX_BLOCK_LENGTH = 1000000
def get_allowed_sample_length(nominal_sample_length: int) -> int:
"""Gets the minimum allowed sample length longer than the nominal length."""
if nominal_sample_length < MIN_SAMPLE_LENGTH:
return MIN_SAMPLE_LENGTH
elif nominal_sample_length > MAX_BLOCK_LENGTH:
raise ValueError(f"Sample length cannot be longer than {MAX_BLOCK_LENGTH}")
else:
remainder = nominal_sample_length // 8
if remainder == 0:
return nominal_sample_length
else:
return nominal_sample_length + 8 - remainder
class ExampleSegment(Segment):
"""AWG data for a device step, may contain multiple ExampleBlocks"""
def __init__(self, device_step: DeviceStep, analog_channels: list[int], digital_channels: list[int]):
super().__init__(device_step, analog_channels, digital_channels)
sample_length, analog_data, digital_data = self._get_analog_and_digital_data(
analog_channels, digital_channels
)
self.blocks: list[ExampleBlock] = self._split_to_blocks(sample_length, analog_data, digital_data)
def __eq__(self, other: "ExampleSegment") -> bool:
"""For comparing segments to check if they contain the same content."""
if self._device_step != other._device_step:
return False
return True
def _get_analog_and_digital_data(
self, analog_channels: list[int], digital_channels: list[int]
) -> tuple[int, npt.NDArray[np.float64], npt.NDArray[np.bool]]:
"""Gets analog and digital data of the entire device step."""
nominal_duration = device_step.duration
nominal_sample_length = int(SAMPLE_RATE * nominal_duration)
sample_length = get_allowed_sample_length(nominal_sample_length)
times = np.arange(len(sample_length)) / SAMPLE_RATE
analog_data = []
for analog_channel in analog_channels:
analog_function = self._device_step.analog_functions.get(
analog_channel, AnalogEmpty()
)
analog_data.append(analog_function.output(times))
analog_data = np.transpose(analog_data) # axis 0 is sample index.
digital_data = []
for digital_channel in digital_channels:
digital_function = self._device_step.digital_functions.get(
digital_channel, DigitalEmpty()
)
digital_data.append(digital_function.output(times))
digital_data = np.transpose(digital_data) # axis 0 is sample index.
return (sample_length, analog_data, digital_data)
def _split_to_blocks(
self,
sample_length: int,
analog_data: npt.NDArray[np.float64],
digital_data: npt.NDArray[np.bool],
) -> list[ExampleBlock]:
"""Splits data into ExampleBlocks"""
block_boundary_indices = [
block_index * MAX_BLOCK_LENGTH for MAX_BLOCK_LENGTH in range(sample_length // MAX_BLOCK_LENGTH)
]
block_boundary_indices += [sample_length]
# checks if the last block is too short
if len(block_boundary_indices) > 2:
if block_boundary_indices[-1] - block_boundary_indices[-2] < MIN_BLOCK_LENGTH:
block_boundary_indices[-2] = block_boundary_indices[-1] - 1000
blocks: list[ExampleBlock] = []
for block_index in range(len(block_boundary_indices) - 1):
start_sample_index = block_boundary_indices[block_index]
stop_sample_index = block_boundary_indices[block_index + 1]
blocks.append(
ExampleBlock(
analog_data=analog_data[start_sample_index:stop_sample_index],
digital_data=digital_data[start_sample_index:stop_sample_index],
)
)
return blocks
After that we can define the segmenter class which handles converting steps to segments.
In this class, we need to implement set_steps()
and get_awg_memory_data().
from qfabric.planner.segmenter import Segmenter
MAX_BLOCK_NUMBER = 100
class ExampleSegmenter(Segmenter):
def __init__(self, analog_channels: list[int], digital_channels: list[int]):
super().__init__(analog_channels, digital_channels)
if len(analog_channels) != 2:
raise ValueError("Must have 2 analog channels.")
if len(digital_channels) != 2:
raise ValueError("Must have 2 digital channels.")
def set_steps(self, steps: list[Step], sequence_to_steps_map: dict[int, list[int]]):
super().set_steps(steps, sequence_to_steps_map)
# self._device_steps and self._sequence_to_device_steps_map are defined by above.
self._device_steps_to_segments()
self._get_sequence_to_segments_map()
def _device_steps_to_segments(self):
"""Builds segments from device steps."""
self._segments: list[ExampleSegment] = []
self._device_step_to_segment_map: dict[int, int] = {}
for device_step_index, device_step in enumerate(self._device_steps):
segment = ExampleSegment(
device_step, self._analog_channels, self._digital_channels
)
# checks for duplicates in segments.
try:
segment_index = self._segments.index(segment)
except ValueError:
self._segments.append(segment)
segment_index = len(self._segments) - 1
# this is a mapping from device step indices to segment indices.
self._device_step_to_segment_map[device_step_index] = segment_index
def _get_sequence_to_segments_map(self):
"""Generates mapping from sequence indices to segment indices"""
self._sequence_to_segments_map: dict[int, list[int]] = {}
for sequence_index in self._sequence_to_device_steps_map:
self._sequence_to_segments_map[sequence_index] = []
for device_step_index in self._sequence_to_device_steps_map[sequence_index]:
self._sequence_to_segments_map[sequence_index].append(
self._device_step_to_segment_map[device_step_index]
)
def get_awg_memory_data(
self, sequence_indices: list[int]
) -> tuple[dict[str, list[ExampleSegment]], dict[int, int], list[int]]:
# list of step indices that is used in the sequences requested
step_indices: list[int] = []
for sequence_index in sequence_indices:
step_indices.extend(self._sequence_to_device_steps_map[sequence_index])
# removes duplicates
step_indices = list(dict.fromkeys(step_indices))
# segments to be programmed
segments: list[ExampleSegment] = []
# mapping from step indices to indices in the above segments list.
step_to_segment_map: dict[int, int] = {}
for step_index in step_indices:
segment = self._segments[self._device_step_to_segment_map[step_index]]
# check for duplicates.
try:
segment_index = segments.index(segment)
except ValueError:
segments.append(segment)
segment_index = len(segments) - 1
step_to_segment_map[step_index] = segment_index
# check if there are too many blocks to program
block_count = 0
for segment in segments:
block_count += len(segment.blocks)
if block_count > MAX_BLOCK_NUMBER:
# if there are too many blocks, attempt to program less sequences.
if len(sequence_indices) > 1:
return self.get_awg_memory_data(sequence_indices[:-1])
else:
raise RuntimeError("Sequence is too complex to be programmed in this AWG.")
# this is the minimum amount of data to program the AWG.
# if the AWG needs more data, it can be added as long as the Device class is compatible.
awg_data = {"segments": segments}
return awg_data, step_to_segment_map, sequence_indices
The above segmenter satisfies the specifications of the AWG listed above.
It can be made more AWG memory efficient, by checking if there are ExampleBlock
contain the same data. If they do, they can be replaced by a single AWG memory block.
However, the above example shows what is generally needed in a segmenter.
Device¶
The device class, inherting from Device,
takes the data from the Segmenter class and programs the AWG.
Before writing the device class, let’s consider a skeleton driver class,
mimicking a AWG driver class implementing its functions.
class ExampleDriver:
def __init__(resource: str):
self._awg = self._connect(resource)
def _connect(resource: str): ...
def set_memory_block(
self,
block_index: int,
analog_data: npt.NDArray[np.float64],
digital_data: npt.NDArray[np.bool],
): ...
def set_step(
self,
step_index: int,
repeats: int,
next_step: int = -1,
): ...
def start(self): ...
def wait_for_complete(self): ...
def stop(self): ...
def set_trigger_mode(self, external: bool): ...
Then we write the ExampleDevice class linking these AWG functions to
data from the Segmenter.
from qfabric.programmer.device import Device
class ExampleDevice(Device):
def __init__(segmenter: ExampleSegmenter, resource: str, principal_device: bool):
super().__init__(segmenter, resource, principal_device)
self._driver = ExampleDriver(resource)
if self._principal_device:
self.setup_software_trigger()
else:
# other devices are triggered by the principal device.
self.setup_external_trigger()
def program_memory(self, instructions: dict[str, list[ExampleSegment]]):
"""
It needs to work with the first returned value in ExampleSegmenter.get_awg_memory_data.
Adds each block in each segment into AWG memory.
"""
self._segment_to_block_map: dict[int, list[int]] = {}
block_counter = 0
for segment_index, segment in enumerate(instructions["segments"]):
block_indices = []
for block in segment.blocks:
self._driver.set_memory_block(
block_counter, block.analog_data, block.digital_data
)
block_indices.append(block_counter)
block_counter += 1
self._segment_to_block_map[segment_index] = block_indices
def program_segment_steps(self, segment_indices_and_repeats: list[tuple[int, int]]):
block_step_counter = 0
for segment_step_index, (segment_index, segment_repeat) in enumerate(
segment_indices_and_repeats
):
block_indices_this_segment = self._segment_to_block_map[segment_index]
for _ in range(segment_repeat):
for block_step_index, block_index in enumerate(block_indices_this_segment):
if (
(segment_step_index == len(segment_indices_and_repeats) - 1) # last segment
and (block_step_index == len(block_indices_this_segment) - 1) # last block
):
next_block_step = -1
else:
next_block_step = block_step_counter + 1
self._driver.set_step(block_step_counter, 1, next_block_step)
def start(self):
self._driver.start()
def wait_until_complete(self):
self._driver.wait_until_complete()
def stop(self):
self._driver.stop()
def setup_external_trigger(self):
self.set_trigger_mode(True)
def setup_software_trigger(self):
self.set_trigger_mode(False)
Similar to the segmenter class above, this ExampleDevice class does not
optimize for everything (e.g. check previously loaded blocks for duplicates,
or combine multiple steps with the same block to be a single step with repeats).
However, it shows general steps to write a device class.
Following above, you can write a Segmenter and Device class for your device,
and then you can add them into the config file for use (See How to write a config file).