examples.example_3
Constrained value representing a validated ventilation temperature in Celsius.
Pipeline stages
The pipeline processes input in multiple steps:
TypeValidationStrategy(int) Ensure the register selector is an integer.
AllowedInputRegister Ensure the chosen register index is one of the supported registers {0, 1, 2, 3}.
GetValueFromRegister Fetch the raw sensor value from the provided Modbus input register list.
DetectSensorErrors Check for hardware error codes:
- -32768 → "No sensor detected"
- 32767 → "Sensor short circuit"
RawToCelsius Convert the raw integer to Celsius by dividing by 10.0.
Parent pipeline (ConstrainedRangeValue)
- Type normalization (float)
- Coercion to type of
low_value(float) - Range validation (-10.0 .. 40.0 °C)
1""" 2Constrained value representing a validated ventilation temperature in Celsius. 3 4Pipeline stages 5--------------- 6The pipeline processes input in multiple steps: 7 81. **TypeValidationStrategy(int)** 9 Ensure the register selector is an integer. 10 112. **AllowedInputRegister** 12 Ensure the chosen register index is one of the supported registers {0, 1, 2, 3}. 13 143. **GetValueFromRegister** 15 Fetch the raw sensor value from the provided Modbus input register list. 16 174. **DetectSensorErrors** 18 Check for hardware error codes: 19 - -32768 → "No sensor detected" 20 - 32767 → "Sensor short circuit" 21 225. **RawToCelsius** 23 Convert the raw integer to Celsius by dividing by 10.0. 24 256. **Parent pipeline (ConstrainedRangeValue)** 26 - Type normalization (float) 27 - Coercion to type of `low_value` (`float`) 28 - Range validation (-10.0 .. 40.0 °C) 29""" 30import sys, pathlib 31from typing import List 32from constrained_values import (Response, Status, RangeValue, 33 ValidationStrategy, TypeValidationStrategy, DEFAULT_SUCCESS_MESSAGE) 34from constrained_values.response import StatusResponse 35from constrained_values.value import TransformationStrategy, PipeLineStrategy 36 37# Make repo root importable when running this file directly 38sys.path.insert(0, str(pathlib.Path(__file__).resolve().parents[1])) 39 40 41class AllowedInputRegister(ValidationStrategy[int]): 42 """Checks if the selected register address is valid.""" 43 44 def validate(self, value: int) -> StatusResponse: 45 valid_registers = {0, 1, 2, 3} 46 if value in valid_registers: 47 return StatusResponse(status=Status.OK, details=DEFAULT_SUCCESS_MESSAGE) 48 return StatusResponse(status=Status.EXCEPTION, details="Invalid temperature register selected") 49 50 51class GetValueFromRegister(TransformationStrategy[int, int]): 52 """Fetches the raw integer from the Modbus data list.""" 53 54 def __init__(self, input_register: List[int]): 55 self.input_register = input_register 56 57 def transform(self, value: int) -> Response[int]: 58 raw_sensor_value = self.input_register[value] 59 return Response(status=Status.OK, details=DEFAULT_SUCCESS_MESSAGE, value=raw_sensor_value) 60 61 62class DetectSensorErrors(ValidationStrategy[int]): 63 """Checks for hardware-specific error codes.""" 64 NO_SENSOR = -32768 65 SENSOR_SHORT = 32767 66 67 def validate(self, value: int) -> StatusResponse: 68 if value == self.NO_SENSOR: 69 return StatusResponse(status=Status.EXCEPTION, details="No sensor detected") 70 if value == self.SENSOR_SHORT: 71 return StatusResponse(status=Status.EXCEPTION, details="Sensor short circuit") 72 return StatusResponse(status=Status.OK, details=DEFAULT_SUCCESS_MESSAGE) 73 74 75class RawToCelsius(TransformationStrategy[int, float]): 76 """Transforms the raw integer to a float in degrees Celsius.""" 77 78 def transform(self, value: int) -> Response[float]: 79 celsius = float(value) / 10.0 80 return Response(status=Status.OK, details=DEFAULT_SUCCESS_MESSAGE, value=celsius) 81 82 83class VentilationTemperature(RangeValue[float]): 84 """ 85 This value object encapsulates the full pipeline of reading and validating 86 temperature data from Modbus input registers, converting to Celsius, and 87 enforcing an allowed range. 88 """ 89 __slots__ = ("_getValueFromRegister",) 90 91 def __init__(self, input_register: Response[int], selected_register: int): 92 object.__setattr__(self, "_getValueFromRegister", GetValueFromRegister(input_register)) 93 super().__init__(selected_register, -10.0, 40.0) 94 95 def get_strategies(self) -> List[PipeLineStrategy]: 96 return [TypeValidationStrategy(int), 97 AllowedInputRegister(), 98 self._getValueFromRegister, 99 DetectSensorErrors(), 100 RawToCelsius()] + super().get_strategies() 101 102 103def main(): 104 registers = [215, -32768, 32767, 402] # Example Modbus register values 105 106 print("=== Valid register 0 ===") 107 v = VentilationTemperature(registers, 0) 108 print(f"status={v.status}, details={v.details}, value={v.value}") # → 21.5°C 109 # Output # status=Status.OK, details=validation successful, value=21.5 110 111 print("\n=== Invalid: No sensor detected (register 1) ===") 112 v = VentilationTemperature(registers, 1) 113 print(f"status={v.status}, details={v.details}") 114 # Output # status=Status.EXCEPTION, details=No sensor detected 115 116 print("\n=== Invalid: Sensor short circuit (register 2) ===") 117 v = VentilationTemperature(registers, 2) 118 print(f"status={v.status}, details={v.details}") 119 # Output # status=Status.EXCEPTION, details=Sensor short circuit 120 121 print("\n=== Out of range (register 3) ===") 122 v = VentilationTemperature(registers, 3) 123 print(f"status={v.status}, details={v.details}") 124 # Output # status=Status.EXCEPTION, details=Value must be less than or equal to 40.0, got 40.2 125 126 127if __name__ == "__main__": 128 main()
42class AllowedInputRegister(ValidationStrategy[int]): 43 """Checks if the selected register address is valid.""" 44 45 def validate(self, value: int) -> StatusResponse: 46 valid_registers = {0, 1, 2, 3} 47 if value in valid_registers: 48 return StatusResponse(status=Status.OK, details=DEFAULT_SUCCESS_MESSAGE) 49 return StatusResponse(status=Status.EXCEPTION, details="Invalid temperature register selected")
Checks if the selected register address is valid.
45 def validate(self, value: int) -> StatusResponse: 46 valid_registers = {0, 1, 2, 3} 47 if value in valid_registers: 48 return StatusResponse(status=Status.OK, details=DEFAULT_SUCCESS_MESSAGE) 49 return StatusResponse(status=Status.EXCEPTION, details="Invalid temperature register selected")
Validate value and return a StatusResponse.
Arguments:
- value (MidT): Value to validate.
Returns:
StatusResponse: Validation outcome (status and details).
52class GetValueFromRegister(TransformationStrategy[int, int]): 53 """Fetches the raw integer from the Modbus data list.""" 54 55 def __init__(self, input_register: List[int]): 56 self.input_register = input_register 57 58 def transform(self, value: int) -> Response[int]: 59 raw_sensor_value = self.input_register[value] 60 return Response(status=Status.OK, details=DEFAULT_SUCCESS_MESSAGE, value=raw_sensor_value)
Fetches the raw integer from the Modbus data list.
58 def transform(self, value: int) -> Response[int]: 59 raw_sensor_value = self.input_register[value] 60 return Response(status=Status.OK, details=DEFAULT_SUCCESS_MESSAGE, value=raw_sensor_value)
Transform value and return a Response.
Arguments:
- value (InT): Input value to transform.
Returns:
Response[OutT]: The transformed value with status and details.
63class DetectSensorErrors(ValidationStrategy[int]): 64 """Checks for hardware-specific error codes.""" 65 NO_SENSOR = -32768 66 SENSOR_SHORT = 32767 67 68 def validate(self, value: int) -> StatusResponse: 69 if value == self.NO_SENSOR: 70 return StatusResponse(status=Status.EXCEPTION, details="No sensor detected") 71 if value == self.SENSOR_SHORT: 72 return StatusResponse(status=Status.EXCEPTION, details="Sensor short circuit") 73 return StatusResponse(status=Status.OK, details=DEFAULT_SUCCESS_MESSAGE)
Checks for hardware-specific error codes.
68 def validate(self, value: int) -> StatusResponse: 69 if value == self.NO_SENSOR: 70 return StatusResponse(status=Status.EXCEPTION, details="No sensor detected") 71 if value == self.SENSOR_SHORT: 72 return StatusResponse(status=Status.EXCEPTION, details="Sensor short circuit") 73 return StatusResponse(status=Status.OK, details=DEFAULT_SUCCESS_MESSAGE)
Validate value and return a StatusResponse.
Arguments:
- value (MidT): Value to validate.
Returns:
StatusResponse: Validation outcome (status and details).
76class RawToCelsius(TransformationStrategy[int, float]): 77 """Transforms the raw integer to a float in degrees Celsius.""" 78 79 def transform(self, value: int) -> Response[float]: 80 celsius = float(value) / 10.0 81 return Response(status=Status.OK, details=DEFAULT_SUCCESS_MESSAGE, value=celsius)
Transforms the raw integer to a float in degrees Celsius.
79 def transform(self, value: int) -> Response[float]: 80 celsius = float(value) / 10.0 81 return Response(status=Status.OK, details=DEFAULT_SUCCESS_MESSAGE, value=celsius)
Transform value and return a Response.
Arguments:
- value (InT): Input value to transform.
Returns:
Response[OutT]: The transformed value with status and details.
84class VentilationTemperature(RangeValue[float]): 85 """ 86 This value object encapsulates the full pipeline of reading and validating 87 temperature data from Modbus input registers, converting to Celsius, and 88 enforcing an allowed range. 89 """ 90 __slots__ = ("_getValueFromRegister",) 91 92 def __init__(self, input_register: Response[int], selected_register: int): 93 object.__setattr__(self, "_getValueFromRegister", GetValueFromRegister(input_register)) 94 super().__init__(selected_register, -10.0, 40.0) 95 96 def get_strategies(self) -> List[PipeLineStrategy]: 97 return [TypeValidationStrategy(int), 98 AllowedInputRegister(), 99 self._getValueFromRegister, 100 DetectSensorErrors(), 101 RawToCelsius()] + super().get_strategies()
This value object encapsulates the full pipeline of reading and validating temperature data from Modbus input registers, converting to Celsius, and enforcing an allowed range.
96 def get_strategies(self) -> List[PipeLineStrategy]: 97 return [TypeValidationStrategy(int), 98 AllowedInputRegister(), 99 self._getValueFromRegister, 100 DetectSensorErrors(), 101 RawToCelsius()] + super().get_strategies()
Return the ordered list of strategies for this pipeline.
Returns:
List[PipeLineStrategy]: Transformation and validation steps in the order to apply.
Inherited Members
- constrained_values.constrained_value_types.RangeValue
- infer_valid_types_from_value
- get_custom_strategies
- constrained_values.value.ConstrainedValue
- status
- details
- value
- unwrap
- ok
104def main(): 105 registers = [215, -32768, 32767, 402] # Example Modbus register values 106 107 print("=== Valid register 0 ===") 108 v = VentilationTemperature(registers, 0) 109 print(f"status={v.status}, details={v.details}, value={v.value}") # → 21.5°C 110 # Output # status=Status.OK, details=validation successful, value=21.5 111 112 print("\n=== Invalid: No sensor detected (register 1) ===") 113 v = VentilationTemperature(registers, 1) 114 print(f"status={v.status}, details={v.details}") 115 # Output # status=Status.EXCEPTION, details=No sensor detected 116 117 print("\n=== Invalid: Sensor short circuit (register 2) ===") 118 v = VentilationTemperature(registers, 2) 119 print(f"status={v.status}, details={v.details}") 120 # Output # status=Status.EXCEPTION, details=Sensor short circuit 121 122 print("\n=== Out of range (register 3) ===") 123 v = VentilationTemperature(registers, 3) 124 print(f"status={v.status}, details={v.details}") 125 # Output # status=Status.EXCEPTION, details=Value must be less than or equal to 40.0, got 40.2