| from collections.abc import Mapping |
| from typing import Any |
|
|
| from configs import dify_config |
| from core.file import File |
| from core.variables import ( |
| ArrayAnySegment, |
| ArrayFileSegment, |
| ArrayNumberSegment, |
| ArrayNumberVariable, |
| ArrayObjectSegment, |
| ArrayObjectVariable, |
| ArraySegment, |
| ArrayStringSegment, |
| ArrayStringVariable, |
| FileSegment, |
| FloatSegment, |
| FloatVariable, |
| IntegerSegment, |
| IntegerVariable, |
| NoneSegment, |
| ObjectSegment, |
| ObjectVariable, |
| SecretVariable, |
| Segment, |
| SegmentType, |
| StringSegment, |
| StringVariable, |
| Variable, |
| ) |
| from core.variables.exc import VariableError |
|
|
|
|
| def build_variable_from_mapping(mapping: Mapping[str, Any], /) -> Variable: |
| if (value_type := mapping.get("value_type")) is None: |
| raise VariableError("missing value type") |
| if not mapping.get("name"): |
| raise VariableError("missing name") |
| if (value := mapping.get("value")) is None: |
| raise VariableError("missing value") |
| match value_type: |
| case SegmentType.STRING: |
| result = StringVariable.model_validate(mapping) |
| case SegmentType.SECRET: |
| result = SecretVariable.model_validate(mapping) |
| case SegmentType.NUMBER if isinstance(value, int): |
| result = IntegerVariable.model_validate(mapping) |
| case SegmentType.NUMBER if isinstance(value, float): |
| result = FloatVariable.model_validate(mapping) |
| case SegmentType.NUMBER if not isinstance(value, float | int): |
| raise VariableError(f"invalid number value {value}") |
| case SegmentType.OBJECT if isinstance(value, dict): |
| result = ObjectVariable.model_validate(mapping) |
| case SegmentType.ARRAY_STRING if isinstance(value, list): |
| result = ArrayStringVariable.model_validate(mapping) |
| case SegmentType.ARRAY_NUMBER if isinstance(value, list): |
| result = ArrayNumberVariable.model_validate(mapping) |
| case SegmentType.ARRAY_OBJECT if isinstance(value, list): |
| result = ArrayObjectVariable.model_validate(mapping) |
| case _: |
| raise VariableError(f"not supported value type {value_type}") |
| if result.size > dify_config.MAX_VARIABLE_SIZE: |
| raise VariableError(f"variable size {result.size} exceeds limit {dify_config.MAX_VARIABLE_SIZE}") |
| return result |
|
|
|
|
| def build_segment(value: Any, /) -> Segment: |
| if value is None: |
| return NoneSegment() |
| if isinstance(value, str): |
| return StringSegment(value=value) |
| if isinstance(value, int): |
| return IntegerSegment(value=value) |
| if isinstance(value, float): |
| return FloatSegment(value=value) |
| if isinstance(value, dict): |
| return ObjectSegment(value=value) |
| if isinstance(value, File): |
| return FileSegment(value=value) |
| if isinstance(value, list): |
| items = [build_segment(item) for item in value] |
| types = {item.value_type for item in items} |
| if len(types) != 1 or all(isinstance(item, ArraySegment) for item in items): |
| return ArrayAnySegment(value=value) |
| match types.pop(): |
| case SegmentType.STRING: |
| return ArrayStringSegment(value=value) |
| case SegmentType.NUMBER: |
| return ArrayNumberSegment(value=value) |
| case SegmentType.OBJECT: |
| return ArrayObjectSegment(value=value) |
| case SegmentType.FILE: |
| return ArrayFileSegment(value=value) |
| case SegmentType.NONE: |
| return ArrayAnySegment(value=value) |
| case _: |
| raise ValueError(f"not supported value {value}") |
| raise ValueError(f"not supported value {value}") |
|
|