模拟业务场景:多租户系统跨域转账,需要控制某租户下某用户是否可以在某域转账
open_feature_util.py
import typing
from abc import abstractmethod, ABCMeta
from typing import Sequencefrom openfeature.evaluation_context import EvaluationContext
from openfeature.flag_evaluation import FlagValueType, FlagResolutionDetails
from openfeature.provider import AbstractProviderclass AsyncProvider(AbstractProvider, metaclass=ABCMeta):def resolve_boolean_details(self,flag_key: str,default_value: bool,evaluation_context: typing.Optional[EvaluationContext] = None,) -> FlagResolutionDetails[bool]:raise RuntimeError("Not allow calling sync method in AsyncProvider")@abstractmethodasync def resolve_boolean_details_async(self,flag_key: str,default_value: bool,evaluation_context: typing.Optional[EvaluationContext] = None,) -> FlagResolutionDetails[bool]:passdef resolve_string_details(self,flag_key: str,default_value: str,evaluation_context: typing.Optional[EvaluationContext] = None,) -> FlagResolutionDetails[str]:raise RuntimeError("Not allow calling sync method in AsyncProvider")@abstractmethodasync def resolve_string_details_async(self,flag_key: str,default_value: str,evaluation_context: typing.Optional[EvaluationContext] = None,) -> FlagResolutionDetails[str]:passdef resolve_integer_details(self,flag_key: str,default_value: int,evaluation_context: typing.Optional[EvaluationContext] = None,) -> FlagResolutionDetails[int]:raise RuntimeError("Not allow calling sync method in AsyncProvider")@abstractmethodasync def resolve_integer_details_async(self,flag_key: str,default_value: int,evaluation_context: typing.Optional[EvaluationContext] = None,) -> FlagResolutionDetails[int]:passdef resolve_float_details(self,flag_key: str,default_value: float,evaluation_context: typing.Optional[EvaluationContext] = None,) -> FlagResolutionDetails[float]:raise RuntimeError("Not allow calling sync method in AsyncProvider")@abstractmethodasync def resolve_float_details_async(self,flag_key: str,default_value: float,evaluation_context: typing.Optional[EvaluationContext] = None,) -> FlagResolutionDetails[float]:passdef resolve_object_details(self,flag_key: str,default_value: typing.Union[Sequence[FlagValueType], typing.Mapping[str, FlagValueType]],evaluation_context: typing.Optional[EvaluationContext] = None,) -> FlagResolutionDetails[typing.Union[Sequence[FlagValueType], typing.Mapping[str, FlagValueType]]]:raise RuntimeError("Not allow calling sync method in AsyncProvider")
provider.py
import typingfrom openfeature.evaluation_context import EvaluationContext
from openfeature.flag_evaluation import FlagResolutionDetails
from openfeature.provider import Metadatafrom open_feature_util import AsyncProviderclass MyProvider(AsyncProvider):def get_metadata(self) -> Metadata:return Metadata(name="my_provider")async def resolve_boolean_details_async(self,flag_key: str,default_value: bool,evaluation_context: typing.Optional[EvaluationContext] = None,) -> FlagResolutionDetails[bool]:tenant_id = evaluation_context.attributes.get("tenant_id")user_id = evaluation_context.attributes.get("user_id")if flag_key.startswith("cross_domain_transfer_domain_"):domain = flag_key.split("_", )[-1]# todo 查询业务规则# 使用规则评估flag,假定目前只开放 租户id 为 1 的用户id 为 1、2、3 的用户可以进行 A、B、C 域的功能,其他均关闭if not tenant_id or not user_id or tenant_id not in (1,) or user_id not in (1, 2, 3):return FlagResolutionDetails(value=False, error_message="该功能暂未开放或您暂未被授权体验~")if domain not in ("A", "B", "C"):return FlagResolutionDetails(value=False,error_message=f"账户域({domain})转账功能暂未开放或您暂未被授权体验~")else:return FlagResolutionDetails(value=False, error_message="无此功能~")return FlagResolutionDetails(value=True)async def resolve_string_details_async(self,flag_key: str,default_value: str,evaluation_context: typing.Optional[EvaluationContext] = None,) -> FlagResolutionDetails[str]:passasync def resolve_integer_details_async(self,flag_key: str,default_value: int,evaluation_context: typing.Optional[EvaluationContext] = None,) -> FlagResolutionDetails[int]:passasync def resolve_float_details_async(self,flag_key: str,default_value: float,evaluation_context: typing.Optional[EvaluationContext] = None,) -> FlagResolutionDetails[float]:pass
main.py
import decimal
from contextlib import asynccontextmanager
from dataclasses import asdict
from typing import Listimport uvicorn
from fastapi import FastAPI, Depends, Header, Body, Request
from openfeature import api
from openfeature.evaluation_context import EvaluationContext
from openfeature.flag_evaluation import FlagEvaluationDetails
from pydantic import BaseModel, Field
from starlette.responses import JSONResponsefrom provider import MyProvider@asynccontextmanager
async def lifespan(application: FastAPI):# 注册 Providerapi.set_provider(MyProvider())yieldapp = FastAPI(lifespan=lifespan)class FlagEvaluationException(Exception):pass@app.exception_handler(FlagEvaluationException)
async def flag_disabled_handler(request: Request, exc: FlagEvaluationException):return JSONResponse(content={"ok": False, "msg": str(exc)})class Transfer(BaseModel):demain: str = Field(..., description="账户域,A、B、C")account_id: str = Field(..., description="账户ID")amount: decimal.Decimal = Field(..., description="转账金额")operation: str = Field(..., description="操作类型,in-转入,out-转出")class Transaction(BaseModel):tx_id: str = Field(..., description="Transaction id")transfers: List[Transfer] = Field(..., description="List of transaction transfers")async def get_of_client():return api.get_client()async def check_by_flag(tenant_id: int = Header(..., alias="X-Tenant"),user_id: int = Header(..., alias="X-User"),raw: Transaction = Body(...),of_client=Depends(get_of_client),
) -> Transaction:ec = EvaluationContext(attributes={"tenant_id": tenant_id, "user_id": user_id})for tr in raw.transfers:flag_key = f"cross_domain_transfer_domain_{tr.demain}"detail: FlagEvaluationDetails[bool] = await of_client.get_boolean_details_async(flag_key, default_value=False, evaluation_context=ec)if not detail.value:raise FlagEvaluationException(detail.error_message)return raw@app.post("/cross_domain_transfer")
async def transfer(tenant_id: int = Header(..., alias="X-Tenant"),user_id: int = Header(..., alias="X-User"),transaction: Transaction = Depends(check_by_flag),
):# TODO: 真正转账逻辑print(tenant_id, user_id, transaction.transfers)return {"ok": True, "msg": "成功"}@app.post("/feature_flags")
async def transfer(tenant_id: int = Header(..., alias="X-Tenant"),user_id: int = Header(..., alias="X-User"),flags: list[str] = Body(...),of_client=Depends(get_of_client),
):flag_res = {}ec = EvaluationContext(attributes={"tenant_id": tenant_id, "user_id": user_id})for flag in flags:detail: FlagEvaluationDetails[bool] = await of_client.get_boolean_details_async(flag, default_value=False, evaluation_context=ec)flag_res[flag] = asdict(detail)return {"ok": True, "msg": "成功", "res": flag_res}if __name__ == '__main__':uvicorn.run(app, host="0.0.0.0", port=8000, log_level="debug", use_colors=True)
POST 127.0.0.1:8000/cross_domain_transfer
X-Tenant 1
X-User 2
{"tx_id": "1234567890","transfers": [{"demain": "A","account_id": "1246","amount": "6666","operation": "out"},{"demain": "B","account_id": "345345","amount": "6600","operation": "in"},{"demain": "D","account_id": "23423","amount": "66","operation": "in"}]
}
POST 127.0.0.1:8000/feature_flags
X-Tenant 1
X-User 2
["sdf"]