# Source code for qrisp.interface.batched_backend

# """
# ********************************************************************************
# * Copyright (c) 2026 the Qrisp authors
# *
# * This program and the accompanying materials are made available under the
# * terms of the Eclipse Public License 2.0 which is available at
# * http://www.eclipse.org/legal/epl-2.0.
# *
# * This Source Code may also be made available under the following Secondary
# * Licenses when the conditions for such availability set forth in the Eclipse
# * Public License, v. 2.0 are satisfied: GNU General Public License, version 2
# * with the GNU Classpath Exception which is
# * available at https://www.gnu.org/software/classpath/license.html.
# *
# * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
# ********************************************************************************
# """

"""This module defines :class:`BatchedBackend`, obtained via
:meth:`Backend.batched() <qrisp.interface.Backend.batched>`.
"""

from __future__ import annotations

import warnings
from collections.abc import Mapping, Sequence
from typing import overload

from qrisp.circuit.quantum_circuit import QuantumCircuit
from qrisp.interface.backend import Backend
from qrisp.interface.measurement_result import MeasurementResult


# NOTE: ``BatchedBackend`` intentionally does not inherit from
# ``Backend``. ``Backend.run`` is contractually required to return
# a fully populated result. A ``BatchedBackend`` cannot honour that
# contract because its ``run`` returns an empty placeholder that is
# only populated after ``dispatch`` is called. Inheriting from
# ``Backend`` would therefore violate the Liskov Substitution Principle.
class BatchedBackend:
    """A class that buffers circuit submissions and executes them on an
    explicit :meth:`dispatch` call.

    Obtain an instance via :meth:`~qrisp.interface.Backend.batched`:

    .. code-block:: python

        batched_backend = my_backend.batched()

    .. rubric:: How it works

    Each call to :meth:`run` registers the circuit in an internal queue and
    returns a :class:`~qrisp.interface.MeasurementResult` immediately. The
    result is *lazy*: its data is not available until :meth:`dispatch` is
    called. Accessing the result before dispatch raises ``RuntimeError``.

    After :meth:`dispatch`, every pending
    :class:`~qrisp.interface.MeasurementResult` is populated with raw
    bitstring counts by forwarding each circuit to the wrapped backend's
    :meth:`~qrisp.interface.Backend.run_async`. Higher-level decoding
    (performed by
    :meth:`QuantumVariable.get_measurement <qrisp.QuantumVariable.get_measurement>`)
    is deferred until the user actually reads the decoded result.

    .. note::

        :class:`BatchedBackend` is not thread-safe. :meth:`run` and
        :meth:`dispatch` must be called from the same thread. Concurrent
        calls from multiple threads can silently drop queued circuits.

    Parameters
    ----------
    backend : Backend
        The backend whose :meth:`~qrisp.interface.Backend.run_async` is
        called with all queued circuits when :meth:`dispatch` is invoked.
    name : str or None
        Optional name for this backend.

    Examples
    --------
    In this example, we collect two measurements, then dispatch:

    .. code-block:: python

        from qrisp import QuantumFloat
        from qrisp.default_backend import QrispSimulatorBackend

        backend = QrispSimulatorBackend()
        batched_backend = backend.batched()

        a = QuantumFloat(4); a[:] = 1
        b = QuantumFloat(3); b[:] = 2
        c = a + b

        d = QuantumFloat(4); d[:] = 2
        e = QuantumFloat(3); e[:] = 3
        f = d + e

        res_c = c.get_measurement(backend=batched_backend)  # lazy (returns immediately)
        res_f = f.get_measurement(backend=batched_backend)  # lazy (returns immediately)

        batched_backend.dispatch()  # runs both circuits, then populates results

        print(res_c)  # {3: 1.0}
        print(res_f)  # {5: 1.0}

    The same pattern is available via :func:`~qrisp.batched_measurement`:

    .. code-block:: python

        from qrisp import batched_measurement

        results = batched_measurement([c, f], backend=batched_backend)
        print(results)  # [{3: 1.0}, {5: 1.0}]
    """

    def __init__(
        self,
        backend: Backend,
        name: str | None = None,
    ):
        self.name = name or self.__class__.__name__
        self._backend = backend
        # Each entry: (circuit, shots, MeasurementResult)
        self._queries: list[tuple] = []

    @property
    def options(self) -> Mapping:
        """Current runtime options (read-only). Delegates to the wrapped backend."""
        return self._backend.options

    @property
    def pending_count(self) -> int:
        """Number of circuits currently queued, waiting for :meth:`dispatch`."""
        return len(self._queries)

    def update_options(self, **kwargs) -> None:
        """Update existing runtime options.

        Delegates to the wrapped backend's
        :meth:`~qrisp.interface.Backend.update_options`, which validates keys
        and raises ``AttributeError`` for unknown ones. Changes are visible
        through both ``self.options`` and the wrapped backend's ``options``.

        Parameters
        ----------
        **kwargs
            Key-value pairs to update.
        """
        self._backend.update_options(**kwargs)

    @overload
    def run(self, circuits: QuantumCircuit, shots: int | None = None) -> MeasurementResult: ...

    @overload
    def run(self, circuits: Sequence[QuantumCircuit], shots: int | None = None) -> list[MeasurementResult]: ...

    def run(
        self,
        circuits: QuantumCircuit | Sequence[QuantumCircuit],
        shots: int | None = None,
    ) -> MeasurementResult | list[MeasurementResult]:
        """Register one or more circuits in the queue and return lazy result(s).

        Each circuit is stored alongside its shot count. The corresponding
        :class:`~qrisp.interface.MeasurementResult` objects are *empty* until
        :meth:`dispatch` is called.

        Parameters
        ----------
        circuits : QuantumCircuit or Sequence[QuantumCircuit]
            One circuit or a sequence of circuits to include in the batch.
        shots : int or None
            Number of shots. Defaults to the backend's ``shots`` option.

        Returns
        -------
        MeasurementResult or list[MeasurementResult]
            A single lazy result for a single circuit, or a list of lazy
            results when multiple circuits are given.
        """
        Backend._validate_shots(shots)

        # Anything that is not a bare QuantumCircuit is treated as a batch.
        is_single = isinstance(circuits, QuantumCircuit)
        to_queue = [circuits] if is_single else circuits

        # The shot count is resolved now, so later option changes on the
        # wrapped backend do not affect circuits that are already queued.
        n_shots = shots if shots is not None else self._backend.options.get("shots")

        placeholders: list[MeasurementResult] = []
        for qc in to_queue:
            placeholder = MeasurementResult()
            self._queries.append((qc, n_shots, placeholder))
            placeholders.append(placeholder)

        return placeholders[0] if is_single else placeholders

    # ------------------------------------------------------------------
    # Dispatch
    # ------------------------------------------------------------------

    def dispatch(self, timeout: float | None = None) -> None:
        """Execute all pending circuits and populate their results.

        All queued circuits are submitted via
        :meth:`~qrisp.interface.Backend.run_async`, eliminating redundant
        network round-trips and queue-wait overhead regardless of whether the
        circuits were queued with the same or different shot counts.

        When all queued circuits share the same shot count a scalar is passed
        to :meth:`~qrisp.interface.Backend.run_async`. When circuits carry
        different shot counts the full per-circuit list is passed, enabling
        each circuit to be executed with its own shot budget. Backends whose
        SDK does not natively support per-circuit shot counts should fall back
        to a fixed shot count and emit a ``UserWarning``.

        If the wrapped backend's
        :attr:`~qrisp.interface.Backend.max_circuits` limit is set and the
        queued batch exceeds it, the batch is automatically split into smaller
        jobs and a :exc:`UserWarning` is emitted.

        The queue is emptied before anything is submitted. If any step of the
        dispatch fails, the error is stored in every not-yet-populated
        :class:`~qrisp.interface.MeasurementResult` and then re-raised.
        Accessing any of those results later will re-raise the stored
        exception.

        Parameters
        ----------
        timeout : float or None, optional
            Maximum seconds to wait for each hardware job. ``None`` (default)
            waits indefinitely. Passed through to
            :meth:`Job.result <qrisp.interface.Job.result>`.

        Warns
        -----
        UserWarning
            If the batch is split across multiple jobs due to
            :attr:`~qrisp.interface.Backend.max_circuits`.

        Raises
        ------
        ValueError
            If the wrapped backend reports a ``max_circuits`` limit smaller
            than 1.
        RuntimeError
            If the wrapped backend returns a different number of result
            entries than circuits were submitted in a job.
        TimeoutError
            If *timeout* is given and a hardware job does not finish in time.
            Like any other failure, the error is stored in every result that
            has not been populated yet; results of jobs that completed
            earlier keep their data.
        Exception
            Re-raises any exception thrown by the wrapped backend during
            execution, after injecting the error into all pending results.
        """
        # Take ownership of the queue first so a failing dispatch never
        # leaves stale entries behind for the next one.
        queries, self._queries = self._queries, []
        if not queries:
            return

        circuits, shots_list, raws = map(list, zip(*queries))
        total = len(circuits)

        # Everything that can fail after the queue was emptied runs inside the
        # ``try``; otherwise the placeholders would be orphaned without ever
        # receiving either data or an error.
        try:
            limit = self._backend.max_circuits
            if limit is not None and limit < 1:
                # A zero or negative step would make ``range`` raise or yield
                # nothing, silently dropping every queued circuit.
                raise ValueError(
                    "The backend's per-job circuit limit (max_circuits) must be "
                    f"a positive integer or None, got {limit!r}."
                )

            chunk_size = total if limit is None else limit
            if total > chunk_size:
                n_jobs = -(-total // chunk_size)  # ceiling division
                warnings.warn(
                    f"The number of queued circuits ({total}) exceeds the "
                    f"backend's per-job circuit limit ({limit}). "
                    f"Splitting into {n_jobs} jobs automatically.",
                    UserWarning,
                    stacklevel=2,
                )

            for start in range(0, total, chunk_size):
                stop = start + chunk_size
                self._submit_chunk(
                    circuits[start:stop],
                    shots_list[start:stop],
                    raws[start:stop],
                    timeout,
                )
        except Exception as exc:
            # Results from chunks that already completed keep their data; only
            # the remaining placeholders carry the error.
            for raw in raws:
                if not raw._populated:
                    raw._inject_error(exc)
            raise

    def _submit_chunk(
        self,
        chunk_circuits: list,
        chunk_shots: list,
        chunk_raws: list,
        timeout: float | None,
    ) -> None:
        """Submit one chunk of circuits and inject results into the corresponding placeholders.

        Raises
        ------
        RuntimeError
            If the number of returned result entries does not match the
            number of submitted circuits.
        """
        # Pass a scalar when the whole chunk agrees on one shot count,
        # otherwise hand over the per-circuit list.
        uniform_shots = len(set(chunk_shots)) == 1
        effective_shots = chunk_shots[0] if uniform_shots else chunk_shots

        job = self._backend.run_async(chunk_circuits, shots=effective_shots)
        chunk_counts = list(job.result(timeout=timeout).all_counts)

        # ``zip`` would silently truncate on a mismatch and leave trailing
        # placeholders unpopulated with no indication of what went wrong.
        if len(chunk_counts) != len(chunk_raws):
            raise RuntimeError(
                f"The backend returned {len(chunk_counts)} results for "
                f"{len(chunk_raws)} submitted circuits."
            )

        for raw, counts in zip(chunk_raws, chunk_counts):
            raw._inject(counts)

    def clear(self) -> None:
        """Discard all queued circuits without dispatching.

        Any :class:`~qrisp.interface.MeasurementResult` objects previously
        returned by :meth:`run` for the cleared circuits remain unpopulated.
        Accessing them after ``clear()`` raises ``RuntimeError``.
        """
        self._queries = []