| import json |
| from typing import Iterator, Dict, Any, Iterable, AsyncIterator |
| from itertools import chain |
|
|
def parse_json_array_stream(line_iterator: Iterable[str]) -> Iterator[Dict[str, Any]]:
    """
    Parse a pretty-printed JSON array delivered as a stream of text lines.

    This generator yields one Python dict for every JSON object it finds
    inside the array. Lines are consumed one at a time and only the text of
    the object currently being assembled is buffered, so the whole payload
    is never held in memory at once.

    Leading lines are skipped until one whose stripped form starts with
    ``[``; the rest of that line and every following line is then scanned
    character by character. Text outside of braces is ignored, and braces
    that appear inside string literals (at any depth) are not counted.

    If the stream ends while an object is still open, a warning is printed
    and the partial object is dropped; no exception is raised for that case.

    Args:
        line_iterator: Any iterable of decoded text lines, for example the
            decoded output of ``requests.Response.iter_lines()``. It is
            traversed exactly once, so re-iterable containers such as lists
            are handled the same way as one-shot iterators.

    Yields:
        A dict for each JSON object parsed from the stream.

    Raises:
        ValueError: If no line starting with ``[`` is found, or if the text
            collected for an object cannot be decoded as JSON. Because this
            is a generator, the error surfaces while iterating.
    """
    buffer = []          # characters of the object currently being assembled
    brace_level = 0      # depth of '{' ... '}' nesting outside string literals
    in_array = False     # becomes True once the opening '[' line was seen
    in_string = False    # currently inside a JSON string literal
    escape_next = False  # previous character was a backslash

    # One single pass over the input. Looping once (instead of looping, then
    # chaining the same object again) guarantees that a re-iterable input
    # is not restarted from its first line.
    for line in line_iterator:
        if in_array:
            text = line
        else:
            stripped_line = line.strip()
            if not stripped_line.startswith('['):
                continue
            in_array = True
            # Drop the opening '[' and scan whatever follows on that line.
            text = stripped_line[1:]

        for char in text:
            if escape_next:
                # Escaped character: never interpreted structurally.
                escape_next = False
            elif char == '\\':
                escape_next = True
            elif char == '"':
                # Tracked at every depth so that braces inside top-level
                # string elements cannot disturb the brace counter.
                in_string = not in_string
            elif not in_string:
                if char == '{':
                    brace_level += 1
                elif char == '}' and brace_level > 0:
                    # The guard keeps a stray '}' between objects from
                    # driving the depth negative.
                    brace_level -= 1
                    if brace_level == 0:
                        buffer.append(char)
                        obj_str = "".join(buffer)
                        buffer = []
                        try:
                            obj = json.loads(obj_str, strict=False)
                        except json.JSONDecodeError as e:
                            raise ValueError(f"解析JSON对象失败: {e}\n内容: {obj_str}") from e
                        yield obj
                        continue

            # Everything that belongs to an open object is kept verbatim.
            if brace_level > 0:
                buffer.append(char)

    if not in_array:
        raise ValueError("数据流不是以一个JSON数组 ( '[' ) 开始。")

    if brace_level != 0:
        print(f"警告: JSON流意外结束,括号层级为 {brace_level},可能数据不完整。")
|
|
async def parse_json_array_stream_async(line_iterator: AsyncIterator[str]) -> AsyncIterator[Dict[str, Any]]:
    """
    Async variant: parse a pretty-printed JSON array delivered as text lines.

    This async generator yields one Python dict for every JSON object it
    finds inside the array. Lines are consumed one at a time and only the
    text of the object currently being assembled is buffered, so the whole
    payload is never held in memory at once.

    Leading lines are skipped until one whose stripped form starts with
    ``[``; the rest of that line and every following line is then scanned
    character by character. Text outside of braces is ignored, and braces
    that appear inside string literals (at any depth) are not counted.

    If the stream ends while an object is still open, a warning is printed
    and the partial object is dropped; no exception is raised for that case.

    Args:
        line_iterator: An async iterator producing decoded text lines, for
            example ``httpx.Response.aiter_lines()``. It is traversed by a
            single ``async for`` loop.

    Yields:
        A dict for each JSON object parsed from the stream.

    Raises:
        ValueError: If no line starting with ``[`` is found, or if the text
            collected for an object cannot be decoded as JSON. Because this
            is an async generator, the error surfaces while iterating.
    """
    buffer = []          # characters of the object currently being assembled
    brace_level = 0      # depth of '{' ... '}' nesting outside string literals
    in_array = False     # becomes True once the opening '[' line was seen
    in_string = False    # currently inside a JSON string literal
    escape_next = False  # previous character was a backslash

    # A single loop handles both the search for the opening '[' and the
    # scanning of the array body, so the character state machine exists
    # only once instead of being repeated for the first line.
    async for line in line_iterator:
        if in_array:
            text = line
        else:
            stripped_line = line.strip()
            if not stripped_line.startswith('['):
                continue
            in_array = True
            # Drop the opening '[' and scan whatever follows on that line.
            text = stripped_line[1:]

        for char in text:
            if escape_next:
                # Escaped character: never interpreted structurally.
                escape_next = False
            elif char == '\\':
                escape_next = True
            elif char == '"':
                # Tracked at every depth so that braces inside top-level
                # string elements cannot disturb the brace counter.
                in_string = not in_string
            elif not in_string:
                if char == '{':
                    brace_level += 1
                elif char == '}' and brace_level > 0:
                    # The guard keeps a stray '}' between objects from
                    # driving the depth negative.
                    brace_level -= 1
                    if brace_level == 0:
                        buffer.append(char)
                        obj_str = "".join(buffer)
                        buffer = []
                        try:
                            obj = json.loads(obj_str, strict=False)
                        except json.JSONDecodeError as e:
                            raise ValueError(f"解析JSON对象失败: {e}\n内容: {obj_str}") from e
                        yield obj
                        continue

            # Everything that belongs to an open object is kept verbatim.
            if brace_level > 0:
                buffer.append(char)

    if not in_array:
        raise ValueError("数据流不是以一个JSON数组 ( '[' ) 开始。")

    if brace_level != 0:
        print(f"警告: JSON流意外结束,括号层级为 {brace_level},可能数据不完整。")
|
|
|
|