# Copyright 2019-2024 Quantinuum
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from typing import cast, Iterable, List, Optional, Sequence, Union, Any
from uuid import uuid4
from logging import warning
import numpy as np
from pyquil.api import (
QuantumComputer,
WavefunctionSimulator,
list_quantum_computers,
get_qc,
)
from pyquil.gates import I
from pyquil.paulis import ID, PauliSum, PauliTerm
from pyquil.quilatom import Qubit as Qubit_
from pytket.circuit import Circuit, OpType, Qubit, Node
from pytket.backends import (
Backend,
CircuitNotRunError,
CircuitStatus,
ResultHandle,
StatusEnum,
)
from pytket.backends.backend import KwargTypes
from pytket.backends.backendinfo import BackendInfo
from pytket.backends.backendresult import BackendResult
from pytket.backends.resulthandle import _ResultIdTuple
from pytket.extensions.pyquil._metadata import __extension_version__
from pytket.passes import (
BasePass,
EulerAngleReduction,
CXMappingPass,
auto_rebase_pass,
KAKDecomposition,
SequencePass,
SynthesiseTket,
DecomposeBoxes,
FullPeepholeOptimise,
CliffordSimp,
FlattenRegisters,
SimplifyInitial,
NaivePlacementPass,
)
from pytket.pauli import QubitPauliString
from pytket.predicates import (
NoSymbolsPredicate,
ConnectivityPredicate,
GateSetPredicate,
NoClassicalControlPredicate,
NoFastFeedforwardPredicate,
NoMidMeasurePredicate,
DefaultRegisterPredicate,
Predicate,
)
from pytket.extensions.pyquil.pyquil_convert import (
process_characterisation,
get_avg_characterisation,
tk_to_pyquil,
)
from pytket.placement import NoiseAwarePlacement
from pytket.architecture import Architecture
from pytket.utils import prepare_circuit
from pytket.utils.operators import QubitPauliOperator
from pytket.utils.outcomearray import OutcomeArray
class PyQuilJobStatusUnavailable(Exception):
    """Error signalling that the status of a job could not be obtained."""

    def __init__(self) -> None:
        # The message is fixed; callers construct this exception without arguments.
        message = "The job status cannot be retrieved."
        super().__init__(message)
# Lookup table from status strings to pytket ``StatusEnum`` values.
# NOTE(review): presumably the keys are pyQuil/QAM status strings; the table is
# not referenced anywhere in the visible source -- confirm before relying on it.
_STATUS_MAP: dict[str, StatusEnum] = {
    "done": StatusEnum.COMPLETED,
    "running": StatusEnum.RUNNING,
    "loaded": StatusEnum.SUBMITTED,
    "connected": StatusEnum.SUBMITTED,
}
def _default_q_index(q: Qubit) -> int:
if q.reg_name != "q" or len(q.index) != 1:
raise ValueError("Non-default qubit register")
return int(q.index[0])
class ForestBackend(Backend):
    """
    Interface to a Rigetti device.

    Circuits are converted to pyQuil programs with ``tk_to_pyquil`` and executed
    through the QAM of the wrapped ``QuantumComputer``.
    """

    _supports_shots = True
    _supports_counts = True
    _supports_contextual_optimisation = True
    _persistent_handles = True
    # Gate set advertised through ``BackendInfo`` (see ``_get_backend_info``).
    _GATE_SET = {
        OpType.CZ,
        OpType.Rx,
        OpType.Rz,
        OpType.Measure,
        OpType.Barrier,
        OpType.ISWAP,
    }

    def __init__(self, qc: QuantumComputer):
        """Backend for running circuits with the Rigetti QVM.

        :param qc: The particular QuantumComputer to use. See the pyQuil docs for more
            details.
        :type qc: QuantumComputer
        """
        super().__init__()
        self._qc: QuantumComputer = qc
        self._backend_info = self._get_backend_info(self._qc)

    @property
    def required_predicates(self) -> List[Predicate]:
        """Predicates a circuit has to satisfy to be run on this backend."""
        return [
            NoClassicalControlPredicate(),
            NoFastFeedforwardPredicate(),
            NoMidMeasurePredicate(),
            GateSetPredicate(self.backend_info.gate_set),
            ConnectivityPredicate(self.backend_info.architecture),  # type: ignore
        ]

    def rebase_pass(self) -> BasePass:
        """Return a pass rebasing circuits to the ``{CZ, Rz, Rx}`` gate set."""
        return auto_rebase_pass({OpType.CZ, OpType.Rz, OpType.Rx})

    def default_compilation_pass(self, optimisation_level: int = 2) -> BasePass:
        """
        Build the default compilation pass sequence for this backend.

        Every level decomposes boxes, flattens registers, maps the circuit onto
        the device architecture and rebases with :py:meth:`rebase_pass`.
        Level 1 additionally runs ``SynthesiseTket`` before and after mapping;
        level 2 runs ``FullPeepholeOptimise`` before mapping and
        ``KAKDecomposition``/``CliffordSimp`` (both with ``allow_swaps=False``)
        plus ``SynthesiseTket`` after it. Levels above 0 finish with
        ``EulerAngleReduction``.

        :param optimisation_level: Level of optimisation, one of 0, 1 or 2.
        :type optimisation_level: int
        :return: The assembled sequence of passes.
        :rtype: BasePass
        """
        assert optimisation_level in range(3)
        passlist = [
            DecomposeBoxes(),
            FlattenRegisters(),
        ]
        if optimisation_level == 1:
            passlist.append(SynthesiseTket())
        elif optimisation_level == 2:
            passlist.append(FullPeepholeOptimise())
        passlist.append(
            CXMappingPass(
                self.backend_info.architecture,  # type: ignore
                NoiseAwarePlacement(
                    self._backend_info.architecture,  # type: ignore
                    self._backend_info.averaged_node_gate_errors,  # type: ignore
                    self._backend_info.averaged_edge_gate_errors,  # type: ignore
                ),
                directed_cx=False,
                delay_measures=True,
            )
        )
        passlist.append(
            NaivePlacementPass(self.backend_info.architecture)  # type: ignore
        )
        if optimisation_level == 2:
            # Add some connectivity preserving optimisations after routing.
            passlist.extend(
                [KAKDecomposition(allow_swaps=False), CliffordSimp(allow_swaps=False)]
            )
        if optimisation_level > 0:
            passlist.append(SynthesiseTket())
        passlist.append(self.rebase_pass())
        if optimisation_level > 0:
            passlist.append(
                EulerAngleReduction(OpType.Rx, OpType.Rz),
            )
        return SequencePass(passlist)

    @property
    def _result_id_type(self) -> _ResultIdTuple:
        """Handle layout: a unique integer id and the JSON-encoded postprocessing
        circuit (the string ``"null"`` when there is none)."""
        return (int, str)

    def process_circuits(
        self,
        circuits: Sequence[Circuit],
        n_shots: Union[None, int, Sequence[Optional[int]]] = None,
        valid_check: bool = True,
        **kwargs: KwargTypes,
    ) -> List[ResultHandle]:
        """
        See :py:meth:`pytket.backends.Backend.process_circuits`.

        Supported kwargs:

        * `seed`
        * `postprocess`: apply end-of-circuit simplifications and classical
          postprocessing to improve fidelity of results (bool, default False)
        * `simplify_initial`: apply the pytket ``SimplifyInitial`` pass to improve
          fidelity of results assuming all qubits initialized to zero (bool, default
          False)

        :param circuits: Circuits to execute.
        :param n_shots: Number of shots, either one value for all circuits or one
            value per circuit.
        :param valid_check: Whether to check the circuits against
            :py:attr:`required_predicates` before submission.
        :return: One handle per circuit, in the order of ``circuits``.
        """
        circuits = list(circuits)
        n_shots_list = Backend._get_n_shots_as_list(
            n_shots, len(circuits), optional=False
        )

        if valid_check:
            self._check_all_circuits(circuits)

        postprocess = kwargs.get("postprocess", False)
        simplify_initial = kwargs.get("simplify_initial", False)

        handle_list = []
        for circuit, shots in zip(circuits, n_shots_list):
            if postprocess:
                c0, ppcirc = prepare_circuit(circuit, allow_classical=False)
                ppcirc_rep = ppcirc.to_dict()
            else:
                c0, ppcirc_rep = circuit, None

            if simplify_initial:
                _x_circ = Circuit(1).Rx(1, 0)
                # Simplify the circuit that is actually converted and executed
                # (``c0``). Applying the pass to ``circuit`` had no effect on the
                # submitted program whenever ``postprocess`` produced a separate
                # ``c0``.
                SimplifyInitial(
                    allow_classical=False, create_all_qubits=True, xcirc=_x_circ
                ).apply(c0)

            p, bits = tk_to_pyquil(c0, return_used_bits=True)
            p.wrap_in_numshots_loop(shots)
            ex = self._qc.compiler.native_quil_to_executable(p)
            qam = self._qc.qam
            qam.random_seed = kwargs.get("seed")  # type: ignore
            pyquil_handle = qam.execute(ex)

            # The postprocessing circuit travels inside the handle so that
            # ``get_result`` can rebuild it from the handle alone.
            handle = ResultHandle(uuid4().int, json.dumps(ppcirc_rep))
            cache_entry = {"handle": pyquil_handle, "c_bits": sorted(bits)}
            if circuit.n_gates_of_type(OpType.Measure) == 0:
                # Nothing is measured: store an empty result straight away.
                cache_entry["result"] = self.empty_result(circuit, n_shots=shots)
            self._cache[handle] = cache_entry
            handle_list.append(handle)
        return handle_list

    def circuit_status(self, handle: ResultHandle) -> CircuitStatus:
        """
        Return a CircuitStatus reporting the status of the circuit execution
        corresponding to the ResultHandle.

        This will throw an PyQuilJobStatusUnavailable exception if the results
        have not been retrieved yet, as pyQuil does not currently support asynchronous
        job status queries.

        :param handle: The handle to the submitted job.
        :type handle: ResultHandle
        :returns: The status of the submitted job.
        :raises PyQuilJobStatusUnavailable: Cannot retrieve job status.
        :raises CircuitNotRunError: The handle does not correspond to a valid job.
        """
        if handle in self._cache and "result" in self._cache[handle]:
            return CircuitStatus(StatusEnum.COMPLETED)
        if handle in self._cache:
            # retrieving status is not supported yet
            # see https://github.com/rigetti/pyquil/issues/1370
            raise PyQuilJobStatusUnavailable()
        raise CircuitNotRunError(handle)

    def get_result(self, handle: ResultHandle, **kwargs: KwargTypes) -> BackendResult:
        """
        See :py:meth:`pytket.backends.Backend.get_result`.

        Supported kwargs: none.

        :raises CircuitNotRunError: The handle is not known to this backend.
        :raises ValueError: The QAM returned no readout data for ``"ro"``.
        """
        try:
            return super().get_result(handle)
        except CircuitNotRunError:
            if handle not in self._cache:
                # Unknown handle: propagate the original error unchanged.
                raise

        # The job was submitted but its result has not been fetched yet.
        pyquil_handle = self._cache[handle]["handle"]
        raw_shots = self._qc.qam.get_result(pyquil_handle).readout_data["ro"]
        if raw_shots is None:
            raise ValueError("Could not read job results in memory")
        shots = OutcomeArray.from_readouts(raw_shots.tolist())
        # The second handle field holds the JSON-encoded postprocessing circuit.
        ppcirc_rep = json.loads(cast(str, handle[1]))
        ppcirc = Circuit.from_dict(ppcirc_rep) if ppcirc_rep is not None else None
        res = BackendResult(
            shots=shots, c_bits=self._cache[handle]["c_bits"], ppcirc=ppcirc
        )
        self._cache[handle].update({"result": res})
        return res

    @property
    def backend_info(self) -> BackendInfo:
        """Information about the device, computed once at construction."""
        return self._backend_info

    @classmethod
    def _get_backend_info(cls, qc: QuantumComputer) -> BackendInfo:
        """Assemble a ``BackendInfo`` from the characterisation of ``qc``.

        :param qc: Quantum computer whose characterisation is read through
            ``process_characterisation``.
        :return: Backend information carrying the architecture, gate set and the
            per-node/per-edge as well as averaged gate errors.
        """
        char_dict: dict = process_characterisation(qc)
        arch = char_dict.get("Architecture")
        node_errors = char_dict.get("NodeErrors")
        link_errors: dict[tuple[Node, Node], float] = char_dict.get("EdgeErrors")  # type: ignore
        averaged_errors = get_avg_characterisation(char_dict)
        return BackendInfo(
            cls.__name__,
            qc.name,
            __extension_version__,
            arch,
            cls._GATE_SET,
            all_node_gate_errors=node_errors,
            all_edge_gate_errors=link_errors,  # type: ignore
            averaged_node_gate_errors=averaged_errors["node_errors"],
            averaged_edge_gate_errors=averaged_errors["link_errors"],  # type: ignore
        )

    @classmethod
    def available_devices(cls, **kwargs: Any) -> List[BackendInfo]:
        """
        See :py:meth:`pytket.backends.Backend.available_devices`.

        Supported kwargs: `qpus` (default true), `qvms` (default false).
        """
        # Exclude QVMs unless the caller asked for them explicitly.
        kwargs.setdefault("qvms", False)
        qc_name_list = list_quantum_computers(**kwargs)
        return [cls._get_backend_info(get_qc(name)) for name in qc_name_list]
class ForestStateBackend(Backend):
    """
    State based interface to a Rigetti device.

    Statevectors and expectation values are computed with pyQuil's
    ``WavefunctionSimulator``.
    """

    _supports_state = True
    _supports_expectation = True
    _expectation_allows_nonhermitian = False
    _persistent_handles = False
    # Operations accepted by the gate-set predicate in ``required_predicates``.
    _GATE_SET = {
        OpType.X,
        OpType.Y,
        OpType.Z,
        OpType.H,
        OpType.S,
        OpType.T,
        OpType.Rx,
        OpType.Ry,
        OpType.Rz,
        OpType.CZ,
        OpType.CX,
        OpType.CCX,
        OpType.CU1,
        OpType.U1,
        OpType.SWAP,
    }

    def __init__(self) -> None:
        """Backend for running simulations on the Rigetti QVM Wavefunction Simulator."""
        super().__init__()
        self._sim = WavefunctionSimulator()

    @property
    def required_predicates(self) -> List[Predicate]:
        """Predicates a circuit has to satisfy to be run on this backend."""
        return [
            NoClassicalControlPredicate(),
            NoFastFeedforwardPredicate(),
            NoMidMeasurePredicate(),
            NoSymbolsPredicate(),
            GateSetPredicate(self._GATE_SET),
            DefaultRegisterPredicate(),
        ]

    def rebase_pass(self) -> BasePass:
        """Return a pass rebasing circuits to the ``{CZ, Rz, Rx}`` gate set."""
        return auto_rebase_pass({OpType.CZ, OpType.Rz, OpType.Rx})

    def default_compilation_pass(self, optimisation_level: int = 2) -> BasePass:
        """
        Build the default compilation pass sequence for this backend.

        Every level decomposes boxes, flattens registers and rebases with
        :py:meth:`rebase_pass`. Level 1 adds ``SynthesiseTket`` and level 2
        ``FullPeepholeOptimise`` before the rebase; levels above 0 finish with
        ``EulerAngleReduction``.

        :param optimisation_level: Level of optimisation, one of 0, 1 or 2.
        :type optimisation_level: int
        :return: The assembled sequence of passes.
        :rtype: BasePass
        """
        assert optimisation_level in range(3)
        passlist = [DecomposeBoxes(), FlattenRegisters()]
        if optimisation_level == 1:
            passlist.append(SynthesiseTket())
        elif optimisation_level == 2:
            passlist.append(FullPeepholeOptimise())
        passlist.append(self.rebase_pass())
        if optimisation_level > 0:
            passlist.append(EulerAngleReduction(OpType.Rx, OpType.Rz))
        return SequencePass(passlist)

    @property
    def _result_id_type(self) -> _ResultIdTuple:
        """Handle layout: a single unique integer id."""
        return (int,)

    def process_circuits(
        self,
        circuits: Iterable[Circuit],
        n_shots: Optional[Union[int, Sequence[int]]] = None,
        valid_check: bool = True,
        **kwargs: KwargTypes,
    ) -> List[ResultHandle]:
        """
        See :py:meth:`pytket.backends.Backend.process_circuits`.

        Each circuit is simulated immediately and its statevector is stored in
        the cache. ``n_shots`` and ``kwargs`` are accepted for interface
        compatibility but are not used here.

        :param circuits: Circuits to simulate; any iterable, including a
            one-shot iterator.
        :param valid_check: Whether to check the circuits against
            :py:attr:`required_predicates` first.
        :return: One handle per circuit, in iteration order.
        """
        # Materialise once: the circuits are traversed by the validity check and
        # again by the simulation loop, which would exhaust a one-shot iterator.
        circuits = list(circuits)
        handle_list = []
        if valid_check:
            self._check_all_circuits(circuits)
        for circuit in circuits:
            p = tk_to_pyquil(circuit)
            for qb in circuit.qubits:
                # Qubits with no gates will not be included in the Program
                # Add identities to ensure all qubits are present and dimension
                # is as expected
                p += I(Qubit_(qb.index[0]))
            handle = ResultHandle(uuid4().int)
            state = np.array(self._sim.wavefunction(p).amplitudes)
            try:
                phase = float(circuit.phase)
                coeff = np.exp(phase * np.pi * 1j)
                state *= coeff
            except (TypeError, ValueError):
                # float() of a symbolic expression raises TypeError in sympy;
                # ValueError is kept for backward compatibility.
                warning(
                    "Global phase is dependent on a symbolic parameter, so cannot "
                    "adjust for phase"
                )
            implicit_perm = circuit.implicit_qubit_permutation()
            res_qubits = [
                implicit_perm[qb] for qb in sorted(circuit.qubits, reverse=True)
            ]
            res = BackendResult(q_bits=res_qubits, state=state)
            self._cache[handle] = {"result": res}
            handle_list.append(handle)
        return handle_list

    def circuit_status(self, handle: ResultHandle) -> CircuitStatus:
        """
        Return the status of the circuit execution for ``handle``.

        Results are stored as soon as a circuit is processed, so every cached
        handle is reported as completed.

        :param handle: The handle returned by :py:meth:`process_circuits`.
        :raises CircuitNotRunError: The handle is not in the cache.
        """
        if handle in self._cache:
            return CircuitStatus(StatusEnum.COMPLETED)
        raise CircuitNotRunError(handle)

    def _gen_PauliTerm(self, term: QubitPauliString, coeff: complex = 1.0) -> PauliTerm:
        """Convert a pytket Pauli string into a pyQuil ``PauliTerm``.

        :param term: Pauli string whose qubits must all be in the default
            register (checked by ``_default_q_index``).
        :param coeff: Coefficient multiplied into the resulting term.
        :return: Product of the single-qubit Pauli terms scaled by ``coeff``.
        :raises ValueError: A qubit of ``term`` is not in the default register.
        """
        pauli_term = ID() * coeff
        for q, p in term.map.items():
            pauli_term *= PauliTerm(p.name, _default_q_index(q))
        return pauli_term  # type: ignore

    def get_pauli_expectation_value(
        self, state_circuit: Circuit, pauli: QubitPauliString
    ) -> complex:
        """Calculates the expectation value of the given circuit using the built-in QVM
        functionality

        :param state_circuit: Circuit that generates the desired state
            :math:`\\left|\\psi\\right>`.
        :type state_circuit: Circuit
        :param pauli: Pauli operator
        :type pauli: QubitPauliString
        :return: :math:`\\left<\\psi | P | \\psi \\right>`
        :rtype: complex
        """
        prog = tk_to_pyquil(state_circuit)
        pauli_term = self._gen_PauliTerm(pauli)
        return complex(self._sim.expectation(prog, [pauli_term]))

    def get_operator_expectation_value(
        self, state_circuit: Circuit, operator: QubitPauliOperator
    ) -> complex:
        """Calculates the expectation value of the given circuit with respect to the
        operator using the built-in QVM functionality

        :param state_circuit: Circuit that generates the desired state
            :math:`\\left|\\psi\\right>`.
        :type state_circuit: Circuit
        :param operator: Operator :math:`H`.
        :type operator: QubitPauliOperator
        :return: :math:`\\left<\\psi | H | \\psi \\right>`
        :rtype: complex
        """
        prog = tk_to_pyquil(state_circuit)
        pauli_sum = PauliSum(
            [
                self._gen_PauliTerm(term, coeff)  # type: ignore
                for term, coeff in operator._dict.items()
            ]
        )
        return complex(self._sim.expectation(prog, pauli_sum))
# One-qubit circuit: Rx(1) on qubit 0 plus a global phase of 0.5.
# NOTE(review): presumably an exact X gate written with the native Rx gate
# (pytket angles and phases being in half-turns) -- confirm. It is not referenced
# anywhere else in the visible source.
_xcirc = Circuit(1)
_xcirc.Rx(1, 0)
_xcirc.add_phase(0.5)