目录

  • MIP-DQN 算法概述
    • 建模基础
    • 训练阶段(Training)
    • 部署阶段(Online Execution)
    • DNN 网络转化为 MIP 表达式
    • 性能指标
  • 完整 Python 代码实现
    • 主函数:random_generator_battery
    • 模型函数:MIP_DQN
      • 基础/专用库包安装
      • 模型运行(完整Python代码)
    • 参数设置函数:Parameters
  • 参考

本博客根据论文《Optimal energy system scheduling using a constraint-aware reinforcement learning algorithm》(2023)中提出的 MIP-DQN 算法,对其完整研究方法进行详细介绍。

内容包括:输入、输出,状态空间与动作空间定义,训练过程,在线部署过程,以及如何将 DNN 转换为 MIP 进行约束优化。

MIP-DQN 算法概述

MIP-DQN(Mixed-Integer Programming Deep Q-Network)是一种 值函数驱动的深度强化学习算法,与传统 深度强化学习DRL 不同,它在 执行阶段能够严格满足操作约束,例如功率平衡、爬坡限制等。

建模基础

输入(State 状态空间):

每个时刻 ( t ) 的状态 ( s_t ) 包括:

  • ( P^V_t ):当前时刻的光伏出力(向量)
  • ( P^L_t ):当前时刻的用户负荷(向量)
  • ( P^G_{t-1} ):上一时刻 DG(分布式发电机)出力(向量)
  • ( SOC_t ):当前时刻储能系统(ESS)电量状态(向量)
s_t = (P^V_t, P^L_t, P^G_{t-1}, SOC_t)

在这里插入图片描述


输出(Action 动作空间):

每个时刻 ( t ) 的动作 ( a_t ) 包括:

  • ( P^G_{i,t} ):每个 DG 单元的出力(连续变量)
  • ( P^B_{j,t} ):每个 ESS 的充放电功率(连续变量)
a_t = (P^G_{i,t}, P^B_{j,t})

注意:系统与电网的交易功率 ( P^N_t ) 不由 DRL 控制,它由系统自动调整以维持功率平衡。

训练阶段(Training)

🎯 目标:
学习一个 Q 函数,估计在状态 ( s_t ) 下采取动作 ( a_t ) 所带来的期望回报。
在这里插入图片描述

奖励函数定义(Reward Function):

R(s_t, a_t) = -σ_1 × \text{运行成本} - σ_2 × \text{功率不平衡}
  • 运行成本 = DG 成本 + 与电网交易成本
  • 功率不平衡 ( ΔP_t ):公式如下:
ΔP_t = \left| \sum_i P^G_{i,t} + \sum_j P^B_{j,t} + \sum_m P^V_{m,t} + P^N_t - \sum_k P^L_{k,t} \right|

在这里插入图片描述
训练流程(见算法 1):

  1. 初始化 Q 网络 ( Q_\theta(s, a) )、目标网络 ( Q_{\theta_{target}} ),以及策略网络 ( \pi_\omega )
  2. 采样动作:从策略网络 ( \pi_\omega(s) ) 加上噪声(用于探索)
  3. 与环境交互:得到 ( (s_t, a_t, r_t, s_{t+1}) )
  4. 存入回放缓冲区 Replay Buffer
  5. 采样 mini-batch 批量更新:
    • 更新 Q 网络:使用目标网络计算 target Q 值
    • 更新策略网络:最大化 Q 网络的期望值
  6. 定期软更新目标网络:( \theta_{target} = \tau \theta + (1 - \tau)\theta_{target} )
    在这里插入图片描述

部署阶段(Online Execution)

训练完成后,丢弃策略网络 ( \pi_\omega ),仅保留 Q 网络 ( Q_\theta(s, a) ),并将其转化为 带约束的整数规划问题(MIP) 进行决策。

步骤如下(见算法 2):

  1. 将训练好的 DNN ( Q_\theta(s, a) ) 转化为 MIP 表达形式(见下节)
  2. 添加操作约束:
    • 功率平衡约束(等式)
    • DG 出力上下限、爬坡约束
    • ESS 充放电限制、SOC 约束等
  3. 使用商业 MIP 求解器(如 Gurobi)求解:
\max_{a \in \mathcal{A}} Q_\theta(s, a) \quad \text{subject to all constraints}

在这里插入图片描述

求解结果中的 ( a^* ) 即为当前状态下最优可行动作!

在这里插入图片描述

DNN 网络转化为 MIP 表达式

假设 Q 网络结构为 多层前馈神经网络(ReLU 激活),则每一层输出满足:

x^k = \text{ReLU}(W^{k-1} x^{k-1} + b^{k-1})

ReLU 可通过如下线性混合整数形式建模:

对于每一个神经元 ( x ):

x = \max(0, \hat{x}) \Rightarrow
\begin{cases}
x \geq 0 \\
x \geq \hat{x} \\
x \leq M z \\
x - \hat{x} \leq M (1 - z) \\
z \in \{0, 1\}
\end{cases}

其中 ( z ) 是二进制变量,( M ) 是足够大的常数。

性能指标

DRL 算法性能评估指标:

  • 运行成本:目标函数(越小越好)
  • 功率不平衡(ΔP):是否满足功率平衡(越小越好)
  • 算法运行时间:是否满足实时性要求

完整 Python 代码实现

所用工具:

  • PyTorch:训练 Q 网络和策略网络
  • OMLT(Optimization and Machine Learning Toolkit):将 PyTorch 模型转为 MIP 形式
  • Gurobi / CPLEX / CBC:商业混合整数规划求解器
  • Pyomo:建模数学规划问题
  • OpenAI Gym / 自定义环境:训练环境

主函数:random_generator_battery

此段代码构建了一个 能源管理环境 ESSEnv,用于模拟以下多个能源设备的运行:

  • 🔋 电池系统(Battery)
  • ⚡ 三台分布式发电机(DG1、DG2、DG3)
  • 🌞 光伏发电(PV)
  • 🔌 电网供电(Grid)
  • 📈 能源价格、负荷、电量数据(通过 DataManager 管理)

该环境继承自 gym.Env,兼容 OpenAI Gym 强化学习 API,支持 reset()step()render() 等接口,用于训练强化学习智能体。

模型函数:MIP_DQN

基础/专用库包安装

安装基础依赖:

conda install numpy pandas matplotlib scikit-learn -y
conda install pytorch torchvision -c pytorch -y

安装 Pyomo(用于建模 MIP):

conda install -c conda-forge pyomo -y

安装 Gurobi(用于求解 MIP)

⚠️注意: Gurobi 是商业软件,需注册并获取 Academic License(免费用于学术用途)

官网-Gurobi Optimization。Gurobi的安装及 license 获取可参考我的另一博客-。

1. 安装 Gurobi

conda install -c gurobi gurobi -y

2. 设置 license(首次使用)

grbgetkey <your-license-key>grbgetkeyex: grbgetkey ae36ac20-16e6-acd2-f242-4da6e765fa0a

然后按提示操作。若你已有 gurobi.lic 文件,请放在 ~/.gurobi/ 或 C:\gurobi\ 目录下。
在这里插入图片描述


安装 OMLT(Optimization and Machine Learning Toolkit,优化与机器学习集成工具)

pip install omltconda install omlt

注意:OMLT 依赖 pyomo 和 onnx,会自动安装。


安装 ONNX(用于神经网络转模型)

pip install onnx onnxruntime

说明,onnx 是模型格式,onnxruntime 是运行时推理引擎


安装 Weights & Biases(可选,日志可视化工具)
wandb 是一个轻量级第三方 Python 包,不依赖底层 C/C++ 库,pip 安装非常稳定,适合用于 Conda 环境中。

官网-wandb,进去官网后注册即可获取个人API。

下载命令如下:

pip install wandb

输入以下命令,按照指示输入个人API即可。

wandb login

模型运行(完整Python代码)

import pickle
import torch
import os
import numpy as np
import numpy.random as rd
import pandas as pd
import pyomo.environ as pyo
import pyomo.kernel as pmo
from omlt import OmltBlock
from gurobipy import *
from omlt.neuralnet import NetworkDefinition, FullSpaceNNFormulation,ReluBigMFormulation
from omlt.io.onnx import write_onnx_model_with_bounds,load_onnx_neural_network_with_bounds
import tempfile
import torch.onnx
import torch.nn as nn
from copy import deepcopy
import wandb
from random_generator_battery import ESSEnv## define net
# 经验回放缓冲区 ReplayBuffer:用于存储 agent 与环境交互的轨迹,以供后续训练使用。
class ReplayBuffer:def __init__(self, max_len, state_dim, action_dim, gpu_id=0):self.now_len = 0self.next_idx = 0self.if_full = Falseself.max_len = max_lenself.data_type = torch.float32self.action_dim = action_dimself.device = torch.device(f"cuda:{gpu_id}" if (torch.cuda.is_available() and (gpu_id >= 0)) else "cpu")other_dim = 1 + 1 + self.action_dimself.buf_other = torch.empty(size=(max_len, other_dim), dtype=self.data_type, device=self.device)if isinstance(state_dim, int):  # state is pixelself.buf_state = torch.empty((max_len, state_dim), dtype=torch.float32, device=self.device)elif isinstance(state_dim, tuple):self.buf_state = torch.empty((max_len, *state_dim), dtype=torch.uint8, device=self.device)else:raise ValueError('state_dim')# extend_buffer():添加新数据def extend_buffer(self, state, other):  # CPU array to CPU arraysize = len(other)next_idx = self.next_idx + sizeif next_idx > self.max_len:self.buf_state[self.next_idx:self.max_len] = state[:self.max_len - self.next_idx]self.buf_other[self.next_idx:self.max_len] = other[:self.max_len - self.next_idx]self.if_full = Truenext_idx = next_idx - self.max_lenself.buf_state[0:next_idx] = state[-next_idx:]self.buf_other[0:next_idx] = other[-next_idx:]else:self.buf_state[self.next_idx:next_idx] = stateself.buf_other[self.next_idx:next_idx] = otherself.next_idx = next_idx# sample_batch():采样 mini-batch(训练用)def sample_batch(self, batch_size) -> tuple:indices = rd.randint(self.now_len - 1, size=batch_size)r_m_a = self.buf_other[indices]return (r_m_a[:, 0:1],r_m_a[:, 1:2],r_m_a[:, 2:],self.buf_state[indices],self.buf_state[indices + 1])def update_now_len(self):self.now_len = self.max_len if self.if_full else self.next_idxclass Arguments:def __init__(self, agent=None, env=None):self.agent = agent  # Deep Reinforcement Learning algorithmself.env = env  # the environment for trainingself.cwd = None  # current work directory. None means set automaticallyself.if_remove = False  # remove the cwd folder? (True, False, None:ask me)self.visible_gpu = '0,1,2,3'  # for example: os.environ['CUDA_VISIBLE_DEVICES'] = '0, 2,'self.worker_num = 2  # rollout workers number pre GPU (adjust it to get high GPU usage)self.num_threads = 8  # cpu_num for evaluate model, torch.set_num_threads(self.num_threads)self.if_per_or_gae = False'''Arguments for training'''self.num_episode=3000self.gamma = 0.995  # discount factor of future rewardsself.learning_rate = 1e-4  # 2 ** -14 ~= 6e-5self.soft_update_tau = 1e-2  # 2 ** -8 ~= 5e-3self.net_dim = 64  # the network width 256self.batch_size = 256  # num of transitions sampled from replay buffer.self.repeat_times = 2 ** 3  # repeatedly update network to keep critic's loss smallself.target_step = 1000 # collect target_step experiences , then update network, 1024self.max_memo = 50000  # capacity of replay buffer## arguments for controlling explorationself.explorate_decay=0.99self.explorate_min=0.3'''Arguments for evaluate'''self.random_seed_list=[1234,2234,3234,4234,5234]# self.random_seed_list=[2234]self.run_name='MIP_DQN_experiments''''Arguments for save'''self.train=Trueself.save_network=Truedef init_before_training(self, if_main):if self.cwd is None:agent_name = self.agent.__class__.__name__self.cwd = f'./{agent_name}/{self.run_name}'if if_main:import shutil  # remove history according to bool(if_remove)if self.if_remove is None:self.if_remove = bool(input(f"| PRESS 'y' to REMOVE: {self.cwd}? ") == 'y')elif self.if_remove:shutil.rmtree(self.cwd, ignore_errors=True)print(f"| Remove cwd: {self.cwd}")os.makedirs(self.cwd, exist_ok=True)np.random.seed(self.random_seed)torch.manual_seed(self.random_seed)torch.set_num_threads(self.num_threads)torch.set_default_dtype(torch.float32)os.environ['CUDA_VISIBLE_DEVICES'] = str(self.visible_gpu)# control how many GPU is used  # 模型定义 Actor 网络(策略网络)
class Actor(nn.Module):def __init__(self,mid_dim,state_dim,action_dim):super().__init__()self.net=nn.Sequential(nn.Linear(state_dim,mid_dim),nn.ReLU(),nn.Linear(mid_dim,mid_dim),nn.ReLU(),nn.Linear(mid_dim,mid_dim),nn.ReLU(),nn.Linear(mid_dim,action_dim))def forward(self,state):return self.net(state).tanh()# make the data from -1 to 1# 用于探索时加入噪声def get_action(self,state,action_std):#action=self.net(state).tanh()noise=(torch.randn_like(action)*action_std).clamp(-0.5,0.5)#return (action+noise).clamp(-1.0,1.0)# 模型定义 CriticQ 网络(双 Q 网络)
class CriticQ(nn.Module):def __init__(self,mid_dim,state_dim,action_dim):super().__init__()self.net_head=nn.Sequential(nn.Linear(state_dim+action_dim,mid_dim),nn.ReLU(),nn.Linear(mid_dim,mid_dim),nn.ReLU())self.net_q1=nn.Sequential(nn.Linear(mid_dim,mid_dim),nn.ReLU(),nn.Linear(mid_dim,1))# we get q1 valueself.net_q2=nn.Sequential(nn.Linear(mid_dim,mid_dim),nn.ReLU(),nn.Linear(mid_dim,1))# we get q2 valuedef forward(self,value):mid=self.net_head(value)return self.net_q1(mid)def get_q1_q2(self,value):mid=self.net_head(value)return self.net_q1(mid),self.net_q2(mid)class AgentBase:def __init__(self):self.state = Noneself.device = Noneself.action_dim = Noneself.if_off_policy = Noneself.explore_noise = Noneself.trajectory_list = Noneself.explore_rate = 1.0self.criterion = torch.nn.SmoothL1Loss()def init(self, net_dim, state_dim, action_dim, learning_rate=1e-4, _if_per_or_gae=False, gpu_id=0):self.device = torch.device(f"cuda:{gpu_id}" if (torch.cuda.is_available() and (gpu_id >= 0)) else "cpu")self.action_dim = action_dimself.cri = self.ClassCri(net_dim, state_dim, action_dim).to(self.device)self.act = self.ClassAct(net_dim, state_dim, action_dim).to(self.device) if self.ClassAct else self.criself.cri_target = deepcopy(self.cri) if self.if_use_cri_target else self.criself.act_target = deepcopy(self.act) if self.if_use_act_target else self.actself.cri_optim = torch.optim.Adam(self.cri.parameters(), learning_rate)self.act_optim = torch.optim.Adam(self.act.parameters(),learning_rate) if self.ClassAct else self.cridel self.ClassCri, self.ClassActdef select_action(self, state) -> np.ndarray:states = torch.as_tensor((state,), dtype=torch.float32, device=self.device)action = self.act(states)[0]if rd.rand()<self.explore_rate:action = (action + torch.randn_like(action) * self.explore_noise).clamp(-1, 1)return action.detach().cpu().numpy()def explore_env(self, env, target_step):trajectory = list()state = self.statefor _ in range(target_step):action = self.select_action(state)state, next_state, reward, done, = env.step(action)trajectory.append((state, (reward, done, *action)))state = env.reset() if done else next_stateself.state = statereturn trajectory@staticmethoddef optim_update(optimizer, objective):optimizer.zero_grad()objective.backward()optimizer.step()@staticmethoddef soft_update(target_net, current_net, tau):for tar, cur in zip(target_net.parameters(), current_net.parameters()):tar.data.copy_(cur.data * tau + tar.data * (1.0 - tau))def save_or_load_agent(self, cwd, if_save):def load_torch_file(model_or_optim, _path):state_dict = torch.load(_path, map_location=lambda storage, loc: storage)model_or_optim.load_state_dict(state_dict)name_obj_list = [('actor', self.act), ('act_target', self.act_target), ('act_optim', self.act_optim),('critic', self.cri), ('cri_target', self.cri_target), ('cri_optim', self.cri_optim), ]name_obj_list = [(name, obj) for name, obj in name_obj_list if obj is not None]if if_save:for name, obj in name_obj_list:save_path = f"{cwd}/{name}.pth"torch.save(obj.state_dict(), save_path)else:for name, obj in name_obj_list:save_path = f"{cwd}/{name}.pth"load_torch_file(obj, save_path) if os.path.isfile(save_path) else Nonedef _update_exploration_rate(self,explorate_decay,explore_rate_min):self.explore_rate = max(self.explore_rate * explorate_decay, explore_rate_min)'''this function is used to update the explorate probability when select action'''# 模型定义 AgentMIPDQN 算法(继承基础类 AgentBase)
class AgentMIPDQN(AgentBase):def __init__(self):super().__init__()self.explore_noise = 0.5  # standard deviation of exploration noiseself.policy_noise = 0.2  # standard deviation of policy noiseself.update_freq = 2  # delay update frequencyself.if_use_cri_target = self.if_use_act_target = Trueself.ClassCri = CriticQself.ClassAct = Actor# update_net():更新策略网络和双 Q 网络def update_net(self, buffer, batch_size, repeat_times, soft_update_tau) -> tuple:buffer.update_now_len()obj_critic = obj_actor = Nonefor update_c in range(int(buffer.now_len / batch_size * repeat_times)):# we update too much time?obj_critic, state = self.get_obj_critic(buffer, batch_size)self.optim_update(self.cri_optim, obj_critic)action_pg = self.act(state)  # policy gradientobj_actor = -self.cri_target(torch.cat((state, action_pg),dim=-1)).mean()  # use cri_target instead of cri for stable trainingself.optim_update(self.act_optim, obj_actor)if update_c % self.update_freq == 0:  # delay updateself.soft_update(self.cri_target, self.cri, soft_update_tau)self.soft_update(self.act_target, self.act, soft_update_tau)return obj_critic.item() / 2, obj_actor.item()# get_obj_critic():获取目标 Q 值并计算 critic lossdef get_obj_critic(self, buffer, batch_size) -> (torch.Tensor, torch.Tensor):with torch.no_grad():reward, mask, action, state, next_s = buffer.sample_batch(batch_size)next_a = self.act_target.get_action(next_s, self.policy_noise)  # policy noise,next_q = torch.min(*self.cri_target.get_q1_q2(torch.cat((next_s, next_a),dim=-1)))  # twin criticsq_label = reward + mask * next_qq1, q2 = self.cri.get_q1_q2(torch.cat((state, action),dim=-1))obj_critic = self.criterion(q1, q_label) + self.criterion(q2, q_label)  # twin criticsreturn obj_critic, statedef update_buffer(_trajectory):ten_state = torch.as_tensor([item[0] for item in _trajectory], dtype=torch.float32)ary_other = torch.as_tensor([item[1] for item in _trajectory])ary_other[:, 0] = ary_other[:, 0]   # ten_rewardary_other[:, 1] = (1.0 - ary_other[:, 1]) * gamma  # ten_mask = (1.0 - ary_done) * gammabuffer.extend_buffer(ten_state, ary_other)_steps = ten_state.shape[0]_r_exp = ary_other[:, 0].mean()  # other = (reward, mask, action)return _steps, _r_expdef get_episode_return(env, act, device):'''get information of one episode during the training'''episode_return = 0.0  # sum of rewards in an episodeepisode_unbalance=0.0episode_operation_cost=0.0state = env.reset()for i in range(24):s_tensor = torch.as_tensor((state,), device=device)a_tensor = act(s_tensor)action = a_tensor.detach().cpu().numpy()[0]  # not need detach(), because with torch.no_grad() outsidestate, next_state, reward, done,= env.step(action)state=next_stateepisode_return += rewardepisode_unbalance+=env.real_unbalanceepisode_operation_cost+=env.operation_costif done:breakreturn episode_return,episode_unbalance,episode_operation_cost# 网络导出与 MIP 调用模块 Actor_MIP
# 用于将训练好的 Q 网络转换成 ONNX + OMLT + Pyomo 表达形式,并结合 Gurobi 求解最优动作
class Actor_MIP:'''this actor is used to get the best action and Q function, the only input should be batch tensor state, action, and network, while the output should bebatch tensor max_action, batch tensor max_Q'''def __init__(self,scaled_parameters,batch_size,net,state_dim,action_dim,env,constrain_on=False):self.batch_size = batch_sizeself.net = netself.state_dim = state_dimself.action_dim =action_dimself.env = envself.constrain_on=constrain_onself.scaled_parameters=scaled_parametersdef get_input_bounds(self,input_batch_state):batch_size = self.batch_sizebatch_input_bounds = []lbs_states = input_batch_state.detach().numpy()ubs_states = lbs_statesfor i in range(batch_size):input_bounds = {}for j in range(self.action_dim + self.state_dim):if j < self.state_dim:input_bounds[j] = (float(lbs_states[i][j]), float(ubs_states[i][j]))else:input_bounds[j] = (float(-1), float(1))batch_input_bounds.append(input_bounds)return batch_input_boundsdef predict_best_action(self, state):state=state.detach().cpu().numpy()v1 = torch.zeros((1, self.state_dim+self.action_dim), dtype=torch.float32)'''this function is used to get the best action based on current net'''model = self.net.to('cpu')input_bounds = {}lb_state = stateub_state = statefor i in range(self.action_dim + self.state_dim):if i < self.state_dim:input_bounds[i] = (float(lb_state[0][i]), float(ub_state[0][i]))else:input_bounds[i] = (float(-1), float(1))with tempfile.NamedTemporaryFile(suffix='.onnx', delete=False) as f:# export neural network to ONNXtorch.onnx.export(model,v1,f,input_names=['state_action'],output_names=['Q_value'],dynamic_axes={'state_action': {0: 'batch_size'},'Q_value': {0: 'batch_size'}})# write ONNX model and its bounds using OMLTwrite_onnx_model_with_bounds(f.name, None, input_bounds)# load the network definition from the ONNX modelnetwork_definition = load_onnx_neural_network_with_bounds(f.name)# global optimalityformulation = ReluBigMFormulation(network_definition)m = pyo.ConcreteModel()m.nn = OmltBlock()m.nn.build_formulation(formulation)'''# we are now building the surrogate model between action and state'''# constrain for battery,if self.constrain_on:m.power_balance_con1 = pyo.Constraint(expr=((-m.nn.inputs[7] * self.scaled_parameters[0])+\((m.nn.inputs[8] * self.scaled_parameters[1])+m.nn.inputs[4]*self.scaled_parameters[5]) +\((m.nn.inputs[9] * self.scaled_parameters[2])+m.nn.inputs[5]*self.scaled_parameters[6]) +\((m.nn.inputs[10] * self.scaled_parameters[3])+m.nn.inputs[6]*self.scaled_parameters[7])>=\m.nn.inputs[3] *self.scaled_parameters[4]-self.env.grid.exchange_ability))m.power_balance_con2 = pyo.Constraint(expr=((-m.nn.inputs[7] * self.scaled_parameters[0])+\(m.nn.inputs[8] * self.scaled_parameters[1]+m.nn.inputs[4]*self.scaled_parameters[5]) +\(m.nn.inputs[9] * self.scaled_parameters[2]+m.nn.inputs[5]*self.scaled_parameters[6]) +\(m.nn.inputs[10] * self.scaled_parameters[3]+m.nn.inputs[6]*self.scaled_parameters[7])<=\m.nn.inputs[3] *self.scaled_parameters[4]+self.env.grid.exchange_ability))m.obj = pyo.Objective(expr=(m.nn.outputs[0]), sense=pyo.maximize)pyo.SolverFactory('gurobi').solve(m, tee=False)best_input = pyo.value(m.nn.inputs[:])best_action = (best_input[self.state_dim::])return best_action# define test function
if __name__ == '__main__':args = Arguments()'''here record real unbalance'''reward_record = {'episode': [], 'steps': [], 'mean_episode_reward': [], 'unbalance': [],'episode_operation_cost': []}loss_record = {'episode': [], 'steps': [], 'critic_loss': [], 'actor_loss': [], 'entropy_loss': []}args.visible_gpu = '2'for seed in args.random_seed_list:args.random_seed = seed# set different seedargs.agent = AgentMIPDQN()agent_name = f'{args.agent.__class__.__name__}'args.agent.cri_target = Trueargs.env = ESSEnv()args.init_before_training(if_main=True)'''init agent and environment'''agent = args.agentenv = args.envagent.init(args.net_dim, env.state_space.shape[0], env.action_space.shape[0], args.learning_rate,args.if_per_or_gae)'''init replay buffer'''buffer = ReplayBuffer(max_len=args.max_memo, state_dim=env.state_space.shape[0],action_dim=env.action_space.shape[0])'''start training'''cwd = args.cwdgamma = args.gammabatch_size = args.batch_size  # how much data should be used to update nettarget_step = args.target_step  # how manysteps of one episode should stoprepeat_times = args.repeat_times  # how many times should update for one batch size datasoft_update_tau = args.soft_update_tauagent.state = env.reset()'''collect data and train and update network'''num_episode = args.num_episodeargs.train=Trueargs.save_network=True# 自动记录每集的 reward、loss、unbalance 等wandb.init(project='MIP_DQN_experiments',name=args.run_name,settings=wandb.Settings(start_method="fork"))wandb.config = {"epochs": num_episode,"batch_size": batch_size}wandb.define_metric('custom_step')if args.train:collect_data = Truewhile collect_data:print(f'buffer:{buffer.now_len}')with torch.no_grad():trajectory = agent.explore_env(env, target_step)steps, r_exp = update_buffer(trajectory)buffer.update_now_len()if buffer.now_len >= 10000:collect_data = Falsefor i_episode in range(num_episode):critic_loss, actor_loss = agent.update_net(buffer, batch_size, repeat_times, soft_update_tau)wandb.log({'critic loss':critic_loss,'custom_step':i_episode})wandb.log({'actor loss': actor_loss,'custom_step':i_episode})loss_record['critic_loss'].append(critic_loss)loss_record['actor_loss'].append(actor_loss)with torch.no_grad():episode_reward, episode_unbalance, episode_operation_cost = get_episode_return(env, agent.act,agent.device)wandb.log({'mean_episode_reward': episode_reward,'custom_step':i_episode})wandb.log({'unbalance':episode_unbalance,'custom_step':i_episode})wandb.log({'episode_operation_cost':episode_operation_cost,'custom_step':i_episode})reward_record['mean_episode_reward'].append(episode_reward)reward_record['unbalance'].append(episode_unbalance)reward_record['episode_operation_cost'].append(episode_operation_cost)print(f'curren epsiode is {i_episode}, reward:{episode_reward},unbalance:{episode_unbalance},buffer_length: {buffer.now_len}')if i_episode % 10 == 0:# target_stepwith torch.no_grad():agent._update_exploration_rate(args.explorate_decay,args.explorate_min)trajectory = agent.explore_env(env, target_step)steps, r_exp = update_buffer(trajectory)wandb.finish()if args.update_training_data:loss_record_path = f'{args.cwd}/loss_data.pkl'reward_record_path = f'{args.cwd}/reward_data.pkl'with open(loss_record_path, 'wb') as tf:pickle.dump(loss_record, tf)with open(reward_record_path, 'wb') as tf:pickle.dump(reward_record, tf)act_save_path = f'{args.cwd}/actor.pth'cri_save_path = f'{args.cwd}/critic.pth'print('training data have been saved')if args.save_network:# 模型保存与结果存储torch.save(agent.act.state_dict(), act_save_path)torch.save(agent.cri.state_dict(), cri_save_path)print('training finished and actor and critic parameters have been saved')

参数设置函数:Parameters

1、电池参数(battery_parameters)

battery_parameters = {'capacity': 500,           # 电池总容量(kWh)'max_charge': 100,         # 最大充电功率(kW)'max_discharge': 100,      # 最大放电功率(kW)'efficiency': 0.9,         # 充放电效率(90%)'degradation': 0,          # 电池退化成本(€/kW,未启用)'max_soc': 0.8,            # 最大SOC(80%)'min_soc': 0.2,            # 最小SOC(20%)'initial_capacity': 0.2    # 初始SOC(20%)
}

参数解释:

参数名含义用途
capacity电池的总电量容量(单位 kWh)用于计算 SOC 的绝对值
max_charge每小时最大充电功率(kW)限制动作空间中充电方向的动作上限
max_discharge每小时最大放电功率(kW)限制动作空间中放电方向的动作下限
efficiency充/放电的能量转换效率影响 SOC 更新公式,通常 < 1
degradation每单位放电引起的电池退化成本可选项,未启用
max_socSOC 最大值(占比)建模约束,防止过充(如 0.8×500 kWh)
min_socSOC 最小值(占比)建模约束,防止过放
initial_capacity初始时刻的 SOC(占比)环境初始化状态使用

在 MIP-DQN 中的作用:

  • 在状态空间中,SOC 是环境的一个组成变量;
  • 在动作空间中,充放电功率是 agent 决策的一部分;
  • 在约束中,必须满足:
    • min_soc × capacity ≤ SOC_t ≤ max_soc × capacity
    • -max_discharge ≤ P_battery_t ≤ max_charge
    • SOC_{t+1} = SOC_t + η × P_battery_t × Δt / capacity

2、发电机参数(dg_parameters

结构是一个字典嵌套字典,每一个 key (如 'gen_1')代表一个 DG 单元,value 是该 DG 的参数。

dg_parameters = {'gen_1': {...},'gen_2': {...},'gen_3': {...}
}

参数结构(以 gen_1 为例):

{'a': 0.0034,         # 成本函数二次项系数'b': 3,              # 成本函数一次项系数'c': 30,             # 成本函数常数项'd': 0.03,           # 热电参数(未使用)'e': 4.2,'f': 0.031,'power_output_max': 150,   # 最大出力(kW)'power_output_min': 0,     # 最小出力(kW)'heat_output_max': None,   # 若为热电联产系统使用(未启用)'heat_output_min': None,'ramping_up': 100,         # 每小时最大爬坡(上升)能力(kW/h)'ramping_down': 100,       # 每小时最大爬坡(下降)能力(kW/h)'min_up': 2,               # 最小连续开机时间(小时)'min_down': 1              # 最小连续关机时间(小时)
}

成本函数定义:

发电成本函数为二次型:

C_{DG}(P) = a × P^2 + b × P + c

gen_1 为例:

C_{DG_1}(P) = 0.0034 × P^2 + 3 × P + 30

这在论文中公式 (2) 中有体现。

约束相关参数:

参数名含义用途
power_output_max/min发电出力上下限控制动作空间边界
ramping_up/down爬坡约束限制连续两个时刻出力差值
min_up/down连续开/关机约束状态转换约束(若考虑启停状态)

注意:最小启停时间在本文中未显式建模,若要考虑,需要引入二进制变量进行建模(Unit Commitment 问题)。

参考

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/89621.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/89621.shtml
英文地址,请注明出处:http://en.pswp.cn/web/89621.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

微信小程序 wx.request() 的封装

基于微信小程序的wx.request()方法封装下面是一个封装方案&#xff0c;满足您提出的所有要求&#xff1a;class HttpService {constructor() {this.baseUrl ; // 基础URLthis.pendingRequests new Map(); // 请求缓存池this.interceptors {request: [],response: []};}// 设…

yolo8实时识别目标(和平精英敌人+骨骼关键点)

现在需要识别人物的肢体&#xff08;姿态/骨骼关键点&#xff09;&#xff0c;即所谓的「姿态估计&#xff08;pose estimation&#xff09;」&#xff0c;以下是一些主流、训练好可直接使用的开源模型推荐&#xff0c;支持多人识别与骨骼关键点检测&#xff0c;适合你后续用于…

MyBatis动态SQL全解析:五大核心标签实战指南

MyBatis动态SQL全解析&#xff1a;五大核心标签实战指南 一、动态SQL的价值&#xff1a;告别硬编码时代 传统SQL拼接的痛点 // 传统方式需要手动拼接SQL字符串 StringBuilder sql new StringBuilder("SELECT * FROM orders WHERE 11"); if (status ! null) {sql.app…

线上 CPU 过高怎么排查

通过以下几个命令解决1、top命令&#xff0c;找到 CPU 过高的pid(进程); ​编辑 2、根据pid(进程)找到CPU过高的线程id;top -H -p pid(进程)3、把线程id转换16 进制的printf 0x%x\n 线程id4、导致CPU 飙升的线程异常信息&#xff0c;-A 30表示打印 30 行记录jstack pid(进程id)…

Letter Combination of a Phone Number

IntroduceProblem Analysis (Using “258” as example) //2 a b c //5 j k l //8 t u vPossible letter combinations: a, j, t (no further options, this is one combination)a, j, u (no further options, another combination)a, j, v (another c…

【问题解决】npm包下载速度慢

问题描述&#xff1a; npm包下载速度慢 问题原因&#xff1a; 为什么下载 npm 包速度慢&#xff1f; 在使用npm下包的时候&#xff0c;默认从国外的https://regitry.npmjs.org/服务器进行下载。此时&#xff0c;网络数据的传输需要经过漫长的海底光缆&#xff0c;因此下包速度…

Apache DolphinScheduler介绍与部署

目录 一、软件介绍 1、软件概述 2、发展历史 3、名词解释 4、模块介绍 软件部署 1、下载发布包 2、上传与解压 3、启动 4、浏览器验证 一、软件介绍 1、软件概述 Apache DolphinScheduler 是一个分布式易扩展的可视化DAG工作流任务调度开源系统。适用于企业级场景&…

Selenium 启动的浏览器自动退出问题分析

当 Selenium 启动的浏览器自动关闭时&#xff0c;通常是由于以下原因导致的&#xff1a;1. 脚本执行完毕原因&#xff1a;Selenium 脚本执行到末尾时&#xff0c;如果没有保持浏览器打开的代码&#xff08;如time.sleep()或循环&#xff09;&#xff0c;浏览器会自动关闭。解决…

rust实现的快捷补全到剪贴板的实用工具

最近在兼职项目中老是遇到这样的场景&#xff1a; 在云服务器之间通过scp命令传输文件&#xff0c;密码太长记不住(客户服务器不方便ssh-copy-id)在服务器上使用mysql命令登录修改数据&#xff0c;数据库密码太长记不住&#xff08;客户设置的密码&#xff0c;直接改掉哈&#…

信息系统风险的安全技术防范思路

针对信息系统风险的安全技术防范思路 降低风险&#xff0c;即提升了安全能力和水平 保护资产 加强信息系统软硬件及数据安全保护&#xff1b;减少脆弱性 通过研发、部署、应用各环节来尽量减少或避免脆弱性&#xff1b;应对威胁 采取防御措施&#xff0c;实施攻防对抗。

Java项目:基于SSM框架实现的网盘管理系统【ssm+B/S架构+源码+数据库+毕业论文】

摘 要 网络技术和计算机技术发展至今&#xff0c;已经拥有了深厚的理论基础&#xff0c;并在现实中进行了充分运用&#xff0c;尤其是基于计算机运行的软件更是受到各界的关注。加上现在人们已经步入信息时代&#xff0c;所以对于信息的宣传和管理就很关键。因此文件信息的管理…

Echart 地图放大缩小

文章目录 常用方法 1. **开启 `roam` 属性** 2. **通过鼠标滚轮或手势缩放** 3. **设置初始缩放比例** 4. **通过按钮控制缩放** 5. **限制缩放范围** 6. **监听缩放和平移事件** 7. **结合 `dataZoom` 实现数据缩放** 总结 相关文章 在 ECharts 中,可以通过设置地图的 roam …

针对VMware虚拟化环境迁移的复杂场景,我将从技术架构、迁移方案、代码实现、可视化流程四个维度进行专业解析,并提供完整的解决方案框架。

针对VMware虚拟化环境迁移的复杂场景&#xff0c;我将从技术架构、迁移方案、代码实现、可视化流程四个维度进行专业解析&#xff0c;并提供完整的解决方案框架。一、技术架构分析&#xff08;架构图表格对比&#xff09;graph TDA[源环境] -->|vMotion| B[目标环境]A -->…

揭秘 AIGC 背后的技术:GPT、BERT 与 Transformer 模型的工作原理

一、引言AIGC 的崛起与重要性人工智能生成内容&#xff08;AIGC&#xff09;已经不再是未来的技术&#xff0c;它正以惊人的速度渗透到各行各业&#xff0c;重新定义了内容创作、媒体生产、甚至人类认知的边界。从深度学习到大规模自然语言处理&#xff0c;AIGC 的崛起代表着一…

Compose笔记(三十五)--ModalBottomSheetLayout

这一节主要了解一下Compose中的ModalBottomSheetLayout&#xff0c;在Jetpack Compose开发中&#xff0c;ModalBottomSheetLayout是Material Design组件库中用于实现模态底部面板的核心组件&#xff0c;其核心作用是通过声明式API管理底部面板的显示、隐藏及交互逻辑。API Moda…

AWS Partner: Accreditation (Technical)

AWS Partner: Accreditation &#xff08;Technical&#xff09;AWS 核心技术简介云计算的优势AWS 全球基础设施核心技术&#xff1a;计算 Amazon Elastic Compute Cloud (Amazon EC2)存储数据库联网安全性从服务到解决方案解决方案设计简介迁移策略架构最佳实践AWS Well-Archi…

【52】MFC入门到精通——(CComboBox)下拉框选项顺序与初始化不一致,默认显示项也不一致

文章目录1 问题描述2 问题分析与解决上一讲&#xff0c;我们实现了MFC串口助手初级版。 MFC入门到精通——MFC串口助手(一)—初级版&#xff08;串口设置、初始化、打开/关闭、状态显示&#xff09;,附源码1 问题描述 程序运行后串口默认参数&#xff0c;与我们预期不完全一致…

Astro:前端性能革命!从原生 HTML 到 Astro + React 的升级指南

为什么程序员必须关注 Astro在网站性能和 SEO 日益关键的今天&#xff0c;静态站点生成&#xff08;SSG&#xff09;再次成为焦点。Astro 作为一款专为内容驱动网站设计的现代前端框架&#xff0c;正引领一场轻盈革命。它强调服务器优先渲染&#xff0c;将页面预先转为纯 HTML&…

格式转换Total Excel Converter:20 种格式XLS XLSX 批量转 PDFWord

各位办公小能手们&#xff01;今天给大家介绍一款超厉害的软件&#xff0c;叫Total Excel Converter&#xff0c;软件下载地址安装包 它可是专业的Excel文件格式转换工具。你知道吗&#xff0c;它能把Excel工作簿&#xff0c;像XLS、XLSX、XLSM这些格式&#xff0c;批量转换成…

Thread,ThreadLocal,ThreadLocalMap 三者的关系, 以及在实际开发中的应用【AI记录用】

在 Java 多线程编程中&#xff0c;Thread、ThreadLocal 和 ThreadLocalMap 是三个紧密相关的类&#xff0c;它们共同构成了 Java 中**线程本地变量&#xff08;Thread-Local Storage&#xff09;**机制的基础。下面我将从 三者的关系、实现原理 以及 实际开发中的应用 三个方面…