examples.example_3

Constrained value representing a validated ventilation temperature in Celsius.

Pipeline stages

The pipeline processes input in multiple steps:

  1. TypeValidationStrategy(int) Ensure the register selector is an integer.

  2. AllowedInputRegister Ensure the chosen register index is one of the supported registers {0, 1, 2, 3}.

  3. GetValueFromRegister Fetch the raw sensor value from the provided Modbus input register list.

  4. DetectSensorErrors Check for hardware error codes:

    • -32768 → "No sensor detected"
    • 32767 → "Sensor short circuit"
  5. RawToCelsius Convert the raw integer to Celsius by dividing by 10.0.

  6. Parent pipeline (ConstrainedRangeValue)

    • Type normalization (float)
    • Coercion to type of low_value (float)
    • Range validation (-10.0 .. 40.0 °C)
  1"""
  2Constrained value representing a validated ventilation temperature in Celsius.
  3
  4Pipeline stages
  5---------------
  6The pipeline processes input in multiple steps:
  7
  81. **TypeValidationStrategy(int)**
  9   Ensure the register selector is an integer.
 10
 112. **AllowedInputRegister**
 12   Ensure the chosen register index is one of the supported registers {0, 1, 2, 3}.
 13
 143. **GetValueFromRegister**
 15   Fetch the raw sensor value from the provided Modbus input register list.
 16
 174. **DetectSensorErrors**
 18   Check for hardware error codes:
 19     - -32768 → "No sensor detected"
 20     -  32767 → "Sensor short circuit"
 21
 225. **RawToCelsius**
 23   Convert the raw integer to Celsius by dividing by 10.0.
 24
 256. **Parent pipeline (ConstrainedRangeValue)**
 26   - Type normalization (float)
 27   - Coercion to type of `low_value` (`float`)
 28   - Range validation (-10.0 .. 40.0 °C)
 29"""
 30import sys, pathlib
 31from typing import List
 32from constrained_values import (Response, Status, RangeValue,
 33                                ValidationStrategy, TypeValidationStrategy, DEFAULT_SUCCESS_MESSAGE)
 34from constrained_values.response import StatusResponse
 35from constrained_values.value import TransformationStrategy, PipeLineStrategy
 36
 37# Make repo root importable when running this file directly
 38sys.path.insert(0, str(pathlib.Path(__file__).resolve().parents[1]))
 39
 40
 41class AllowedInputRegister(ValidationStrategy[int]):
 42    """Checks if the selected register address is valid."""
 43
 44    def validate(self, value: int) -> StatusResponse:
 45        valid_registers = {0, 1, 2, 3}
 46        if value in valid_registers:
 47            return StatusResponse(status=Status.OK, details=DEFAULT_SUCCESS_MESSAGE)
 48        return StatusResponse(status=Status.EXCEPTION, details="Invalid temperature register selected")
 49
 50
 51class GetValueFromRegister(TransformationStrategy[int, int]):
 52    """Fetches the raw integer from the Modbus data list."""
 53
 54    def __init__(self, input_register: List[int]):
 55        self.input_register = input_register
 56
 57    def transform(self, value: int) -> Response[int]:
 58        raw_sensor_value = self.input_register[value]
 59        return Response(status=Status.OK, details=DEFAULT_SUCCESS_MESSAGE, value=raw_sensor_value)
 60
 61
 62class DetectSensorErrors(ValidationStrategy[int]):
 63    """Checks for hardware-specific error codes."""
 64    NO_SENSOR = -32768
 65    SENSOR_SHORT = 32767
 66
 67    def validate(self, value: int) -> StatusResponse:
 68        if value == self.NO_SENSOR:
 69            return StatusResponse(status=Status.EXCEPTION, details="No sensor detected")
 70        if value == self.SENSOR_SHORT:
 71            return StatusResponse(status=Status.EXCEPTION, details="Sensor short circuit")
 72        return StatusResponse(status=Status.OK, details=DEFAULT_SUCCESS_MESSAGE)
 73
 74
 75class RawToCelsius(TransformationStrategy[int, float]):
 76    """Transforms the raw integer to a float in degrees Celsius."""
 77
 78    def transform(self, value: int) -> Response[float]:
 79        celsius = float(value) / 10.0
 80        return Response(status=Status.OK, details=DEFAULT_SUCCESS_MESSAGE, value=celsius)
 81
 82
 83class VentilationTemperature(RangeValue[float]):
 84    """
 85    This value object encapsulates the full pipeline of reading and validating
 86    temperature data from Modbus input registers, converting to Celsius, and
 87    enforcing an allowed range.
 88    """
 89    __slots__ = ("_getValueFromRegister",)
 90
 91    def __init__(self, input_register: Response[int], selected_register: int):
 92        object.__setattr__(self, "_getValueFromRegister", GetValueFromRegister(input_register))
 93        super().__init__(selected_register, -10.0, 40.0)
 94
 95    def get_strategies(self) -> List[PipeLineStrategy]:
 96        return [TypeValidationStrategy(int),
 97                AllowedInputRegister(),
 98                self._getValueFromRegister,
 99                DetectSensorErrors(),
100                RawToCelsius()] + super().get_strategies()
101
102
103def main():
104    registers = [215, -32768, 32767, 402]  # Example Modbus register values
105
106    print("=== Valid register 0 ===")
107    v = VentilationTemperature(registers, 0)
108    print(f"status={v.status}, details={v.details}, value={v.value}")  # → 21.5°C
109    # Output # status=Status.OK, details=validation successful, value=21.5
110
111    print("\n=== Invalid: No sensor detected (register 1) ===")
112    v = VentilationTemperature(registers, 1)
113    print(f"status={v.status}, details={v.details}")
114    # Output # status=Status.EXCEPTION, details=No sensor detected
115
116    print("\n=== Invalid: Sensor short circuit (register 2) ===")
117    v = VentilationTemperature(registers, 2)
118    print(f"status={v.status}, details={v.details}")
119    # Output # status=Status.EXCEPTION, details=Sensor short circuit
120
121    print("\n=== Out of range (register 3) ===")
122    v = VentilationTemperature(registers, 3)
123    print(f"status={v.status}, details={v.details}")
124    # Output # status=Status.EXCEPTION, details=Value must be less than or equal to 40.0, got 40.2
125
126
127if __name__ == "__main__":
128    main()
class AllowedInputRegister(constrained_values.value.ValidationStrategy[int]):
42class AllowedInputRegister(ValidationStrategy[int]):
43    """Checks if the selected register address is valid."""
44
45    def validate(self, value: int) -> StatusResponse:
46        valid_registers = {0, 1, 2, 3}
47        if value in valid_registers:
48            return StatusResponse(status=Status.OK, details=DEFAULT_SUCCESS_MESSAGE)
49        return StatusResponse(status=Status.EXCEPTION, details="Invalid temperature register selected")

Checks if the selected register address is valid.

def validate(self, value: int) -> constrained_values.response.StatusResponse:
45    def validate(self, value: int) -> StatusResponse:
46        valid_registers = {0, 1, 2, 3}
47        if value in valid_registers:
48            return StatusResponse(status=Status.OK, details=DEFAULT_SUCCESS_MESSAGE)
49        return StatusResponse(status=Status.EXCEPTION, details="Invalid temperature register selected")

Validate value and return a StatusResponse.

Arguments:
  • value (MidT): Value to validate.
Returns:

StatusResponse: Validation outcome (status and details).

class GetValueFromRegister(constrained_values.value.TransformationStrategy[int, int]):
52class GetValueFromRegister(TransformationStrategy[int, int]):
53    """Fetches the raw integer from the Modbus data list."""
54
55    def __init__(self, input_register: List[int]):
56        self.input_register = input_register
57
58    def transform(self, value: int) -> Response[int]:
59        raw_sensor_value = self.input_register[value]
60        return Response(status=Status.OK, details=DEFAULT_SUCCESS_MESSAGE, value=raw_sensor_value)

Fetches the raw integer from the Modbus data list.

GetValueFromRegister(input_register: List[int])
55    def __init__(self, input_register: List[int]):
56        self.input_register = input_register
input_register
def transform(self, value: int) -> constrained_values.Response[int]:
58    def transform(self, value: int) -> Response[int]:
59        raw_sensor_value = self.input_register[value]
60        return Response(status=Status.OK, details=DEFAULT_SUCCESS_MESSAGE, value=raw_sensor_value)

Transform value and return a Response.

Arguments:
  • value (InT): Input value to transform.
Returns:

Response[OutT]: The transformed value with status and details.

class DetectSensorErrors(constrained_values.value.ValidationStrategy[int]):
63class DetectSensorErrors(ValidationStrategy[int]):
64    """Checks for hardware-specific error codes."""
65    NO_SENSOR = -32768
66    SENSOR_SHORT = 32767
67
68    def validate(self, value: int) -> StatusResponse:
69        if value == self.NO_SENSOR:
70            return StatusResponse(status=Status.EXCEPTION, details="No sensor detected")
71        if value == self.SENSOR_SHORT:
72            return StatusResponse(status=Status.EXCEPTION, details="Sensor short circuit")
73        return StatusResponse(status=Status.OK, details=DEFAULT_SUCCESS_MESSAGE)

Checks for hardware-specific error codes.

NO_SENSOR = -32768
SENSOR_SHORT = 32767
def validate(self, value: int) -> constrained_values.response.StatusResponse:
68    def validate(self, value: int) -> StatusResponse:
69        if value == self.NO_SENSOR:
70            return StatusResponse(status=Status.EXCEPTION, details="No sensor detected")
71        if value == self.SENSOR_SHORT:
72            return StatusResponse(status=Status.EXCEPTION, details="Sensor short circuit")
73        return StatusResponse(status=Status.OK, details=DEFAULT_SUCCESS_MESSAGE)

Validate value and return a StatusResponse.

Arguments:
  • value (MidT): Value to validate.
Returns:

StatusResponse: Validation outcome (status and details).

class RawToCelsius(constrained_values.value.TransformationStrategy[int, float]):
76class RawToCelsius(TransformationStrategy[int, float]):
77    """Transforms the raw integer to a float in degrees Celsius."""
78
79    def transform(self, value: int) -> Response[float]:
80        celsius = float(value) / 10.0
81        return Response(status=Status.OK, details=DEFAULT_SUCCESS_MESSAGE, value=celsius)

Transforms the raw integer to a float in degrees Celsius.

def transform(self, value: int) -> constrained_values.Response[float]:
79    def transform(self, value: int) -> Response[float]:
80        celsius = float(value) / 10.0
81        return Response(status=Status.OK, details=DEFAULT_SUCCESS_MESSAGE, value=celsius)

Transform value and return a Response.

Arguments:
  • value (InT): Input value to transform.
Returns:

Response[OutT]: The transformed value with status and details.

class VentilationTemperature(constrained_values.constrained_value_types.RangeValue[float]):
 84class VentilationTemperature(RangeValue[float]):
 85    """
 86    This value object encapsulates the full pipeline of reading and validating
 87    temperature data from Modbus input registers, converting to Celsius, and
 88    enforcing an allowed range.
 89    """
 90    __slots__ = ("_getValueFromRegister",)
 91
 92    def __init__(self, input_register: Response[int], selected_register: int):
 93        object.__setattr__(self, "_getValueFromRegister", GetValueFromRegister(input_register))
 94        super().__init__(selected_register, -10.0, 40.0)
 95
 96    def get_strategies(self) -> List[PipeLineStrategy]:
 97        return [TypeValidationStrategy(int),
 98                AllowedInputRegister(),
 99                self._getValueFromRegister,
100                DetectSensorErrors(),
101                RawToCelsius()] + super().get_strategies()

This value object encapsulates the full pipeline of reading and validating temperature data from Modbus input registers, converting to Celsius, and enforcing an allowed range.

VentilationTemperature( input_register: constrained_values.Response[int], selected_register: int)
92    def __init__(self, input_register: Response[int], selected_register: int):
93        object.__setattr__(self, "_getValueFromRegister", GetValueFromRegister(input_register))
94        super().__init__(selected_register, -10.0, 40.0)
def get_strategies(self) -> List[constrained_values.value.PipeLineStrategy]:
 96    def get_strategies(self) -> List[PipeLineStrategy]:
 97        return [TypeValidationStrategy(int),
 98                AllowedInputRegister(),
 99                self._getValueFromRegister,
100                DetectSensorErrors(),
101                RawToCelsius()] + super().get_strategies()

Return the ordered list of strategies for this pipeline.

Returns:

List[PipeLineStrategy]: Transformation and validation steps in the order to apply.

Inherited Members
constrained_values.constrained_value_types.RangeValue
infer_valid_types_from_value
get_custom_strategies
constrained_values.value.ConstrainedValue
status
details
value
unwrap
ok
def main():
104def main():
105    registers = [215, -32768, 32767, 402]  # Example Modbus register values
106
107    print("=== Valid register 0 ===")
108    v = VentilationTemperature(registers, 0)
109    print(f"status={v.status}, details={v.details}, value={v.value}")  # → 21.5°C
110    # Output # status=Status.OK, details=validation successful, value=21.5
111
112    print("\n=== Invalid: No sensor detected (register 1) ===")
113    v = VentilationTemperature(registers, 1)
114    print(f"status={v.status}, details={v.details}")
115    # Output # status=Status.EXCEPTION, details=No sensor detected
116
117    print("\n=== Invalid: Sensor short circuit (register 2) ===")
118    v = VentilationTemperature(registers, 2)
119    print(f"status={v.status}, details={v.details}")
120    # Output # status=Status.EXCEPTION, details=Sensor short circuit
121
122    print("\n=== Out of range (register 3) ===")
123    v = VentilationTemperature(registers, 3)
124    print(f"status={v.status}, details={v.details}")
125    # Output # status=Status.EXCEPTION, details=Value must be less than or equal to 40.0, got 40.2