# Copyright 2019-2024 Cambridge Quantum Computing## Licensed under the Apache License, Version 2.0 (the "License");# you may not use this file except in compliance with the License.# You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.importwarningsfromcollectionsimportCounter,defaultdictfromcollections.abcimportCallablefromtypingimportAny,Generic,TypeVar,Unionimportnumpyasnpfromscipy.statsimportrv_discreteNumber=Union[float,complex]T0=TypeVar("T0")T1=TypeVar("T1")
[docs]defas_counter(self)->Counter[T0]:"""Return the distribution as a :py:class:`collections.Counter` object."""returnself._C
@propertydeftotal(self)->int:"""Return the total number of observations."""returnself._C.total()@propertydefsupport(self)->set[T0]:"""Return the support of the distribution (set of all observations)."""returnset(self._C.keys())
[docs]def__eq__(self,other:object)->bool:"""Compare distributions for equality."""ifnotisinstance(other,EmpiricalDistribution):returnNotImplementedreturnself._C==other._C
[docs]def__getitem__(self,x:T0)->int:"""Get the count associated with an observation."""returnself._C[x]
[docs]def__add__(self,other:"EmpiricalDistribution[T0]")->"EmpiricalDistribution[T0]":"""Combine two distributions."""returnEmpiricalDistribution(self._C+other._C)
[docs]defcondition(self,criterion:Callable[[T0],bool])->"EmpiricalDistribution[T0]":"""Return a new distribution conditioned on the given criterion. :param criterion: A boolean function defined on all possible observations. """returnEmpiricalDistribution(Counter({x:cforx,cinself._C.items()ifcriterion(x)}))
[docs]defmap(self,mapping:Callable[[T0],T1])->"EmpiricalDistribution[T1]":"""Return a distribution over a transformed domain. The provided function maps elements in the original domain to new elements. If it is not injective, counts are combined. :param mapping: A function defined on all possible observations, mapping them to another domain. """C:Counter[T1]=Counter()forx,cinself._C.items():C[mapping(x)]+=creturnEmpiricalDistribution(C)
[docs]defsample_mean(self,f:Callable[[T0],Number])->Number:"""Compute the sample mean of a functional. The provided function maps observations to numerical values. :return: Estimate of the mean of the functional based on the observations."""returnsum(c*f(x)forx,cinself._C.items())/self.total
[docs]defsample_variance(self,f:Callable[[T0],Number])->Number:"""Compute the sample variance of a functional. The provided function maps observations to numerical values. The sample variance is an unbiased estimate of the variance of the underlying distribution. :return: Estimate of the variance of the functional based on the observations."""ifself.total<2:raiseRuntimeError("At least two samples are required in order to compute the sample ""variance.")fs=[(f(x),c)forx,cinself._C.items()]M0=self.totalM1=sum(c*vforv,cinfs)M2=sum(c*v**2forv,cinfs)return(M2-M1**2/M0)/(M0-1)
[docs]classProbabilityDistribution(Generic[T0]):"""Represents an exact probability distribution. Supports methods for combination, marginalization, expectation value, etc. May be derived from an :py:class:`EmpriricalDistribution`. """def__init__(self,P:dict[T0,float],min_p:float=0.0):"""Initialize with a dictionary of probabilities. :param P: Dictionary of probabilities. :param min_p: Optional probability below which to ignore values. Default 0. Distribution is renormalized after removing these values. The values must be non-negative. If they do not sum to 1, a warning is emitted; the distribution will contain normalized values. """ifany(x<0forxinP.values()):raiseValueError("Distribution contains negative probabilities")s0=sum(P.values())ifnp.isclose(s0,0):raiseValueError("Distribution has zero weight")ifnotnp.isclose(s0,1):warnings.warn("Probabilities used to initialize ProbabilityDistribution do ""not sum to 1: renormalizing.")newP={x:pforx,pinP.items()ifp>min_p}s=sum(newP.values())self._P={x:p/sforx,pinnewP.items()}
[docs]defas_dict(self)->dict[T0,float]:"""Return the distribution as a :py:class:`dict` object."""returnself._P
[docs]defas_rv_discrete(self)->tuple[rv_discrete,list[T0]]:"""Return the distribution as a :py:class:`scipy.stats.rv_discrete` object. This method returns an RV over integers {0, 1, ..., k-1} where k is the size of the support, and a list whose i'th member is the item corresponding to the value i of the RV. """X=list(self._P.keys())return(rv_discrete(values=(range(len(X)),[self._P[x]forxinX])),X)
@propertydefsupport(self)->set[T0]:"""Return the support of the distribution (set of all possible outcomes)."""returnset(self._P.keys())def__eq__(self,other:object)->bool:"""Compare distributions for equality."""ifnotisinstance(other,ProbabilityDistribution):returnNotImplementedkeys0=frozenset(self._P.keys())keys1=frozenset(other._P.keys())ifkeys0!=keys1:returnFalsereturnall(np.isclose(self._P[x],other._P[x])forxinkeys0)def__repr__(self)->str:returnf"{self.__class__.__name__}({self._P!r})"
[docs]def__getitem__(self,x:T0)->float:"""Get the probability associated with a possible outcome."""returnself._P.get(x,0.0)
[docs]@classmethoddeffrom_empirical_distribution(cls,ed:EmpiricalDistribution[T0])->"ProbabilityDistribution[T0]":"""Estimate a probability distribution from an empirical distribution."""S=ed.totalifS==0:raiseValueError("Empirical distribution has no values")f=1/Sreturncls({x:f*cforx,cined.as_counter().items()})
[docs]defcondition(self,criterion:Callable[[T0],bool])->"ProbabilityDistribution[T0]":"""Return a new distribution conditioned on the given criterion. :param criterion: A boolean function defined on all possible outcomes. """S=sum(cforx,cinself._P.items()ifcriterion(x))ifnp.isclose(S,0):raiseValueError("Condition has probability zero")f=1/SreturnProbabilityDistribution({x:f*cforx,cinself._P.items()ifcriterion(x)})
[docs]defmap(self,mapping:Callable[[T0],T1])->"ProbabilityDistribution[T1]":"""Return a distribution over a transformed domain. The provided function maps elements in the original domain to new elements. If it is not injective, probabilities are combined. :param mapping: A function defined on all possible outcomes, mapping them to another domain. """P:defaultdict[Any,float]=defaultdict(float)forx,pinself._P.items():P[mapping(x)]+=preturnProbabilityDistribution(P)
[docs]defexpectation(self,f:Callable[[T0],Number])->Number:"""Compute the expectation value of a functional. The provided function maps possible outcomes to numerical values. :return: Expectation of the functional. """returnsum(p*f(x)forx,pinself._P.items())
[docs]defvariance(self,f:Callable[[T0],Number])->Number:"""Compute the variance of a functional. The provided function maps possible outcomes to numerical values. :return: Variance of the functional. """fs=[(f(x),p)forx,pinself._P.items()]returnsum(p*v**2forv,pinfs)-(sum(p*vforv,pinfs))**2
[docs]defconvex_combination(dists:list[tuple[ProbabilityDistribution[T0],float]])->ProbabilityDistribution[T0]:"""Return a convex combination of probability distributions. Each pair in the list comprises a distribution and a weight. The weights must be non-negative and sum to 1. >>> dist1 = ProbabilityDistribution({0: 0.25, 1: 0.5, 2: 0.25}) >>> dist2 = ProbabilityDistribution({0: 0.5, 1: 0.5}) >>> dist3 = convex_combination([(dist1, 0.25), (dist2, 0.75)]) >>> dist3 ProbabilityDistribution({0: 0.4375, 1: 0.5, 2: 0.0625}) >>> dist3.expectation(lambda x : x**2) 0.75 """P:defaultdict[T0,float]=defaultdict(float)S=0.0forpd,aindists:ifa<0:raiseValueError("Weights must be non-negative.")forx,pinpd._P.items():P[x]+=a*pS+=aifnotnp.isclose(S,1):raiseValueError("Weights must sum to 1.")returnProbabilityDistribution(P)