"""Integrated System Model: Ground Source Heat Pump Boiler (GSHPB).
This system class orchestrates the dynamic interaction between distinct thermodynamic
sub-components to simulate the overall heating performance. While implemented as an
integrated model, its physical calculations represent the behavior of:
1. **Refrigerant Cycle (Vapor-Compression):**
Evaluates thermodynamic states using CoolProp, enforcing superheat/subcool margins.
2. **Heat Pump Compressor:**
Models the compression process using isentropic and volumetric efficiencies to compute
the actual discharge enthalpy and mass flow rate. The compressor power is determined
from the enthalpy difference and the mass flow rate.
3. **Expansion Valve:**
Modeled as an isenthalpic expansion device (constant enthalpy) that throttles the
refrigerant from the condensing pressure down to the evaporating pressure.
4. **Heat Exchangers (Condenser & Evaporator):**
- **Condenser:** Placed inside the hot-water tank (hydronic), utilizing a static
overall heat transfer coefficient (UA_cond_design).
- **Evaporator:** Coupled to a borehole heat exchanger (BHE) fluid loop, acting as
a secondary heat exchanger to absorb heat from the circulating ground fluid.
5. **Thermal Storage Tank:**
Modeled with lumped-capacitance and DHW mixing logic.
6. **Ground Source Heat Exchanger (Borefield):**
A dynamic multi-borehole simulation using pygfunction-based g-functions. It tracks
the transient thermal response of the ground, enabling robust modeling of long-term
ground temperature drift due to continuous heat extraction.
At each time step the model finds the minimum-power operating point via 1D Brent
optimization over the evaporator approach temperature difference, while the
condenser temperature is solved analytically.
.. note::
이 통합 시스템의 작동 원리와 개별 서브 컴포넌트 모델링에 대한 상세 이론적 배경은
:doc:`/theory/systems/gshp_boiler` 및 :doc:`/theory/components/index` 를 참조하세요.
"""
from __future__ import annotations
import math
from typing import TYPE_CHECKING, Any, Callable
import CoolProp.CoolProp as CP
import numpy as np
import pandas as pd
from tqdm import tqdm
from . import calc_util as cu
from .constants import c_w, k_w, mu_w, rho_w
from .dynamic_context import (
ControlState,
StepContext,
determine_heat_source_on_off,
determine_tank_refill_flow,
tank_mass_energy_residual,
)
from .enex_functions import (
calc_mixing_valve_flows,
calc_mixing_valve_temp,
)
from .heat_transfer import calc_simple_tank_UA
from .refrigerant import calc_ref_state
from .thermodynamics import calc_exergy_flow
from .g_function import precompute_gfunction
if TYPE_CHECKING:
from .subsystems import SolarThermalCollector
[docs]
class GroundSourceHeatPumpBoiler:
"""Ground source heat pump boiler with BHE and lumped-tank model.
The refrigerant cycle is resolved via CoolProp with user-specified
superheat / subcool margins. An optimizer minimises total cycle
electrical input subject to NTU-based evaporator constraints and
analytical condenser temperature relations.
"""
[docs]
def __init__(
self,
# 1. Refrigerant / cycle / compressor
refrigerant: str = "R410A",
V_disp_cmp: float = 0.0005,
eta_cmp_isen: float | Callable | None = None,
eta_cmp_vol: float | Callable | None = None,
eta_cmp_mech: float | Callable = 0.855,
# 2. Heat exchanger UA
UA_cond_design: float = 500,
UA_evap_design: float = 500,
# 3. Tank / control / load
T0: float = 0.0,
Ts: float = 16.0,
T_tank_w_upper_bound: float = 65.0,
T_tank_w_lower_bound: float = 55.0,
T_mix_w_out: float = 40.0,
T_tank_w_in: float = 15.0,
hp_capacity: float = 8000.0,
dV_mix_w_out_max: float = 0.0001,
# Tank / insulation
r0: float = 0.2,
H: float = 0.8,
x_shell: float = 0.01,
x_ins: float = 0.05,
k_shell: float = 25,
k_ins: float = 0.03,
h_o: float = 15,
# 4. Borehole heat exchanger (Field + Params)
N_1: int = 1,
N_2: int = 1,
B: float = 6.0,
D_b: float = 0,
H_b: float = 200,
r_b: float = 0.08,
R_b: float | None = None,
k_g: float = 1.5,
k_p: float = 0.4,
r_out: float = 0.016,
r_in: float = 0.013,
D_s: float = 0.025,
dV_b_f_lpm: float = 24,
k_s: float = 2.0,
c_s: float = 800,
rho_s: float = 2000,
E_pmp: float = 200,
# 6. Superheat / subcool
dT_superheat: float = 3.0,
dT_subcool: float = 3.0,
# 7. Tank fluid limits
tank_always_full: bool = True,
prevent_simultaneous_flow: bool = False,
tank_level_lower_bound: float = 0.5,
tank_level_upper_bound: float = 1.0,
dV_tank_w_in_refill: float = 0.001,
# 8. Operation Schedule
hp_on_schedule: list[tuple[float, float]] | None = None,
# 9. Subsystems
stc: SolarThermalCollector | None = None,
pv=None,
uv=None,
# 10. Simulation scope (for precomputing g-functions)
t_max_s: float = 8760 * 3600,
dt_s: float = 3600,
boundary_condition: str = "uniform_temperature",
) -> None:
self.tank_physical = {
"r0": r0,
"H": H,
"x_shell": x_shell,
"x_ins": x_ins,
"k_shell": k_shell,
"k_ins": k_ins,
"h_o": h_o,
}
self.UA_tank = calc_simple_tank_UA(**self.tank_physical)
self.V_tank_full: float = math.pi * r0**2 * H
self.C_tank = c_w * rho_w * self.V_tank_full
self.ref = refrigerant
self.V_disp_cmp = V_disp_cmp
self.eta_cmp_isen = eta_cmp_isen
self.eta_cmp_vol = eta_cmp_vol
self.eta_cmp_mech = eta_cmp_mech
self.UA_cond = UA_cond_design
self.UA_evap = UA_evap_design
self.T0_K = cu.C2K(T0)
self.Ts = Ts
self.Ts_K = cu.C2K(self.Ts)
self.T_bhe_f_in = Ts
self.T_bhe_f_in_K = self.Ts_K
self.hp_capacity = hp_capacity
self.hp_on_schedule = hp_on_schedule or [(0.0, 24.0)]
self.dV_mix_w_out_max = dV_mix_w_out_max
self.T_tank_w_upper_bound = T_tank_w_upper_bound
self.T_tank_w_lower_bound = T_tank_w_lower_bound
self.T_mix_w_out = T_mix_w_out
self.T_mix_w_out_K = cu.C2K(T_mix_w_out)
self.T_tank_w_in = T_tank_w_in
self.T_tank_w_in_K = cu.C2K(T_tank_w_in)
self.tank_always_full = tank_always_full
self.prevent_simultaneous_flow = prevent_simultaneous_flow
self.tank_level_lower_bound = tank_level_lower_bound
self.tank_level_upper_bound = tank_level_upper_bound
self.dV_tank_w_in_refill = dV_tank_w_in_refill
self.dT_superheat = dT_superheat
self.dT_subcool = dT_subcool
# BHE properties
self.N_1 = N_1
self.N_2 = N_2
self.B = B
self.D_b = D_b
self.H_b = H_b
self.r_b = r_b
self.k_s = k_s
self.c_s = c_s
self.rho_s = rho_s
self.alp_s = k_s / (c_s * rho_s)
self.E_pmp = E_pmp
self.dV_b_f_m3s = dV_b_f_lpm * cu.L2m3 / cu.m2s
if R_b is None:
from .g_function import calc_local_borehole_thermal_resistance, calc_effective_borehole_thermal_resistance
n_boreholes = max(1, self.N_1 * self.N_2)
m_flow_total = self.dV_b_f_m3s * rho_w
m_flow_pipe = m_flow_total / n_boreholes
R_b_local, R_a = calc_local_borehole_thermal_resistance(
k_s=self.k_s,
k_g=k_g,
k_p=k_p,
r_b=self.r_b,
r_out=r_out,
r_in=r_in,
D_s=D_s,
m_flow_pipe=m_flow_pipe,
rho_f=rho_w,
mu_f=mu_w,
cp_f=c_w,
k_f=k_w,
)
self.R_b = calc_effective_borehole_thermal_resistance(
R_b=R_b_local,
R_a=R_a,
H=self.H_b,
m_flow_pipe=m_flow_pipe,
cp_f=c_w,
boundary_condition=boundary_condition,
)
else:
self.R_b = R_b
# Subsystems
self.stc = stc
self.pv = pv
self._subsystems: dict[str, Any] = {}
if stc is not None:
self._subsystems["stc"] = stc
if pv is not None:
self._subsystems["pv"] = pv
if uv is not None:
self._subsystems["uv"] = uv
self.Q_cond_LOAD_OFF_TOL: float = 50.0 # W
# Precompute g-function
self.dt_s: float = dt_s
self._gfunc_interp = precompute_gfunction(
N_1=N_1, N_2=N_2, B=B, H_b=H_b, D_b=D_b, r_b=r_b, alpha_s=self.alp_s, k_s=k_s, t_max_s=t_max_s, dt_s=dt_s
)
# Simulation state tracking (dynamically updated in analyze_dynamic)
self.time: np.ndarray = np.array([])
self.dt: float = dt_s
self._opt_evals: int = 0
self.T_bhe_f: float = self.Ts
self.T_bhe: float = self.Ts
self.T_bhe_f_out: float = self.Ts
self.T_bhe_f_out_K: float = self.Ts_K
self.Q_bhe: float = 0.0
# NOTE: Removed self.dV_mix_w_out, self.dV_tank_w_in, self.dV_mix_sup_w_in
# They will be passed inside `flow_state: dict`.
@staticmethod
def _calc_tank_flow_context(
dV_mix_w_out: float,
T_tank_w_K: float,
T_tank_w_in_K: float,
T_mix_w_out_K: float,
dV_tank_w_in_override: float | None = None,
) -> dict:
mix_state = calc_mixing_valve_temp(T_tank_w_K, T_tank_w_in_K, T_mix_w_out_K)
flows = calc_mixing_valve_flows(dV_mix_w_out, mix_state["alp"])
dV_tank_w_out = flows["dV_hot_in"]
dV_tank_w_in = dV_tank_w_out if dV_tank_w_in_override is None else dV_tank_w_in_override
return {
"alp": mix_state["alp"],
"dV_mix_w_out": dV_mix_w_out,
"dV_tank_w_out": dV_tank_w_out,
"dV_tank_w_in": dV_tank_w_in,
"dV_mix_sup_w_in": flows["dV_cold_in"],
}
def _calc_off_state(self, T_tank_w: float, T0: float, flow_state: dict) -> dict:
T_tank_w_K = cu.C2K(T_tank_w)
mix = calc_mixing_valve_temp(T_tank_w_K, self.T_tank_w_in_K, self.T_mix_w_out_K)
# Bound temperatures for PropsSI to prevent crashes when tank overheats
# R410A critical temp is ~344.49K (71.3 °C)
T_cond_K_calc = min(max(T_tank_w_K, 250.0), 340.0)
T_evap_K_calc = min(max(self.T_bhe_f_in_K, 250.0), 340.0)
P_ref_evap_sat = CP.PropsSI("P", "T", T_evap_K_calc, "Q", 1, self.ref)
h_ref_evap_sat = CP.PropsSI("H", "P", P_ref_evap_sat, "Q", 1, self.ref)
s_ref_evap_sat = CP.PropsSI("S", "P", P_ref_evap_sat, "Q", 1, self.ref)
P_ref_cond_sat = CP.PropsSI("P", "T", T_cond_K_calc, "Q", 0, self.ref)
h_ref_cond_sat_l = CP.PropsSI("H", "P", P_ref_cond_sat, "Q", 0, self.ref)
s_ref_cond_sat_l = CP.PropsSI("S", "P", P_ref_cond_sat, "Q", 0, self.ref)
return {
"hp_is_on": False,
"converged": True,
"T_tank_w [°C]": T_tank_w,
"T0 [°C]": T0,
"T_mix_w_out [°C]": cu.K2C(mix["T_mix_w_out_K"]),
"T_tank_w_in [°C]": self.T_tank_w_in,
"Ts [°C]": self.Ts,
"T_bhe [°C]": getattr(self, "T_bhe", self.Ts),
"T_bhe_f [°C]": getattr(self, "T_bhe_f", self.Ts),
"T_bhe_f_in [°C]": cu.K2C(getattr(self, "T_bhe_f_in_K", self.Ts_K)),
"T_bhe_f_out [°C]": cu.K2C(getattr(self, "T_bhe_f_out_K", self.Ts_K)),
"T_ref_evap_sat [°C]": cu.K2C(self.T_bhe_f_in_K),
"T_ref_cond_sat_v [°C]": T_tank_w,
"T_ref_cond_sat_l [°C]": T_tank_w,
"T_ref_cmp_in [°C]": cu.K2C(self.T_bhe_f_in_K),
"T_ref_cmp_out [°C]": T_tank_w,
"T_ref_exp_in [°C]": T_tank_w,
"T_ref_exp_out [°C]": cu.K2C(self.T_bhe_f_in_K),
"T_cond [°C]": T_tank_w,
"dV_mix_w_out [m3/s]": flow_state.get("dV_mix_w_out", 0.0),
"dV_tank_w_in [m3/s]": flow_state.get("dV_tank_w_in", 0.0),
"dV_tank_w_out [m3/s]": flow_state.get("dV_tank_w_out", 0.0),
"dV_mix_sup_w_in [m3/s]": flow_state.get("dV_mix_sup_w_in", 0.0),
"dV_bhe_f [m3/s]": self.dV_b_f_m3s,
"P_ref_cmp_in [Pa]": P_ref_evap_sat,
"P_ref_cmp_out [Pa]": P_ref_cond_sat,
"P_ref_exp_in [Pa]": P_ref_cond_sat,
"P_ref_exp_out [Pa]": P_ref_evap_sat,
"P_ref_evap_sat [Pa]": P_ref_evap_sat,
"P_ref_cond_sat_v [Pa]": P_ref_cond_sat,
"P_ref_cond_sat_l [Pa]": P_ref_cond_sat,
"h_ref_cmp_in [J/kg]": h_ref_evap_sat,
"h_ref_cmp_out [J/kg]": h_ref_evap_sat,
"h_ref_exp_in [J/kg]": h_ref_cond_sat_l,
"h_ref_exp_out [J/kg]": h_ref_cond_sat_l,
"h_ref_evap_sat [J/kg]": h_ref_evap_sat,
"h_ref_cond_sat_v [J/kg]": h_ref_evap_sat,
"h_ref_cond_sat_l [J/kg]": h_ref_cond_sat_l,
"s_ref_cmp_in [J/(kg·K)]": s_ref_evap_sat,
"s_ref_cmp_out [J/(kg·K)]": s_ref_evap_sat,
"s_ref_exp_in [J/(kg·K)]": s_ref_cond_sat_l,
"s_ref_exp_out [J/(kg·K)]": s_ref_cond_sat_l,
"x_ref_cmp_in [J/kg]": 0.0,
"x_ref_cmp_out [J/kg]": 0.0,
"x_ref_exp_in [J/kg]": 0.0,
"x_ref_exp_out [J/kg]": 0.0,
"Q_bhe [W]": 0.0,
"Q_ref_cond [W]": 0.0,
"Q_ref_evap [W]": 0.0,
"Q_cond_load [W]": 0.0,
"E_cmp [W]": 0.0,
"E_pmp [W]": 0.0,
"E_tot [W]": 0.0,
"cop_ref [-]": np.nan,
"cop_sys [-]": np.nan,
"m_dot_ref [kg/s]": 0.0,
"cmp_rpm [rpm]": 0.0,
}
def _calc_state(
self, dT_ref_evap: float, T_tank_w: float, Q_cond_load: float, T0: float, *, flow_state: dict
) -> dict | None:
if Q_cond_load <= 0:
return self._calc_off_state(T_tank_w, T0, flow_state)
import inspect
def _eval_eff(eff, r_p, rps) -> float:
if eff is None:
return 1.0
if callable(eff):
sig = inspect.signature(eff)
if len(sig.parameters) == 2:
return float(eff(r_p, rps))
return float(eff(r_p))
return float(eff)
# 1. Analytical Condenser Approach Temperature
dT_ref_cond = Q_cond_load / self.UA_cond
T_tank_w_K = cu.C2K(T_tank_w)
_t_bhe = getattr(self, "T_bhe_f_out_K", None)
T_b_out_K = float(_t_bhe) if _t_bhe is not None else cu.C2K(15.0)
m_dot_cp_b = self.dV_b_f_m3s * rho_w * c_w
T_evap_in_K = T_b_out_K + (self.E_pmp / m_dot_cp_b)
T_ref_evap_sat_K = T_evap_in_K - dT_ref_evap
T_ref_cond_sat_K = T_tank_w_K + dT_ref_cond
# 2. Refrigerant Cycle Evaluation
cs = calc_ref_state(
T_evap_K=T_ref_evap_sat_K,
T_cond_K=T_ref_cond_sat_K,
refrigerant=self.ref,
eta_cmp_isen=1.0,
mode="heating",
dT_superheat=self.dT_superheat,
dT_subcool=self.dT_subcool,
is_active=True,
)
h_ref_cmp_in = cs["h_ref_cmp_in [J/kg]"]
h_ref_exp_in = cs["h_ref_exp_in [J/kg]"]
h_ref_exp_out = cs["h_ref_exp_out [J/kg]"]
rho_ref_cmp_in = cs["rho_ref_cmp_in [kg/m3]"]
P_evap = cs["P_ref_cmp_in [Pa]"]
P_cond = cs["P_ref_cmp_out [Pa]"]
ratio_P_cmp = P_cond / P_evap if P_evap > 0 else 1.0
if h_ref_cmp_in - h_ref_exp_in <= 0:
return None
try:
s_cmp_in = cs["s_ref_cmp_in [J/(kg·K)]"]
h2_isen = CP.PropsSI("H", "P", P_cond, "S", s_cmp_in, self.ref)
except ValueError:
h2_isen = h_ref_cmp_in
# 3. Cycle Performance
def _residual_rps(rps):
val_eta_vol = _eval_eff(self.eta_cmp_vol, ratio_P_cmp, rps)
val_eta_isen = _eval_eff(self.eta_cmp_isen, ratio_P_cmp, rps)
h_cmp_out_local = h_ref_cmp_in + (h2_isen - h_ref_cmp_in) / val_eta_isen
dh_cond_local = h_cmp_out_local - h_ref_exp_in
m_dot = self.V_disp_cmp * rho_ref_cmp_in * val_eta_vol * rps
return (m_dot * dh_cond_local) - Q_cond_load
from scipy.optimize import brentq
try:
cmp_rps = brentq(_residual_rps, 10.0, 150.0)
converged_rps = True
except ValueError:
res_min = _residual_rps(10.0)
res_max = _residual_rps(150.0)
cmp_rps = 10.0 if abs(res_min) < abs(res_max) else 150.0
converged_rps = False
val_eta_vol = _eval_eff(self.eta_cmp_vol, ratio_P_cmp, cmp_rps)
val_eta_isen = _eval_eff(self.eta_cmp_isen, ratio_P_cmp, cmp_rps)
val_eta_mech = _eval_eff(self.eta_cmp_mech, ratio_P_cmp, cmp_rps)
cs = calc_ref_state(
T_evap_K=T_ref_evap_sat_K,
T_cond_K=T_ref_cond_sat_K,
refrigerant=self.ref,
eta_cmp_isen=val_eta_isen,
mode="heating",
dT_superheat=self.dT_superheat,
dT_subcool=self.dT_subcool,
is_active=True,
)
h_ref_cmp_out = cs["h_ref_cmp_out [J/kg]"]
m_dot_ref = self.V_disp_cmp * rho_ref_cmp_in * val_eta_vol * cmp_rps
Q_ref_cond = m_dot_ref * (h_ref_cmp_out - h_ref_exp_in)
Q_ref_evap = m_dot_ref * (h_ref_cmp_in - h_ref_exp_out)
E_cmp = (m_dot_ref * (h_ref_cmp_out - h_ref_cmp_in)) / val_eta_mech
# 4. NTU Evaporator Analysis
NTU_evap = self.UA_evap / m_dot_cp_b
eps = 1.0 - math.exp(-NTU_evap)
Q_evap_actual = eps * m_dot_cp_b * (T_evap_in_K - T_ref_evap_sat_K)
err = Q_ref_evap - Q_evap_actual
# Penalize if cycle evap load exceeds physics limit
penalty = 0.0
if Q_ref_evap > Q_evap_actual:
penalty = 1e4 * (Q_ref_evap - Q_evap_actual) ** 2
# 5. BHE state
Q_bhe = Q_ref_evap - self.E_pmp
Q_bhe_unit = Q_bhe / self.H_b
# Fluid enters BHE at T_bhe_f_in_K
T_bhe_f_in_K = T_evap_in_K - Q_ref_evap / m_dot_cp_b
T_bhe_f_out_K = T_b_out_K
T_bhe_f = (cu.K2C(T_bhe_f_in_K) + cu.K2C(T_bhe_f_out_K)) / 2
T_bhe = T_bhe_f + Q_bhe_unit * self.R_b
# 6. Assemble
result: dict = cs.copy()
result.update(
{
"hp_is_on": True,
"converged": bool(converged_rps),
"_penalty": penalty,
"err_Q_evap [W]": 0.0,
"T_ref_evap_sat [°C]": cu.K2C(cs.get("T_ref_evap_sat_K", np.nan)),
"T_ref_cond_sat_v [°C]": cu.K2C(cs.get("T_ref_cond_sat_l_K", np.nan)),
"T_ref_cond_sat_l [°C]": cu.K2C(cs.get("T_ref_cond_sat_l_K", np.nan)),
"T0 [°C]": T0,
"T_ref_cmp_in [°C]": cu.K2C(cs.get("T_ref_cmp_in_K", np.nan)),
"T_ref_cmp_out [°C]": cu.K2C(cs.get("T_ref_cmp_out_K", np.nan)),
"T_ref_exp_in [°C]": cu.K2C(cs.get("T_ref_exp_in_K", np.nan)),
"T_ref_exp_out [°C]": cu.K2C(cs.get("T_ref_exp_out_K", np.nan)),
"T_cond [°C]": cu.K2C(cs.get("T_ref_cond_sat_l_K", np.nan)),
"T_tank_w [°C]": T_tank_w,
"T_mix_w_out [°C]": self.T_mix_w_out,
"T_tank_w_in [°C]": self.T_tank_w_in,
"Ts [°C]": self.Ts,
"T_bhe [°C]": T_bhe,
"T_bhe_f [°C]": T_bhe_f,
"T_bhe_f_in [°C]": cu.K2C(T_bhe_f_in_K),
"T_bhe_f_out [°C]": cu.K2C(T_bhe_f_out_K),
"dV_bhe_f [m3/s]": self.dV_b_f_m3s,
"dV_mix_w_out [m3/s]": flow_state.get("dV_mix_w_out", 0.0),
"dV_tank_w_in [m3/s]": flow_state.get("dV_tank_w_in", 0.0),
"dV_tank_w_out [m3/s]": flow_state.get("dV_tank_w_out", 0.0),
"dV_mix_sup_w_in [m3/s]": flow_state.get("dV_mix_sup_w_in", 0.0),
"P_ref_evap_sat [Pa]": cs.get("P_ref_cmp_in [Pa]", np.nan),
"P_ref_cond_sat_l [Pa]": cs.get("P_ref_exp_in [Pa]", np.nan),
"m_dot_ref [kg/s]": m_dot_ref,
"cmp_rpm [rpm]": cmp_rps * 60,
"h_ref_evap_sat [J/kg]": CP.PropsSI(
"H", "P", cs.get("P_ref_cmp_in [Pa]", 1e5), "Q", 1, self.ref
),
"h_ref_cond_sat_v [J/kg]": CP.PropsSI(
"H", "P", cs.get("P_ref_cmp_out [Pa]", 1e6), "Q", 1, self.ref
),
"h_ref_cond_sat_l [J/kg]": h_ref_exp_in,
"Q_cond_load [W]": Q_cond_load,
"Q_ref_cond [W]": Q_ref_cond,
"Q_ref_evap [W]": Q_ref_evap,
"Q_bhe [W]": Q_bhe,
"E_cmp [W]": E_cmp,
"E_pmp [W]": self.E_pmp,
"E_tot [W]": E_cmp + self.E_pmp,
"cop_ref [-]": (Q_ref_cond / E_cmp) if E_cmp > 0 else np.nan,
"cop_sys [-]": (Q_ref_cond / (E_cmp + self.E_pmp)) if (E_cmp + self.E_pmp) > 0 else np.nan,
}
)
return result
def _optimize_operation(self, T_tank_w: float, Q_cond_load: float, T0: float, *, flow_state: dict):
from scipy.optimize import brentq
self._opt_evals = getattr(self, "_opt_evals", 0)
def _objective(dT_evap):
self._opt_evals += 1
perf = self._calc_state(
dT_ref_evap=dT_evap, T_tank_w=T_tank_w, Q_cond_load=Q_cond_load, T0=T0, flow_state=flow_state
)
if perf is None:
raise ValueError(f"Cycle impossible at dT_evap={dT_evap}")
err = perf.get("err_Q_evap [W]", np.nan)
if np.isnan(err):
raise ValueError(f"NaN error at dT_evap={dT_evap}")
return err
self._opt_evals = 0
try:
opt_x = brentq(_objective, 1, 20.0, xtol=1e-4, maxiter=50)
class OptRes:
success = True
x = opt_x
return OptRes()
except Exception:
class OptResFail:
success = False
x = np.nan
return OptResFail()
def _determine_hp_state(self, ctx: StepContext, is_on_prev: bool) -> tuple[bool, dict, float]:
T_tank_w = cu.K2C(ctx.T_tank_w_K)
hp_is_on = determine_heat_source_on_off(
T_tank_w_C=T_tank_w,
T_lower=self.T_tank_w_lower_bound,
T_upper=self.T_tank_w_upper_bound,
is_on_prev=is_on_prev,
hour_of_day=ctx.hour_of_day,
on_schedule=self.hp_on_schedule,
)
Q_cond_load = self.hp_capacity if hp_is_on else 0.0
flow_state = self._calc_tank_flow_context(
dV_mix_w_out=ctx.dV_mix_w_out,
T_tank_w_K=ctx.T_tank_w_K,
T_tank_w_in_K=self.T_tank_w_in_K,
T_mix_w_out_K=self.T_mix_w_out_K,
)
if Q_cond_load <= self.Q_cond_LOAD_OFF_TOL:
# OFF
perf = self._calc_off_state(T_tank_w, cu.K2C(ctx.T0_K), flow_state=flow_state)
return False, perf, 0.0
else:
# ON
opt_res = self._optimize_operation(T_tank_w, Q_cond_load, cu.K2C(ctx.T0_K), flow_state=flow_state)
if opt_res.success:
opt_x = float(getattr(opt_res, "x", 0.0))
perf = self._calc_state(opt_x, T_tank_w, Q_cond_load, cu.K2C(ctx.T0_K), flow_state=flow_state)
if perf is None:
perf = self._calc_off_state(T_tank_w, cu.K2C(ctx.T0_K), flow_state=flow_state)
else:
perf = self._calc_off_state(T_tank_w, cu.K2C(ctx.T0_K), flow_state=flow_state)
perf["hp_is_on"] = True
perf["converged"] = opt_res.success
Q_ref_cond_actual = perf.get("Q_ref_cond [W]", 0.0)
if np.isnan(Q_ref_cond_actual):
Q_ref_cond_actual = 0.0
return True, perf, Q_ref_cond_actual
# =============================================================
# Hooks
# =============================================================
def _get_activation_flags(self, hour_of_day: float) -> dict[str, bool]:
flags = {}
if self.stc is not None:
flags["stc"] = self.stc.is_preheat_on(hour_of_day)
return flags
def _needs_solar_input(self) -> bool:
return self.stc is not None
def _build_residual_fn(
self,
ctx: StepContext,
ctrl: ControlState,
dt_s: float,
T_tank_w_in_K_n: float,
T_sup_w_K_n: float,
tank_level: float,
sub_states: dict,
):
def residual(T_cand_K: float) -> float:
return tank_mass_energy_residual(
[T_cand_K, tank_level],
ctx,
ctrl,
dt_s,
T_tank_w_in_K_n,
T_sup_w_K_n,
self.T_mix_w_out_K,
self.C_tank,
self.UA_tank,
self.V_tank_full,
self._subsystems,
sub_states,
)[0]
return residual
def _run_subsystems(
self,
ctx: StepContext,
ctrl: ControlState,
dt: float,
T_tank_w_in_K: float,
) -> dict[str, dict]:
states = {}
for name, sub in self._subsystems.items():
if hasattr(sub, "step"):
states[name] = sub.step(ctx, ctrl, dt, T_tank_w_in_K)
return states
def _augment_results(
self,
r: dict,
ctx: StepContext,
ctrl: ControlState,
sub_states: dict[str, dict],
T_solved_K: float,
) -> dict:
for name, sub in self._subsystems.items():
if hasattr(sub, "assemble_results"):
sub_record = sub.assemble_results(
ctx,
ctrl,
sub_states.get(name, {}),
T_solved_K,
)
r.update(sub_record)
return r
def _postprocess(self, df: pd.DataFrame) -> pd.DataFrame:
return self.postprocess_exergy(df)
def _assemble_core_results(
self, ctx: StepContext, ctrl: ControlState, T_solved_K: float, level_solved: float, perf: dict, flow_state: dict
) -> dict:
r = perf.copy()
r["T_tank_w [°C]"] = cu.K2C(T_solved_K)
r["T0 [°C]"] = cu.K2C(ctx.T0_K)
r["hp_is_on"] = ctrl.is_on
Q_tank_loss = self.UA_tank * (T_solved_K - ctx.T0_K)
mix = calc_mixing_valve_temp(T_solved_K, self.T_tank_w_in_K, self.T_mix_w_out_K)
r["T_mix_w_out [°C]"] = cu.K2C(mix["T_mix_w_out_K"])
r["Q_tank_loss [W]"] = Q_tank_loss
r["dV_mix_w_out [m3/s]"] = ctx.dV_mix_w_out
r["dV_tank_w_in [m3/s]"] = flow_state["dV_tank_w_in"]
r["dV_tank_w_out [m3/s]"] = flow_state["dV_tank_w_out"]
r["dV_mix_sup_w_in [m3/s]"] = flow_state["dV_mix_sup_w_in"]
r["tank_level [-]"] = 1.0 # lumped capacitance
if not self.tank_always_full or (self.tank_always_full and self.prevent_simultaneous_flow):
r["tank_level [-]"] = level_solved
r.pop("_penalty", None)
return r
def _compute_bhe_superposition(
self,
n: int,
time_arr: np.ndarray,
Q_bhe_unit_pulse: np.ndarray,
Q_bhe_unit_old: float,
hp_result: dict,
hp_is_on: bool,
) -> float:
Q_bhe_unit = hp_result.get("Q_bhe [W]", 0.0) / self.H_b if hp_is_on else 0.0
if abs(Q_bhe_unit - Q_bhe_unit_old) > 1e-6:
Q_bhe_unit_pulse[n] = Q_bhe_unit - Q_bhe_unit_old
Q_bhe_unit_old = Q_bhe_unit
pulses_idx = np.flatnonzero(Q_bhe_unit_pulse[: n + 1])
if len(pulses_idx) > 0:
dQ = Q_bhe_unit_pulse[pulses_idx]
tau = time_arr[n] - time_arr[pulses_idx]
tau = np.maximum(tau, 1e-6)
g_n_array = self._gfunc_interp(tau)
dT_bhe = float(np.dot(dQ, g_n_array))
else:
dT_bhe = 0.0
self.T_bhe = self.Ts - dT_bhe
T_bhe_K = cu.C2K(self.T_bhe)
T_bhe_f_K = T_bhe_K - Q_bhe_unit * self.R_b
self.T_bhe_f = cu.K2C(T_bhe_f_K)
self.Q_bhe = Q_bhe_unit * self.H_b
m_cp_b = c_w * rho_w * self.dV_b_f_m3s
# Assume symmetrical temperature approach around average BHE fluid temperature
dT_bhe_f_half = float((self.Q_bhe / m_cp_b) / 2) if m_cp_b > 0 else 0.0
self.T_bhe_f_in_K = T_bhe_f_K - dT_bhe_f_half
self.T_bhe_f_in = cu.K2C(self.T_bhe_f_in_K)
T_bhe_f_out_K = T_bhe_f_K + dT_bhe_f_half
self.T_bhe_f_out = cu.K2C(T_bhe_f_out_K)
# Apply BHE state to hp_result (so it is visible correctly)
hp_result["T_bhe [°C]"] = self.T_bhe
hp_result["T_bhe_f [°C]"] = self.T_bhe_f
hp_result["T_bhe_f_in [°C]"] = self.T_bhe_f_in
hp_result["T_bhe_f_out [°C]"] = self.T_bhe_f_out
return Q_bhe_unit_old
# =============================================================
# Orchestration
# =============================================================
[docs]
def analyze_dynamic(
self,
simulation_period_sec: float,
dt_s: float,
T_tank_w_init_C: float,
dhw_usage_schedule,
T0_schedule,
I_DN_schedule=None,
I_dH_schedule=None,
T_sup_w_schedule=None,
tank_level_init: float = 1.0,
result_save_csv_path=None,
) -> pd.DataFrame:
from scipy.optimize import fsolve
time = np.arange(0, simulation_period_sec, dt_s)
tN = len(time)
T0_schedule = np.array(T0_schedule)
if I_DN_schedule is None:
I_DN_schedule = np.zeros(tN)
if I_dH_schedule is None:
I_dH_schedule = np.zeros(tN)
if T_sup_w_schedule is not None:
T_sup_w_arr = np.array(T_sup_w_schedule, dtype=float)
else:
T_sup_w_arr = np.full(tN, cu.K2C(self.T_tank_w_in_K))
results_data = []
self.time = time
self.dt = dt_s
T_tank_w_K = cu.C2K(T_tank_w_init_C)
tank_level = tank_level_init
is_on_prev = False
is_refilling = False
self.T_bhe_f = self.Ts
self.T_bhe = self.Ts
self.T_bhe_f_in = self.Ts
self.T_bhe_f_in_K = self.Ts_K
self.T_bhe_f_out = self.Ts
self.Q_bhe = 0.0
Q_bhe_unit_pulse = np.zeros(tN)
Q_bhe_unit_old = 0.0
# DHW schedule handling: direct m³/s flow array
dhw_flow_m3s = np.asarray(dhw_usage_schedule, dtype=float)
if len(dhw_flow_m3s) != tN:
raise ValueError(f"dhw_usage_schedule length ({len(dhw_flow_m3s)}) != tN ({tN})")
_use_solar = self._needs_solar_input()
for n in tqdm(range(tN), desc="GSHPB Simulating"):
t_s = time[n]
hr = t_s * cu.s2h
hour_of_day = (t_s % (24 * 3600)) * cu.s2h
T0_K = cu.C2K(T0_schedule[n])
T_sup_w_n = T_sup_w_arr[n]
T_sup_w_K_n = cu.C2K(T_sup_w_n)
# Subsystem activation
activation_flags = self._get_activation_flags(hour_of_day)
dV_mix_w_out = dhw_flow_m3s[n]
ctx = StepContext(
n=n,
current_time_s=t_s,
current_hour=hr,
hour_of_day=hour_of_day,
T0=T0_schedule[n],
T0_K=T0_K,
activation_flags=activation_flags,
T_tank_w_K=T_tank_w_K,
tank_level=tank_level,
dV_mix_w_out=dV_mix_w_out,
I_DN=I_DN_schedule[n] if _use_solar else 0.0,
I_dH=I_dH_schedule[n] if _use_solar else 0.0,
T_sup_w_K=T_sup_w_K_n,
)
# --- Phase A: Control Decisions ---
hp_is_on, hp_result, Q_ref_cond = self._determine_hp_state(ctx, is_on_prev)
is_transitioning_off_to_on = (not is_on_prev) and hp_is_on
is_on_prev = hp_is_on
# Refill logic
flow_state_guess = self._calc_tank_flow_context(
dV_mix_w_out=ctx.dV_mix_w_out,
T_tank_w_K=ctx.T_tank_w_K,
T_tank_w_in_K=T_sup_w_K_n,
T_mix_w_out_K=self.T_mix_w_out_K,
)
dV_tank_w_in_ctrl, is_refilling = determine_tank_refill_flow(
dt=dt_s,
tank_level=ctx.tank_level,
dV_tank_w_out=flow_state_guess["dV_tank_w_out"],
V_tank_full=self.V_tank_full,
tank_always_full=self.tank_always_full,
prevent_simultaneous_flow=self.prevent_simultaneous_flow,
tank_level_lower_bound=self.tank_level_lower_bound,
tank_level_upper_bound=self.tank_level_upper_bound,
dV_tank_w_in_refill=self.dV_tank_w_in_refill,
is_refilling=is_refilling,
)
ctrl = ControlState(
is_on=hp_is_on,
Q_heat_source=Q_ref_cond,
dV_tank_w_in_ctrl=dV_tank_w_in_ctrl,
)
# --- Phase B: Implicit Solving ---
sub_states = self._run_subsystems(ctx, ctrl, dt_s, T_sup_w_K_n)
alp_prev: float = min(
1.0, max(0.0, (self.T_mix_w_out_K - T_sup_w_K_n) / max(1e-6, ctx.T_tank_w_K - T_sup_w_K_n))
)
dV_tank_w_out_prev = alp_prev * ctx.dV_mix_w_out
dV_tank_w_in_prev = dV_tank_w_out_prev if ctrl.dV_tank_w_in_ctrl is None else ctrl.dV_tank_w_in_ctrl
tank_vol_change_prev = (dV_tank_w_in_prev - dV_tank_w_out_prev) * dt_s
level_next_approx = min(1.0, max(0.0, ctx.tank_level + tank_vol_change_prev / self.V_tank_full))
tank_level_solve = max(0.001, level_next_approx)
res_fn = self._build_residual_fn(
ctx=ctx,
ctrl=ctrl,
dt_s=dt_s,
T_tank_w_in_K_n=T_sup_w_K_n,
T_sup_w_K_n=T_sup_w_K_n,
tank_level=tank_level_solve,
sub_states=sub_states,
)
from typing import cast
T_guess_K = ctx.T_tank_w_K
try:
T_solved_K_arr = cast(np.ndarray, fsolve(res_fn, x0=[T_guess_K]))
T_solved_K = float(T_solved_K_arr[0])
except Exception:
# explicit Euler fallback
Q_hp_val = ctrl.Q_heat_source
Q_flow_curr = c_w * rho_w * dV_tank_w_out_prev * (T_sup_w_K_n - ctx.T_tank_w_K)
Q_loss_curr = self.UA_tank * (ctx.T_tank_w_K - ctx.T0_K)
Q_tot = Q_hp_val + Q_flow_curr - Q_loss_curr
T_solved_K = ctx.T_tank_w_K + dt_s * Q_tot / (self.C_tank * tank_level_solve)
if T_solved_K <= T_sup_w_K_n:
T_solved_K = T_sup_w_K_n
# Flow state evaluated at solved temperature
flow_state_final = self._calc_tank_flow_context(
dV_mix_w_out=ctx.dV_mix_w_out,
T_tank_w_K=T_solved_K,
T_tank_w_in_K=T_sup_w_K_n,
T_mix_w_out_K=self.T_mix_w_out_K,
dV_tank_w_in_override=ctrl.dV_tank_w_in_ctrl,
)
tank_vol_change_final = (flow_state_final["dV_tank_w_in"] - flow_state_final["dV_tank_w_out"]) * dt_s
level_next = min(1.0, max(0.0, ctx.tank_level + tank_vol_change_final / self.V_tank_full))
# --- Phase C: BHE Temporal Superposition ---
Q_bhe_unit_old = self._compute_bhe_superposition(
n=n,
time_arr=time,
Q_bhe_unit_pulse=Q_bhe_unit_pulse,
Q_bhe_unit_old=Q_bhe_unit_old,
hp_result=hp_result,
hp_is_on=hp_is_on,
)
# Assemble step results
step_record = self._assemble_core_results(ctx, ctrl, T_solved_K, level_next, hp_result, flow_state_final)
self._augment_results(step_record, ctx, ctrl, sub_states, T_solved_K)
results_data.append(step_record)
# Step forward
T_tank_w_K = T_solved_K
tank_level = level_next
results_df = pd.DataFrame(results_data)
results_df.ffill(inplace=True)
results_df = self._postprocess(results_df)
if result_save_csv_path:
results_df.to_csv(result_save_csv_path, index=False)
return results_df
[docs]
def analyze_steady(
self,
T_tank_w: float,
T_source: float,
Q_ref_cond: float,
T0: float = 0.0,
*,
return_dict: bool = True,
) -> dict | pd.DataFrame:
"""Run a steady-state performance snapshot.
Evaluates the refrigerant cycle at a given operating point
(``T_tank_w``, ``T_source``, ``Q_ref_cond``) **without** solving the tank energy
balance or tracking dynamic flows.
Parameters
----------
T_tank_w : float
Tank water temperature [°C] — treated as a given input.
T_source : float
Source fluid temperature entering the heat pump [°C].
Q_ref_cond : float
Target condenser heat rate [W].
T0 : float
Dead-state / outdoor-air temperature [°C] (for exergy calculations).
return_dict : bool
If ``True`` return dict; else single-row DataFrame.
Returns
-------
dict | pd.DataFrame
"""
import warnings
import contextlib
# Empty flow state as steady state ignores dynamic withdrawal/refill
flow_state = {
"dV_mix_w_out": 0.0,
"dV_tank_w_out": 0.0,
"dV_tank_w_in": 0.0,
"dV_mix_sup_w_in": 0.0,
"alp": 0.0,
}
# Override T_bhe_f_out_K so that _calc_state uses T_source correctly
self.T_bhe_f_out_K = cu.C2K(T_source)
if Q_ref_cond <= 0:
result = self._calc_off_state(
T_tank_w=T_tank_w,
T0=T0,
flow_state=flow_state,
)
else:
opt_result = self._optimize_operation(
T_tank_w=T_tank_w,
Q_cond_load=Q_ref_cond,
T0=T0,
flow_state=flow_state,
)
result = None
with contextlib.suppress(Exception):
opt_x = float(getattr(opt_result, "x", 5.0))
result = self._calc_state(
dT_ref_evap=opt_x,
T_tank_w=T_tank_w,
Q_cond_load=Q_ref_cond,
T0=T0,
flow_state=flow_state,
)
if result is None or not isinstance(result, dict):
warnings.warn(
f"analyze_steady: optimization failed "
f"(T_tank_w={T_tank_w:.1f}°C, T_source={T_source:.1f}°C, "
f"Q_ref_cond={Q_ref_cond:.0f}W). "
"Returning HP-off state.",
RuntimeWarning,
stacklevel=2,
)
try:
result = self._calc_state(
dT_ref_evap=5.0,
T_tank_w=T_tank_w,
Q_cond_load=0.0,
T0=T0,
flow_state=flow_state,
)
except Exception:
result = self._calc_off_state(
T_tank_w=T_tank_w,
T0=T0,
flow_state=flow_state,
)
if (
result is not None
and isinstance(result, dict)
and "opt_result" in locals()
and hasattr(opt_result, "success")
):
result["converged"] = opt_result.success
if result is not None:
# Steady state doesn't have tank loss because we don't solve tank mass/energy balance
result["Q_tank_loss [W]"] = 0.0
result["tank_level [-]"] = 1.0 # steady-state: always_full
if result is None:
result = {}
if return_dict:
return result
return pd.DataFrame([result])
[docs]
def postprocess_exergy(self, df: pd.DataFrame) -> pd.DataFrame:
"""Compute GSHPB-specific exergy variables."""
from .thermodynamics import calc_energy_flow, calc_refrigerant_exergy, convert_electricity_to_exergy
df = df.copy()
T0_K = cu.C2K(df["T0 [°C]"])
T_tank_K = cu.C2K(df["T_tank_w [°C]"])
df["Q_tank_w_out [W]"] = calc_energy_flow(c_w * rho_w * df["dV_tank_w_out [m3/s]"].fillna(0), T_tank_K, T0_K)
# 1. Refrigerant state points
df = calc_refrigerant_exergy(df, self.ref, T0_K)
df = convert_electricity_to_exergy(df)
# 2. Exergy flows
G_b = c_w * rho_w * df["dV_bhe_f [m3/s]"]
T_bhe_f_in_K = cu.C2K(df["T_bhe_f_in [°C]"])
T_bhe_f_out_K = cu.C2K(df["T_bhe_f_out [°C]"])
# Exergy at BHE boundaries
X_bhe_in = calc_exergy_flow(G_b, T_bhe_f_in_K, T0_K)
X_bhe_out = calc_exergy_flow(G_b, T_bhe_f_out_K, T0_K)
# Fluid enters evaporator after being heated by the pump
T_evap_in_K = T_bhe_f_out_K + df["E_pmp [W]"] / G_b.replace(0, np.nan)
X_evap_in = calc_exergy_flow(G_b, T_evap_in_K, T0_K)
# Fluid leaves evaporator and enters BHE
X_evap_out = X_bhe_in
df["X_ref_cond [W]"] = df["Q_ref_cond [W]"] * (1 - T0_K / cu.C2K(df["T_ref_cond_sat_v [°C]"]))
df["X_ref_evap [W]"] = df["Q_ref_evap [W]"] * (1 - T0_K / cu.C2K(df["T_ref_evap_sat [°C]"]))
df["X_tank_w_in [W]"] = calc_exergy_flow(
c_w * rho_w * df["dV_tank_w_in [m3/s]"].fillna(0), cu.C2K(df["T_tank_w_in [°C]"]), T0_K
)
df["X_tank_w_out [W]"] = calc_exergy_flow(c_w * rho_w * df["dV_tank_w_out [m3/s]"].fillna(0), T_tank_K, T0_K)
df["X_mix_w_out [W]"] = calc_exergy_flow(
c_w * rho_w * df["dV_mix_w_out [m3/s]"].fillna(0), cu.C2K(df["T_mix_w_out [°C]"]), T0_K
)
df["X_mix_sup_w_in [W]"] = calc_exergy_flow(
c_w * rho_w * df["dV_mix_sup_w_in [m3/s]"].fillna(0), cu.C2K(df["T_tank_w_in [°C]"]), T0_K
)
df["X_tank_loss [W]"] = df["Q_tank_loss [W]"] * (1 - T0_K / T_tank_K)
tank_lvl = df["tank_level [-]"].fillna(1.0) if "tank_level [-]" in df.columns else 1.0
C_tank_actual = self.C_tank * tank_lvl
T_tank_K_prev = T_tank_K.shift(1)
df["Xst_tank [W]"] = (1 - T0_K / T_tank_K) * C_tank_actual * (T_tank_K - T_tank_K_prev) / self.dt
df.loc[df.index[0], "Xst_tank [W]"] = 0.0
import typing
# Subsystems exergy
X_sub_tot_add = typing.cast(typing.Any, 0.0)
X_sub_in_tank_add = typing.cast(typing.Any, 0.0)
X_sub_out_tank_add = typing.cast(typing.Any, 0.0)
for _name, sub in self._subsystems.items():
if hasattr(sub, "calc_exergy"):
ex_res = sub.calc_exergy(df, T0_K)
if ex_res is not None:
for col_name, s in ex_res.columns.items():
df[col_name] = s
X_sub_tot_add = X_sub_tot_add + ex_res.X_tot_add
X_sub_in_tank_add = X_sub_in_tank_add + ex_res.X_in_tank_add
X_sub_out_tank_add = X_sub_out_tank_add + ex_res.X_out_tank_add
# Components Destruction
df["X_tot [W]"] = df["E_cmp [W]"] + df["E_pmp [W]"] + df.get("X_uv [W]", 0.0) + X_sub_tot_add
df["Xc_cmp [W]"] = df["X_cmp [W]"] + df["X_ref_cmp_in [W]"] - df["X_ref_cmp_out [W]"]
df["Xc_cond [W]"] = (df["X_ref_cmp_out [W]"] - df["X_ref_exp_in [W]"]) - df["X_ref_cond [W]"]
df["Xc_exp [W]"] = df["X_ref_exp_in [W]"] - df["X_ref_exp_out [W]"]
df["Xc_evap [W]"] = (X_evap_in - X_evap_out) - df["X_ref_evap [W]"]
df["Xc_pmp [W]"] = df["E_pmp [W]"] - (X_evap_in - X_bhe_out)
X_in_tank = df["X_ref_cond [W]"] + df["X_tank_w_in [W]"].fillna(0) + df.get("X_uv [W]", 0.0) + X_sub_in_tank_add
X_out_tank = df["Xst_tank [W]"] + df["X_tank_w_out [W]"].fillna(0) + X_sub_out_tank_add
df["Xc_tank [W]"] = X_in_tank - X_out_tank
df["Xc_mix [W]"] = (
df["X_tank_w_out [W]"].fillna(0) + df["X_mix_sup_w_in [W]"].fillna(0) - df["X_mix_w_out [W]"].fillna(0)
)
# Efficiency
df["X_eff_ref [-]"] = df["X_ref_cond [W]"] / df["X_cmp [W]"].replace(0, np.nan)
df["X_eff_sys [-]"] = df["X_ref_cond [W]"] / df["X_tot [W]"].replace(0, np.nan)
return df