Source code for enex_analysis.water_source_heat_pump_boiler

"""Ground source heat pump boiler — physics-based cycle model.

Resolves a vapour-compression refrigerant cycle coupled to a borehole heat
exchanger (BHE) on the evaporator side and a lumped-capacitance hot-water
tank on the condenser side. At each time step the model finds the
minimum-power operating point via 1D Brent optimization over the evaporator
approach temperature difference, while the condenser temperature is solved
analytically.

Borehole thermal response is tracked with pygfunction-based multi-borehole
g-functions, enabling robust long-term ground temperature drift modeling.
"""

from __future__ import annotations

import math
from typing import TYPE_CHECKING, Any

import CoolProp.CoolProp as CP
import numpy as np
import pandas as pd
from tqdm import tqdm

from . import calc_util as cu
from .constants import c_w, k_w, mu_w, rho_w
from .dynamic_context import (
    ControlState,
    StepContext,
    determine_heat_source_on_off,
    determine_tank_refill_flow,
    tank_mass_energy_residual,
)
from .enex_functions import (
    calc_mixing_valve_flows,
    calc_mixing_valve_temp,
)
from .heat_transfer import calc_simple_tank_UA
from .refrigerant import calc_ref_state
from .thermodynamics import calc_exergy_flow

if TYPE_CHECKING:
    from .subsystems import SolarThermalCollector


[docs] class WaterSourceHeatPumpBoiler: """Water source heat pump boiler with BHE and lumped-tank model. The refrigerant cycle is resolved via CoolProp with user-specified superheat / subcool margins. An optimizer minimises total cycle electrical input subject to NTU-based evaporator constraints and analytical condenser temperature relations. """
[docs] def __init__( self, # 1. Refrigerant / cycle / compressor refrigerant: str = "R410A", V_disp_cmp: float = 0.0005, eta_cmp_isen: float = 0.7, # 2. Heat exchanger UA UA_cond_design: float = 500, UA_evap_design: float = 500, # 3. Tank / control / load T0: float = 0.0, Ts: float = 16.0, T_tank_w_upper_bound: float = 65.0, T_tank_w_lower_bound: float = 55.0, T_mix_w_out: float = 40.0, T_tank_w_in: float = 15.0, hp_capacity: float = 8000.0, dV_mix_w_out_max: float = 0.0001, # Tank / insulation r0: float = 0.2, H: float = 0.8, x_shell: float = 0.01, x_ins: float = 0.05, k_shell: float = 25, k_ins: float = 0.03, h_o: float = 15, # 4. Borehole heat exchanger (Field + Params) N_1: int = 1, N_2: int = 1, B: float = 6.0, D_b: float = 0, H_b: float = 200, r_b: float = 0.08, R_b: float | None = None, k_g: float = 1.5, k_p: float = 0.4, r_out: float = 0.016, r_in: float = 0.013, D_s: float = 0.025, dV_b_f_lpm: float = 24, k_s: float = 2.0, c_s: float = 800, rho_s: float = 2000, E_pmp: float = 200, v_river: float = 0.5, # 6. Superheat / subcool dT_superheat: float = 5.0, dT_subcool: float = 5.0, # 7. Tank fluid limits tank_always_full: bool = True, prevent_simultaneous_flow: bool = False, tank_level_lower_bound: float = 0.5, tank_level_upper_bound: float = 1.0, dV_tank_w_in_refill: float = 0.001, # 8. Operation Schedule hp_on_schedule: list[tuple[float, float]] | None = None, # 9. Subsystems stc: SolarThermalCollector | None = None, pv=None, uv=None, # 10. Simulation scope (for precomputing g-functions) t_max_s: float = 8760 * 3600, dt_s: float = 3600, ) -> None: self.tank_physical = { "r0": r0, "H": H, "x_shell": x_shell, "x_ins": x_ins, "k_shell": k_shell, "k_ins": k_ins, "h_o": h_o, } self.UA_tank = calc_simple_tank_UA(**self.tank_physical) self.V_tank_full: float = math.pi * r0**2 * H self.C_tank = c_w * rho_w * self.V_tank_full self.ref = refrigerant self.V_disp_cmp = V_disp_cmp self.eta_cmp_isen = eta_cmp_isen self.UA_cond = UA_cond_design self.UA_evap = UA_evap_design self.T0_K = cu.C2K(T0) self.Ts = Ts self.Ts_K = cu.C2K(self.Ts) self.T_bhe_f_in = Ts self.T_bhe_f_in_K = self.Ts_K self.hp_capacity = hp_capacity self.hp_on_schedule = hp_on_schedule or [(0.0, 24.0)] self.dV_mix_w_out_max = dV_mix_w_out_max self.T_tank_w_upper_bound = T_tank_w_upper_bound self.T_tank_w_lower_bound = T_tank_w_lower_bound self.T_mix_w_out = T_mix_w_out self.T_mix_w_out_K = cu.C2K(T_mix_w_out) self.T_tank_w_in = T_tank_w_in self.T_tank_w_in_K = cu.C2K(T_tank_w_in) self.tank_always_full = tank_always_full self.prevent_simultaneous_flow = prevent_simultaneous_flow self.tank_level_lower_bound = tank_level_lower_bound self.tank_level_upper_bound = tank_level_upper_bound self.dV_tank_w_in_refill = dV_tank_w_in_refill self.dT_superheat = dT_superheat self.dT_subcool = dT_subcool # BHE properties self.N_1 = N_1 self.N_2 = N_2 self.B = B self.D_b = D_b self.H_b = H_b self.r_b = r_b self.k_s = k_s self.c_s = c_s self.rho_s = rho_s self.alp_s = k_s / (c_s * rho_s) self.E_pmp = E_pmp self.dV_b_f_m3s = dV_b_f_lpm * cu.L2m3 / cu.m2s self.v_river = v_river if R_b is None: from .g_function import calc_submerged_coil_thermal_resistance n_boreholes = max(1, self.N_1 * self.N_2) m_flow_total = self.dV_b_f_m3s * rho_w m_flow_pipe = m_flow_total / n_boreholes self.R_b = calc_submerged_coil_thermal_resistance( r_out=r_out, r_in=r_in, D_s=D_s, k_p=k_p, m_flow_pipe=m_flow_pipe, rho_f=rho_w, mu_f=mu_w, cp_f=c_w, k_f=k_w, v_river=self.v_river, ) else: self.R_b = R_b # Subsystems self.stc = stc self.pv = pv self._subsystems: dict[str, Any] = {} if stc is not None: self._subsystems["stc"] = stc if pv is not None: self._subsystems["pv"] = pv if uv is not None: self._subsystems["uv"] = uv self.Q_cond_LOAD_OFF_TOL: float = 50.0 # W self.dt_s: float = dt_s # Simulation state tracking (dynamically updated in analyze_dynamic) self.time: np.ndarray = np.array([]) self.dt: float = dt_s self._opt_evals: int = 0 self.T_bhe_f: float = self.Ts self.T_bhe: float = self.Ts self.T_bhe_f_out: float = self.Ts self.T_bhe_f_out_K: float = self.Ts_K self.Q_bhe: float = 0.0
# NOTE: Removed self.dV_mix_w_out, self.dV_tank_w_in, self.dV_mix_sup_w_in # They will be passed inside `flow_state: dict`. @staticmethod def _calc_tank_flow_context( dV_mix_w_out: float, T_tank_w_K: float, T_tank_w_in_K: float, T_mix_w_out_K: float, dV_tank_w_in_override: float | None = None, ) -> dict: mix_state = calc_mixing_valve_temp(T_tank_w_K, T_tank_w_in_K, T_mix_w_out_K) flows = calc_mixing_valve_flows(dV_mix_w_out, mix_state["alp"]) dV_tank_w_out = flows["dV_hot_in"] dV_tank_w_in = dV_tank_w_out if dV_tank_w_in_override is None else dV_tank_w_in_override return { "alp": mix_state["alp"], "dV_mix_w_out": dV_mix_w_out, "dV_tank_w_out": dV_tank_w_out, "dV_tank_w_in": dV_tank_w_in, "dV_mix_sup_w_in": flows["dV_cold_in"], } def _calc_off_state(self, T_tank_w: float, T0: float, flow_state: dict) -> dict: T_tank_w_K = cu.C2K(T_tank_w) mix = calc_mixing_valve_temp(T_tank_w_K, self.T_tank_w_in_K, self.T_mix_w_out_K) # Bound temperatures for PropsSI to prevent crashes when tank overheats # R410A critical temp is ~344.49K (71.3 °C) T_cond_K_calc = min(max(T_tank_w_K, 250.0), 340.0) T_evap_K_calc = min(max(self.T_bhe_f_in_K, 250.0), 340.0) P_ref_evap_sat = CP.PropsSI("P", "T", T_evap_K_calc, "Q", 1, self.ref) h_ref_evap_sat = CP.PropsSI("H", "P", P_ref_evap_sat, "Q", 1, self.ref) s_ref_evap_sat = CP.PropsSI("S", "P", P_ref_evap_sat, "Q", 1, self.ref) P_ref_cond_sat = CP.PropsSI("P", "T", T_cond_K_calc, "Q", 0, self.ref) h_ref_cond_sat_l = CP.PropsSI("H", "P", P_ref_cond_sat, "Q", 0, self.ref) s_ref_cond_sat_l = CP.PropsSI("S", "P", P_ref_cond_sat, "Q", 0, self.ref) return { "hp_is_on": False, "converged": True, "T_tank_w [°C]": T_tank_w, "T0 [°C]": T0, "T_mix_w_out [°C]": cu.K2C(mix["T_mix_w_out_K"]), "T_tank_w_in [°C]": self.T_tank_w_in, "Ts [°C]": self.Ts, "T_bhe [°C]": getattr(self, "T_bhe", self.Ts), "T_bhe_f [°C]": getattr(self, "T_bhe_f", self.Ts), "T_bhe_f_in [°C]": cu.K2C(getattr(self, "T_bhe_f_in_K", self.Ts_K)), "T_bhe_f_out [°C]": cu.K2C(getattr(self, "T_bhe_f_out_K", self.Ts_K)), "T_ref_evap_sat [°C]": cu.K2C(self.T_bhe_f_in_K), "T_ref_cond_sat_v [°C]": T_tank_w, "T_ref_cond_sat_l [°C]": T_tank_w, "T_ref_cmp_in [°C]": cu.K2C(self.T_bhe_f_in_K), "T_ref_cmp_out [°C]": T_tank_w, "T_ref_exp_in [°C]": T_tank_w, "T_ref_exp_out [°C]": cu.K2C(self.T_bhe_f_in_K), "T_cond [°C]": T_tank_w, "dV_mix_w_out [m3/s]": flow_state.get("dV_mix_w_out", 0.0), "dV_tank_w_in [m3/s]": flow_state.get("dV_tank_w_in", 0.0), "dV_tank_w_out [m3/s]": flow_state.get("dV_tank_w_out", 0.0), "dV_mix_sup_w_in [m3/s]": flow_state.get("dV_mix_sup_w_in", 0.0), "dV_bhe_f [m3/s]": self.dV_b_f_m3s, "P_ref_cmp_in [Pa]": P_ref_evap_sat, "P_ref_cmp_out [Pa]": P_ref_cond_sat, "P_ref_exp_in [Pa]": P_ref_cond_sat, "P_ref_exp_out [Pa]": P_ref_evap_sat, "P_ref_evap_sat [Pa]": P_ref_evap_sat, "P_ref_cond_sat_v [Pa]": P_ref_cond_sat, "P_ref_cond_sat_l [Pa]": P_ref_cond_sat, "h_ref_cmp_in [J/kg]": h_ref_evap_sat, "h_ref_cmp_out [J/kg]": h_ref_evap_sat, "h_ref_exp_in [J/kg]": h_ref_cond_sat_l, "h_ref_exp_out [J/kg]": h_ref_cond_sat_l, "h_ref_evap_sat [J/kg]": h_ref_evap_sat, "h_ref_cond_sat_v [J/kg]": h_ref_evap_sat, "h_ref_cond_sat_l [J/kg]": h_ref_cond_sat_l, "s_ref_cmp_in [J/(kg·K)]": s_ref_evap_sat, "s_ref_cmp_out [J/(kg·K)]": s_ref_evap_sat, "s_ref_exp_in [J/(kg·K)]": s_ref_cond_sat_l, "s_ref_exp_out [J/(kg·K)]": s_ref_cond_sat_l, "x_ref_cmp_in [J/kg]": 0.0, "x_ref_cmp_out [J/kg]": 0.0, "x_ref_exp_in [J/kg]": 0.0, "x_ref_exp_out [J/kg]": 0.0, "Q_bhe [W]": 0.0, "Q_ref_cond [W]": 0.0, "Q_ref_evap [W]": 0.0, "Q_cond_load [W]": 0.0, "E_cmp [W]": 0.0, "E_pmp [W]": 0.0, "E_tot [W]": 0.0, "m_dot_ref [kg/s]": 0.0, "cmp_rpm [rpm]": 0.0, } def _calc_state( self, dT_ref_evap: float, T_tank_w: float, Q_cond_load: float, T0: float, *, flow_state: dict ) -> dict | None: if Q_cond_load <= 0: return self._calc_off_state(T_tank_w, T0, flow_state) # 1. Analytical Condenser Approach Temperature dT_ref_cond = Q_cond_load / self.UA_cond T_tank_w_K = cu.C2K(T_tank_w) # The source temperature leaving BHE and entering HP T_source_K = float(getattr(self, "T_bhe_f_out_K", cu.C2K(15.0))) m_dot_cp_b = self.dV_b_f_m3s * rho_w * c_w T_evap_in_K = T_source_K + (self.E_pmp / m_dot_cp_b) T_ref_evap_sat_K = T_evap_in_K - dT_ref_evap T_ref_cond_sat_K = T_tank_w_K + dT_ref_cond pinch_min: float = 0.5 actual_dT_subcool = min(self.dT_subcool, max(0.0, dT_ref_cond - pinch_min)) # 2. Refrigerant Cycle Evaluation try: cycle_states = calc_ref_state( T_evap_K=T_ref_evap_sat_K, T_cond_K=T_ref_cond_sat_K, refrigerant=self.ref, eta_cmp_isen=self.eta_cmp_isen, dT_superheat=self.dT_superheat, dT_subcool=actual_dT_subcool, ) except Exception: return None rho_ref_cmp_in = cycle_states["rho_ref_cmp_in [kg/m3]"] h_ref_cmp_in = cycle_states["h_ref_cmp_in [J/kg]"] h_ref_cmp_out = cycle_states["h_ref_cmp_out [J/kg]"] h_ref_exp_in = cycle_states["h_ref_exp_in [J/kg]"] h_ref_exp_out = cycle_states["h_ref_exp_out [J/kg]"] if (h_ref_cmp_out - h_ref_exp_in) <= 0: return None # 3. Cycle Performance m_dot_ref = Q_cond_load / (h_ref_cmp_out - h_ref_exp_in) Q_ref_cond = Q_cond_load Q_ref_evap = m_dot_ref * (h_ref_cmp_in - h_ref_exp_out) E_cmp = m_dot_ref * (h_ref_cmp_out - h_ref_cmp_in) cmp_rps = m_dot_ref / (self.V_disp_cmp * rho_ref_cmp_in) # 4. NTU Evaporator Analysis NTU_evap = self.UA_evap / m_dot_cp_b eps = 1.0 - math.exp(-NTU_evap) Q_evap_actual = eps * m_dot_cp_b * (T_evap_in_K - T_ref_evap_sat_K) err = Q_ref_evap - Q_evap_actual # Penalize if cycle evap load exceeds physics limit penalty = 0.0 if Q_ref_evap > Q_evap_actual: penalty = 1e4 * (Q_ref_evap - Q_evap_actual) ** 2 # 5. BHE state Q_bhe = Q_ref_evap - self.E_pmp Q_bhe_unit = Q_bhe / self.H_b # Fluid enters BHE at T_bhe_f_in_K T_bhe_f_in_K = T_evap_in_K - Q_ref_evap / m_dot_cp_b T_bhe_f_out_K = T_source_K T_bhe_f = (cu.K2C(T_bhe_f_in_K) + cu.K2C(T_bhe_f_out_K)) / 2 T_bhe = T_bhe_f + Q_bhe_unit * self.R_b # 6. Assemble result: dict = cycle_states.copy() result.update( { "hp_is_on": True, "converged": True, "_penalty": penalty, "err_Q_evap [W]": err, "T_ref_evap_sat [°C]": cu.K2C(cycle_states.get("T_ref_evap_sat_K", np.nan)), "T_ref_cond_sat_v [°C]": cu.K2C(cycle_states.get("T_ref_cond_sat_l_K", np.nan)), "T_ref_cond_sat_l [°C]": cu.K2C(cycle_states.get("T_ref_cond_sat_l_K", np.nan)), "T0 [°C]": T0, "T_ref_cmp_in [°C]": cu.K2C(cycle_states.get("T_ref_cmp_in_K", np.nan)), "T_ref_cmp_out [°C]": cu.K2C(cycle_states.get("T_ref_cmp_out_K", np.nan)), "T_ref_exp_in [°C]": cu.K2C(cycle_states.get("T_ref_exp_in_K", np.nan)), "T_ref_exp_out [°C]": cu.K2C(cycle_states.get("T_ref_exp_out_K", np.nan)), "T_cond [°C]": cu.K2C(cycle_states.get("T_ref_cond_sat_l_K", np.nan)), "T_tank_w [°C]": T_tank_w, "T_mix_w_out [°C]": self.T_mix_w_out, "T_tank_w_in [°C]": self.T_tank_w_in, "Ts [°C]": self.Ts, "T_bhe [°C]": T_bhe, "T_bhe_f [°C]": T_bhe_f, "T_bhe_f_in [°C]": cu.K2C(T_bhe_f_in_K), "T_bhe_f_out [°C]": cu.K2C(T_bhe_f_out_K), "dV_bhe_f [m3/s]": self.dV_b_f_m3s, "dV_mix_w_out [m3/s]": flow_state.get("dV_mix_w_out", 0.0), "dV_tank_w_in [m3/s]": flow_state.get("dV_tank_w_in", 0.0), "dV_tank_w_out [m3/s]": flow_state.get("dV_tank_w_out", 0.0), "dV_mix_sup_w_in [m3/s]": flow_state.get("dV_mix_sup_w_in", 0.0), "P_ref_evap_sat [Pa]": cycle_states.get("P_ref_cmp_in [Pa]", np.nan), "P_ref_cond_sat_l [Pa]": cycle_states.get("P_ref_exp_in [Pa]", np.nan), "m_dot_ref [kg/s]": m_dot_ref, "cmp_rpm [rpm]": cmp_rps * 60, "h_ref_evap_sat [J/kg]": CP.PropsSI( "H", "P", cycle_states.get("P_ref_cmp_in [Pa]", 1e5), "Q", 1, self.ref ), "h_ref_cond_sat_v [J/kg]": CP.PropsSI( "H", "P", cycle_states.get("P_ref_cmp_out [Pa]", 1e6), "Q", 1, self.ref ), "h_ref_cond_sat_l [J/kg]": h_ref_exp_in, "Q_cond_load [W]": Q_cond_load, "Q_ref_cond [W]": Q_ref_cond, "Q_ref_evap [W]": Q_ref_evap, "Q_bhe [W]": Q_bhe, "E_cmp [W]": E_cmp, "E_pmp [W]": self.E_pmp, "E_tot [W]": E_cmp + self.E_pmp, } ) return result def _optimize_operation(self, T_tank_w: float, Q_cond_load: float, T0: float, *, flow_state: dict): from scipy.optimize import brentq self._opt_evals = getattr(self, "_opt_evals", 0) def _objective(dT_evap): self._opt_evals += 1 perf = self._calc_state( dT_ref_evap=dT_evap, T_tank_w=T_tank_w, Q_cond_load=Q_cond_load, T0=T0, flow_state=flow_state ) if perf is None: raise ValueError(f"Cycle impossible at dT_evap={dT_evap}") err = perf.get("err_Q_evap [W]", np.nan) if np.isnan(err): raise ValueError(f"NaN error at dT_evap={dT_evap}") return err self._opt_evals = 0 try: opt_x = brentq(_objective, 1, 20.0, xtol=1e-4, maxiter=50) class OptRes: success = True x = opt_x return OptRes() except Exception: class OptResFail: success = False x = np.nan return OptResFail() def _determine_hp_state(self, ctx: StepContext, is_on_prev: bool) -> tuple[bool, dict, float]: T_tank_w = cu.K2C(ctx.T_tank_w_K) hp_is_on = determine_heat_source_on_off( T_tank_w_C=T_tank_w, T_lower=self.T_tank_w_lower_bound, T_upper=self.T_tank_w_upper_bound, is_on_prev=is_on_prev, hour_of_day=ctx.hour_of_day, on_schedule=self.hp_on_schedule, ) Q_cond_load = self.hp_capacity if hp_is_on else 0.0 flow_state = self._calc_tank_flow_context( dV_mix_w_out=ctx.dV_mix_w_out, T_tank_w_K=ctx.T_tank_w_K, T_tank_w_in_K=self.T_tank_w_in_K, T_mix_w_out_K=self.T_mix_w_out_K, ) if Q_cond_load <= self.Q_cond_LOAD_OFF_TOL: # OFF perf = self._calc_off_state(T_tank_w, cu.K2C(ctx.T0_K), flow_state=flow_state) return False, perf, 0.0 else: # ON opt_res = self._optimize_operation(T_tank_w, Q_cond_load, cu.K2C(ctx.T0_K), flow_state=flow_state) if opt_res.success: opt_x = float(getattr(opt_res, "x", 0.0)) perf = self._calc_state(opt_x, T_tank_w, Q_cond_load, cu.K2C(ctx.T0_K), flow_state=flow_state) if perf is None: perf = self._calc_off_state(T_tank_w, cu.K2C(ctx.T0_K), flow_state=flow_state) else: perf = self._calc_off_state(T_tank_w, cu.K2C(ctx.T0_K), flow_state=flow_state) perf["hp_is_on"] = True perf["converged"] = opt_res.success Q_ref_cond_actual = perf.get("Q_ref_cond [W]", 0.0) if np.isnan(Q_ref_cond_actual): Q_ref_cond_actual = 0.0 return True, perf, Q_ref_cond_actual # ============================================================= # Hooks # ============================================================= def _get_activation_flags(self, hour_of_day: float) -> dict[str, bool]: flags = {} if self.stc is not None: flags["stc"] = self.stc.is_preheat_on(hour_of_day) return flags def _needs_solar_input(self) -> bool: return self.stc is not None def _build_residual_fn( self, ctx: StepContext, ctrl: ControlState, dt_s: float, T_tank_w_in_K_n: float, T_sup_w_K_n: float, tank_level: float, sub_states: dict, ): def residual(T_cand_K: float) -> float: return tank_mass_energy_residual( [T_cand_K, tank_level], ctx, ctrl, dt_s, T_tank_w_in_K_n, T_sup_w_K_n, self.T_mix_w_out_K, self.C_tank, self.UA_tank, self.V_tank_full, self._subsystems, sub_states, )[0] return residual def _run_subsystems( self, ctx: StepContext, ctrl: ControlState, dt: float, T_tank_w_in_K: float, ) -> dict[str, dict]: states = {} for name, sub in self._subsystems.items(): if hasattr(sub, "step"): states[name] = sub.step(ctx, ctrl, dt, T_tank_w_in_K) return states def _augment_results( self, r: dict, ctx: StepContext, ctrl: ControlState, sub_states: dict[str, dict], T_solved_K: float, ) -> dict: for name, sub in self._subsystems.items(): if hasattr(sub, "assemble_results"): sub_record = sub.assemble_results( ctx, ctrl, sub_states.get(name, {}), T_solved_K, ) r.update(sub_record) return r def _postprocess(self, df: pd.DataFrame) -> pd.DataFrame: return self.postprocess_exergy(df) def _assemble_core_results( self, ctx: StepContext, ctrl: ControlState, T_solved_K: float, level_solved: float, perf: dict, flow_state: dict ) -> dict: r = perf.copy() r["T_tank_w [°C]"] = cu.K2C(T_solved_K) r["T0 [°C]"] = cu.K2C(ctx.T0_K) r["hp_is_on"] = ctrl.is_on Q_tank_loss = self.UA_tank * (T_solved_K - ctx.T0_K) mix = calc_mixing_valve_temp(T_solved_K, self.T_tank_w_in_K, self.T_mix_w_out_K) r["T_mix_w_out [°C]"] = cu.K2C(mix["T_mix_w_out_K"]) r["Q_tank_loss [W]"] = Q_tank_loss r["dV_mix_w_out [m3/s]"] = ctx.dV_mix_w_out r["dV_tank_w_in [m3/s]"] = flow_state["dV_tank_w_in"] r["dV_tank_w_out [m3/s]"] = flow_state["dV_tank_w_out"] r["dV_mix_sup_w_in [m3/s]"] = flow_state["dV_mix_sup_w_in"] r["tank_level [-]"] = 1.0 # lumped capacitance if not self.tank_always_full or (self.tank_always_full and self.prevent_simultaneous_flow): r["tank_level [-]"] = level_solved r.pop("_penalty", None) return r # _compute_bhe_superposition removed: river water operates as infinite capacity boundary. # ============================================================= # Orchestration # =============================================================
[docs] def analyze_dynamic( self, simulation_period_sec: float, dt_s: float, T_tank_w_init_C: float, dhw_usage_schedule, T0_schedule, I_DN_schedule=None, I_dH_schedule=None, T_sup_w_schedule=None, T_source_w_schedule=None, tank_level_init: float = 1.0, result_save_csv_path=None, ) -> pd.DataFrame: from scipy.optimize import fsolve time = np.arange(0, simulation_period_sec, dt_s) tN = len(time) T0_schedule = np.array(T0_schedule) if I_DN_schedule is None: I_DN_schedule = np.zeros(tN) if I_dH_schedule is None: I_dH_schedule = np.zeros(tN) if T_sup_w_schedule is not None: T_sup_w_arr = np.array(T_sup_w_schedule, dtype=float) else: T_sup_w_arr = np.full(tN, cu.K2C(self.T_tank_w_in_K)) if T_source_w_schedule is not None: T_source_w_arr = np.array(T_source_w_schedule, dtype=float) else: T_source_w_arr = np.full(tN, self.Ts) results_data = [] self.time = time self.dt = dt_s T_tank_w_K = cu.C2K(T_tank_w_init_C) tank_level = tank_level_init is_on_prev = False is_refilling = False self.T_bhe = self.Ts self.T_bhe_f_in = self.Ts self.T_bhe_f_in_K = self.Ts_K self.T_bhe_f_out = self.Ts self.Q_bhe = 0.0 # DHW schedule handling: direct m³/s flow array dhw_flow_m3s = np.asarray(dhw_usage_schedule, dtype=float) if len(dhw_flow_m3s) != tN: raise ValueError(f"dhw_usage_schedule length ({len(dhw_flow_m3s)}) != tN ({tN})") _use_solar = self._needs_solar_input() for n in tqdm(range(tN), desc="GSHPB Simulating"): t_s = time[n] hr = t_s * cu.s2h hour_of_day = (t_s % (24 * 3600)) * cu.s2h T0_K = cu.C2K(T0_schedule[n]) T_sup_w_n = T_sup_w_arr[n] T_sup_w_K_n = cu.C2K(T_sup_w_n) # Set boundary condition for water source T_source_w_n = T_source_w_arr[n] if np.isnan(T_source_w_n): # Fallback to last known BHE wall temperature when source schedule has NaN. # WARNING: if all schedule values are NaN, this silently uses self.Ts (init value). if n == 0 or n % 43200 == 0: # Log at start and every ~30 days import warnings warnings.warn( f"[WSHPB] T_source_w_schedule[{n}] is NaN — falling back to " f"T_bhe={self.T_bhe:.2f}°C. Check NIER data quality.", RuntimeWarning, stacklevel=2, ) T_source_w_n = self.T_bhe # fallback to last known valid T_source_w_K_n = cu.C2K(T_source_w_n) # Subsystem activation activation_flags = self._get_activation_flags(hour_of_day) dV_mix_w_out = dhw_flow_m3s[n] ctx = StepContext( n=n, current_time_s=t_s, current_hour=hr, hour_of_day=hour_of_day, T0=T0_schedule[n], T0_K=T0_K, activation_flags=activation_flags, T_tank_w_K=T_tank_w_K, tank_level=tank_level, dV_mix_w_out=dV_mix_w_out, I_DN=I_DN_schedule[n] if _use_solar else 0.0, I_dH=I_dH_schedule[n] if _use_solar else 0.0, T_sup_w_K=T_sup_w_K_n, ) # --- Phase A: Control Decisions --- hp_is_on, hp_result, Q_ref_cond = self._determine_hp_state(ctx, is_on_prev) is_on_prev = hp_is_on # Refill logic flow_state_guess = self._calc_tank_flow_context( dV_mix_w_out=ctx.dV_mix_w_out, T_tank_w_K=ctx.T_tank_w_K, T_tank_w_in_K=T_sup_w_K_n, T_mix_w_out_K=self.T_mix_w_out_K, ) dV_tank_w_in_ctrl, is_refilling = determine_tank_refill_flow( dt=dt_s, tank_level=ctx.tank_level, dV_tank_w_out=flow_state_guess["dV_tank_w_out"], V_tank_full=self.V_tank_full, tank_always_full=self.tank_always_full, prevent_simultaneous_flow=self.prevent_simultaneous_flow, tank_level_lower_bound=self.tank_level_lower_bound, tank_level_upper_bound=self.tank_level_upper_bound, dV_tank_w_in_refill=self.dV_tank_w_in_refill, is_refilling=is_refilling, ) ctrl = ControlState( is_on=hp_is_on, Q_heat_source=Q_ref_cond, dV_tank_w_in_ctrl=dV_tank_w_in_ctrl, ) # --- Phase B: Implicit Solving --- sub_states = self._run_subsystems(ctx, ctrl, dt_s, T_sup_w_K_n) alp_prev: float = min( 1.0, max(0.0, (self.T_mix_w_out_K - T_sup_w_K_n) / max(1e-6, ctx.T_tank_w_K - T_sup_w_K_n)) ) dV_tank_w_out_prev = alp_prev * ctx.dV_mix_w_out dV_tank_w_in_prev = dV_tank_w_out_prev if ctrl.dV_tank_w_in_ctrl is None else ctrl.dV_tank_w_in_ctrl tank_vol_change_prev = (dV_tank_w_in_prev - dV_tank_w_out_prev) * dt_s level_next_approx = min(1.0, max(0.0, ctx.tank_level + tank_vol_change_prev / self.V_tank_full)) tank_level_solve = max(0.001, level_next_approx) res_fn = self._build_residual_fn( ctx=ctx, ctrl=ctrl, dt_s=dt_s, T_tank_w_in_K_n=T_sup_w_K_n, T_sup_w_K_n=T_sup_w_K_n, tank_level=tank_level_solve, sub_states=sub_states, ) from typing import cast T_guess_K = ctx.T_tank_w_K try: T_solved_K_arr = cast(np.ndarray, fsolve(res_fn, x0=[T_guess_K])) T_solved_K = float(T_solved_K_arr[0]) except Exception: # explicit Euler fallback Q_hp_val = ctrl.Q_heat_source Q_flow_curr = c_w * rho_w * dV_tank_w_out_prev * (T_sup_w_K_n - ctx.T_tank_w_K) Q_loss_curr = self.UA_tank * (ctx.T_tank_w_K - ctx.T0_K) Q_tot = Q_hp_val + Q_flow_curr - Q_loss_curr T_solved_K = ctx.T_tank_w_K + dt_s * Q_tot / (self.C_tank * tank_level_solve) if T_solved_K <= T_sup_w_K_n: T_solved_K = T_sup_w_K_n # Flow state evaluated at solved temperature flow_state_final = self._calc_tank_flow_context( dV_mix_w_out=ctx.dV_mix_w_out, T_tank_w_K=T_solved_K, T_tank_w_in_K=T_sup_w_K_n, T_mix_w_out_K=self.T_mix_w_out_K, dV_tank_w_in_override=ctrl.dV_tank_w_in_ctrl, ) tank_vol_change_final = (flow_state_final["dV_tank_w_in"] - flow_state_final["dV_tank_w_out"]) * dt_s level_next = min(1.0, max(0.0, ctx.tank_level + tank_vol_change_final / self.V_tank_full)) # Heat extraction calculation for reporting Q_bhe_unit = hp_result.get("Q_bhe [W]", 0.0) / self.H_b if hp_is_on else 0.0 self.Q_bhe = Q_bhe_unit * self.H_b m_cp_b = c_w * rho_w * self.dV_b_f_m3s # Infinite heat capacity boundary (River/Lake), no ground interference self.T_bhe = T_source_w_n T_bhe_K = cu.C2K(self.T_bhe) # Thermal resistance of pipe T_bhe_f_K = T_bhe_K - Q_bhe_unit * self.R_b self.T_bhe_f = cu.K2C(T_bhe_f_K) # Assume symmetrical temperature approach around average BHE fluid temperature dT_bhe_f_half = float((self.Q_bhe / m_cp_b) / 2) if m_cp_b > 0 else 0.0 self.T_bhe_f_in_K = T_bhe_f_K - dT_bhe_f_half self.T_bhe_f_in = cu.K2C(self.T_bhe_f_in_K) T_bhe_f_out_K = T_bhe_f_K + dT_bhe_f_half self.T_bhe_f_out = cu.K2C(T_bhe_f_out_K) self.T_bhe_f_out_K = T_bhe_f_out_K # Apply BHE state to hp_result hp_result["Ts [°C]"] = T_source_w_n # Override: record timestep-specific river water temp (not the static init value self.Ts) hp_result["T_bhe [°C]"] = self.T_bhe hp_result["T_bhe_f [°C]"] = self.T_bhe_f hp_result["T_bhe_f_in [°C]"] = self.T_bhe_f_in hp_result["T_bhe_f_out [°C]"] = self.T_bhe_f_out # Assemble step results step_record = self._assemble_core_results(ctx, ctrl, T_solved_K, level_next, hp_result, flow_state_final) self._augment_results(step_record, ctx, ctrl, sub_states, T_solved_K) results_data.append(step_record) # Step forward T_tank_w_K = T_solved_K tank_level = level_next results_df = pd.DataFrame(results_data) results_df.ffill(inplace=True) results_df = self._postprocess(results_df) if result_save_csv_path: results_df.to_csv(result_save_csv_path, index=False) return results_df
[docs] def analyze_steady( self, T_tank_w: float, T_source: float, Q_ref_cond: float, T0: float = 0.0, *, return_dict: bool = True, ) -> dict | pd.DataFrame: """Run a steady-state performance snapshot. Evaluates the refrigerant cycle at a given operating point (``T_tank_w``, ``T_source``, ``Q_ref_cond``) **without** solving the tank energy balance or tracking dynamic flows. Parameters ---------- T_tank_w : float Tank water temperature [°C] — treated as a given input. T_source : float Source fluid temperature entering the heat pump [°C]. Q_ref_cond : float Target condenser heat rate [W]. T0 : float Dead-state / outdoor-air temperature [°C] (for exergy calculations). return_dict : bool If ``True`` return dict; else single-row DataFrame. Returns ------- dict | pd.DataFrame """ import warnings import contextlib # Empty flow state as steady state ignores dynamic withdrawal/refill flow_state = { "dV_mix_w_out": 0.0, "dV_tank_w_out": 0.0, "dV_tank_w_in": 0.0, "dV_mix_sup_w_in": 0.0, "alp": 0.0, } # Override T_bhe_f_out_K so that _calc_state uses T_source correctly self.T_bhe_f_out_K = cu.C2K(T_source) if Q_ref_cond <= 0: result = self._calc_off_state( T_tank_w=T_tank_w, T0=T0, flow_state=flow_state, ) else: opt_result = self._optimize_operation( T_tank_w=T_tank_w, Q_cond_load=Q_ref_cond, T0=T0, flow_state=flow_state, ) result = None with contextlib.suppress(Exception): opt_x = float(getattr(opt_result, "x", 5.0)) result = self._calc_state( dT_ref_evap=opt_x, T_tank_w=T_tank_w, Q_cond_load=Q_ref_cond, T0=T0, flow_state=flow_state, ) if result is None or not isinstance(result, dict): warnings.warn( f"analyze_steady: optimization failed " f"(T_tank_w={T_tank_w:.1f}°C, T_source={T_source:.1f}°C, " f"Q_ref_cond={Q_ref_cond:.0f}W). " "Returning HP-off state.", RuntimeWarning, stacklevel=2, ) try: result = self._calc_state( dT_ref_evap=5.0, T_tank_w=T_tank_w, Q_cond_load=0.0, T0=T0, flow_state=flow_state, ) except Exception: result = self._calc_off_state( T_tank_w=T_tank_w, T0=T0, flow_state=flow_state, ) if ( result is not None and isinstance(result, dict) and "opt_result" in locals() and hasattr(opt_result, "success") ): result["converged"] = opt_result.success if result is None: result = {} if result: # Steady state doesn't have tank loss because we don't solve tank mass/energy balance result["Q_tank_loss [W]"] = 0.0 result["tank_level [-]"] = 1.0 # steady-state: always_full if return_dict: return result return pd.DataFrame([result])
[docs] def postprocess_exergy(self, df: pd.DataFrame) -> pd.DataFrame: """Compute GSHPB-specific exergy variables.""" from .thermodynamics import calc_energy_flow, calc_refrigerant_exergy, convert_electricity_to_exergy df = df.copy() T0_K = cu.C2K(df["T0 [°C]"]) T_tank_K = cu.C2K(df["T_tank_w [°C]"]) df["Q_tank_w_out [W]"] = calc_energy_flow(c_w * rho_w * df["dV_tank_w_out [m3/s]"].fillna(0), T_tank_K, T0_K) # 1. Refrigerant state points df = calc_refrigerant_exergy(df, self.ref, T0_K) df = convert_electricity_to_exergy(df) # 2. Exergy flows G_b = c_w * rho_w * df["dV_bhe_f [m3/s]"] T_bhe_f_in_K = cu.C2K(df["T_bhe_f_in [°C]"]) T_bhe_f_out_K = cu.C2K(df["T_bhe_f_out [°C]"]) # Exergy at BHE boundaries X_bhe_in = calc_exergy_flow(G_b, T_bhe_f_in_K, T0_K) X_bhe_out = calc_exergy_flow(G_b, T_bhe_f_out_K, T0_K) # Fluid enters evaporator after being heated by the pump T_evap_in_K = T_bhe_f_out_K + df["E_pmp [W]"] / G_b.replace(0, np.nan) X_evap_in = calc_exergy_flow(G_b, T_evap_in_K, T0_K) # Fluid leaves evaporator and enters BHE X_evap_out = X_bhe_in df["X_ref_cond [W]"] = df["Q_ref_cond [W]"] * (1 - T0_K / cu.C2K(df["T_ref_cond_sat_v [°C]"])) df["X_ref_evap [W]"] = df["Q_ref_evap [W]"] * (1 - T0_K / cu.C2K(df["T_ref_evap_sat [°C]"])) df["X_tank_w_in [W]"] = calc_exergy_flow( c_w * rho_w * df["dV_tank_w_in [m3/s]"].fillna(0), cu.C2K(df["T_tank_w_in [°C]"]), T0_K ) df["X_tank_w_out [W]"] = calc_exergy_flow(c_w * rho_w * df["dV_tank_w_out [m3/s]"].fillna(0), T_tank_K, T0_K) df["X_mix_w_out [W]"] = calc_exergy_flow( c_w * rho_w * df["dV_mix_w_out [m3/s]"].fillna(0), cu.C2K(df["T_mix_w_out [°C]"]), T0_K ) df["X_mix_sup_w_in [W]"] = calc_exergy_flow( c_w * rho_w * df["dV_mix_sup_w_in [m3/s]"].fillna(0), cu.C2K(df["T_tank_w_in [°C]"]), T0_K ) df["X_tank_loss [W]"] = df["Q_tank_loss [W]"] * (1 - T0_K / T_tank_K) tank_lvl = df["tank_level [-]"].fillna(1.0) if "tank_level [-]" in df.columns else 1.0 C_tank_actual = self.C_tank * tank_lvl T_tank_K_prev = T_tank_K.shift(1) df["Xst_tank [W]"] = (1 - T0_K / T_tank_K) * C_tank_actual * (T_tank_K - T_tank_K_prev) / self.dt df.loc[df.index[0], "Xst_tank [W]"] = 0.0 import typing # Subsystems exergy X_sub_tot_add = typing.cast(typing.Any, 0.0) X_sub_in_tank_add = typing.cast(typing.Any, 0.0) X_sub_out_tank_add = typing.cast(typing.Any, 0.0) for _name, sub in self._subsystems.items(): if hasattr(sub, "calc_exergy"): ex_res = sub.calc_exergy(df, T0_K) if ex_res is not None: for col_name, s in ex_res.columns.items(): df[col_name] = s X_sub_tot_add = X_sub_tot_add + ex_res.X_tot_add X_sub_in_tank_add = X_sub_in_tank_add + ex_res.X_in_tank_add X_sub_out_tank_add = X_sub_out_tank_add + ex_res.X_out_tank_add # Components Destruction df["X_tot [W]"] = df["E_cmp [W]"] + df["E_pmp [W]"] + df.get("X_uv [W]", 0.0) + X_sub_tot_add df["Xc_cmp [W]"] = df["X_cmp [W]"] + df["X_ref_cmp_in [W]"] - df["X_ref_cmp_out [W]"] df["Xc_cond [W]"] = (df["X_ref_cmp_out [W]"] - df["X_ref_exp_in [W]"]) - df["X_ref_cond [W]"] df["Xc_exp [W]"] = df["X_ref_exp_in [W]"] - df["X_ref_exp_out [W]"] df["Xc_evap [W]"] = (X_evap_in - X_evap_out) - df["X_ref_evap [W]"] df["Xc_pmp [W]"] = df["E_pmp [W]"] - (X_evap_in - X_bhe_out) X_in_tank = df["X_ref_cond [W]"] + df["X_tank_w_in [W]"].fillna(0) + df.get("X_uv [W]", 0.0) + X_sub_in_tank_add X_out_tank = df["Xst_tank [W]"] + df["X_tank_w_out [W]"].fillna(0) + X_sub_out_tank_add df["Xc_tank [W]"] = X_in_tank - X_out_tank df["Xc_mix [W]"] = ( df["X_tank_w_out [W]"].fillna(0) + df["X_mix_sup_w_in [W]"].fillna(0) - df["X_mix_w_out [W]"].fillna(0) ) # Efficiency df["X_eff_ref [-]"] = df["X_ref_cond [W]"] / df["X_cmp [W]"].replace(0, np.nan) df["X_eff_sys [-]"] = df["X_ref_cond [W]"] / df["X_tot [W]"].replace(0, np.nan) return df