| |
| import pytest |
| import logging |
| import hashlib |
| from unittest.mock import patch, MagicMock, ANY |
| import requests |
|
|
| from ankigen_core.utils import ( |
| get_logger, |
| ResponseCache, |
| fetch_webpage_text, |
| setup_logging, |
| ) |
|
|
|
|
| |
|
|
|
|
def test_get_logger_returns_logger_instance():
    """get_logger() must hand back a standard ``logging.Logger`` object."""
    result = get_logger()
    assert isinstance(result, logging.Logger)
|
|
|
|
def test_get_logger_is_singleton():
    """Two consecutive get_logger() calls must yield the very same object."""
    first_call = get_logger()
    second_call = get_logger()
    # Identity, not equality: the same instance must be reused.
    assert first_call is second_call
|
|
|
|
def test_setup_logging_configures_handlers(capsys):
    """Check that a freshly obtained logger has handlers and writes INFO to stdout.

    The module-level cached logger (``utils._logger_instance``) is cleared
    before calling ``get_logger()`` and the previous value is put back when
    the test finishes, whether it passes or fails.

    Args:
        capsys: pytest fixture used to read what was written to stdout.
    """
    from ankigen_core import utils

    original_logger_instance = utils._logger_instance
    # NOTE(review): presumably the reset forces the handlers to be created
    # while capsys is already capturing stdout — confirm against setup_logging.
    utils._logger_instance = None
    try:
        logger = get_logger()

        assert len(logger.handlers) >= 1

        test_message = "Test INFO message for logging"
        logger.info(test_message)
        captured = capsys.readouterr()
        assert test_message in captured.out
    finally:
        # Restore in ``finally`` so a failing assertion cannot leak the reset
        # logger state into the tests that run after this one.
        utils._logger_instance = original_logger_instance
|
|
|
|
| |
|
|
|
|
def test_response_cache_set_and_get():
    """Values stored with ``set`` come back unchanged from ``get``."""
    cache = ResponseCache(maxsize=2)
    entries = [
        ("What is Python?", "gpt-test", {"answer": "A programming language"}),
        ("What is Java?", "gpt-test", {"answer": "Another programming language"}),
    ]

    # Store everything first, then read back in the same order.
    for prompt, model, response in entries:
        cache.set(prompt, model, response)

    for prompt, model, response in entries:
        assert cache.get(prompt, model) == response
|
|
|
|
def test_response_cache_get_non_existent():
    """Looking up a prompt that was never stored returns ``None``."""
    empty_cache = ResponseCache()
    assert empty_cache.get("NonExistentPrompt", "test-model") is None
|
|
|
|
def test_response_cache_key_creation_indirectly():
    """The same prompt stored under two models yields two separate entries."""
    shared_prompt = "Key test prompt 1"
    replies_by_model = {
        "model-a": "Response for model A",
        "model-b": "Response for model B",
    }
    cache = ResponseCache(maxsize=5)

    for model_name, reply in replies_by_model.items():
        cache.set(shared_prompt, model_name, reply)

    # Each model must see its own reply ...
    for model_name, reply in replies_by_model.items():
        assert cache.get(shared_prompt, model_name) == reply

    # ... and never the reply stored for the other model.
    assert cache.get(shared_prompt, "model-a") != replies_by_model["model-b"]
|
|
|
|
def test_response_cache_lru_eviction_simple():
    """Write more entries than ``maxsize`` allows and read them back.

    Two caches with ``maxsize=1`` each receive two entries. The assertions
    expect the newest entry to be retrievable and, in the second cache, the
    older entry to be returned by ``get`` as well.
    """
    first_prompt, first_model, first_reply = "Prompt One", "m1", "Resp One"
    second_prompt, second_model, second_reply = "Prompt Two", "m2", "Resp Two"

    single_slot = ResponseCache(maxsize=1)
    single_slot.set(first_prompt, first_model, first_reply)
    assert single_slot.get(first_prompt, first_model) == first_reply

    # A second entry goes past the configured maxsize of 1.
    single_slot.set(second_prompt, second_model, second_reply)
    assert single_slot.get(second_prompt, second_model) == second_reply

    another_slot = ResponseCache(maxsize=1)
    another_slot.set("p1", "m", "r1")
    another_slot.set("p2", "m", "r2")

    # Touch the newer entry first, then ask for the older one.
    another_slot.get("p2", "m")
    older_entry = another_slot.get("p1", "m")

    assert older_entry == "r1"
| |
| |
| |
| |
| |
|
|
|
|
| |
|
|
|
|
@patch("ankigen_core.utils.requests.get")
def test_fetch_webpage_text_success(mock_requests_get):
    """Fetch a mocked HTML page and check the extracted text line by line.

    ``requests.get`` is patched to return a page with header, script, main
    and footer sections. The test expects exactly the four text lines from
    inside ``<main>``, one GET call with the expected headers and timeout,
    and one ``raise_for_status`` call.

    Args:
        mock_requests_get: mock injected by ``patch`` for ``requests.get``.
    """
    mock_response = MagicMock()
    mock_response.text = """
<html>
<head><title>Test Page</title></head>
<body>
<header>Ignore this</header>
<script>console.log("ignore scripts");</script>
<main>
<h1>Main Title</h1>
<p>This is the first paragraph.</p>
<p>Second paragraph with extra spaces.</p>
<div>Div content</div>
</main>
<footer>Ignore footer too</footer>
</body>
</html>
"""
    mock_response.raise_for_status = MagicMock()
    mock_requests_get.return_value = mock_response

    url = "http://example.com/test"
    extracted_text = fetch_webpage_text(url)

    # A plain dict is compared here: pytest.approx exists for numeric
    # tolerance and adds nothing when every value is a string.
    mock_requests_get.assert_called_once_with(
        url,
        headers={
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        },
        timeout=15,
    )
    mock_response.raise_for_status.assert_called_once()

    expected_lines = [
        "Main Title",
        "This is the first paragraph.",
        "Second paragraph with extra spaces.",
        "Div content",
    ]
    actual_lines = extracted_text.split("\n")

    # Check the count first so the per-line loop cannot index out of range.
    assert len(actual_lines) == len(
        expected_lines
    ), f"Expected {len(expected_lines)} lines, got {len(actual_lines)}"

    for i, (actual_line, expected_line) in enumerate(
        zip(actual_lines, expected_lines)
    ):
        assert (
            actual_line == expected_line
        ), f"Line {i + 1} mismatch: Expected '{expected_line}', Got '{actual_line}'"
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
@patch("ankigen_core.utils.requests.get")
def test_fetch_webpage_text_network_error(mock_requests_get):
    """A requests-level failure must surface as ``ConnectionError``.

    ``requests.get`` is patched to raise ``RequestException``. The test
    expects ``fetch_webpage_text`` to raise ``ConnectionError`` with the
    original error text in its message, after exactly one GET call.

    Args:
        mock_requests_get: mock injected by ``patch`` for ``requests.get``.
    """
    mock_requests_get.side_effect = requests.exceptions.RequestException(
        "Test Network Error"
    )

    url = "http://example.com/network-error"

    with pytest.raises(ConnectionError, match="Test Network Error"):
        fetch_webpage_text(url)

    # A plain dict is compared here: pytest.approx exists for numeric
    # tolerance and adds nothing when every value is a string.
    mock_requests_get.assert_called_once_with(
        url,
        headers={
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        },
        timeout=15,
    )
|
|
|
|
| |
@patch("ankigen_core.utils.BeautifulSoup")
@patch("ankigen_core.utils.requests.get")
def test_fetch_webpage_text_parsing_error(mock_requests_get, mock_beautiful_soup):
    """A failure while building the soup must surface as ``RuntimeError``.

    The HTTP call succeeds with a mocked response, but the patched
    ``BeautifulSoup`` constructor raises. The test expects a ``RuntimeError``
    mentioning the parse failure, one GET call, and at least one attempt to
    construct the soup.

    Args:
        mock_requests_get: mock injected by ``patch`` for ``requests.get``.
        mock_beautiful_soup: mock injected for ``utils.BeautifulSoup``.
    """
    mock_response = MagicMock()
    mock_response.text = "<html><body>Invalid HTML?</body></html>"
    mock_response.raise_for_status = MagicMock()
    mock_requests_get.return_value = mock_response

    # Every attempt to construct the parser fails.
    mock_beautiful_soup.side_effect = Exception("Test Parsing Error")

    url = "http://example.com/parsing-error"

    with pytest.raises(RuntimeError, match="Failed to parse HTML content"):
        fetch_webpage_text(url)

    # A plain dict is compared here: pytest.approx exists for numeric
    # tolerance and adds nothing when every value is a string.
    mock_requests_get.assert_called_once_with(
        url,
        headers={
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        },
        timeout=15,
    )

    # The number of constructor attempts is not pinned; at least one is required.
    mock_beautiful_soup.assert_called()
|
|
|
|
def test_fetch_webpage_text_empty_content():
    """A page whose body holds only a ``<script>`` yields an empty string."""
    fake_response = MagicMock()
    fake_response.text = "<html><body><script>only script</script></body></html>"
    fake_response.raise_for_status = MagicMock()

    target_url = "http://example.com/empty"
    with patch("ankigen_core.utils.requests.get", return_value=fake_response):
        result = fetch_webpage_text(target_url)
        assert result == ""
|
|
|
|
| |
| |
| |
|
|
|
|
| |
|
|
|
|
def test_setup_logging_initialization():
    """Check the logger returned by ``setup_logging()``.

    It must be a ``logging.Logger`` named ``"ankigen"`` carrying exactly two
    handlers. The module-level cached instance is cleared afterwards.
    """
    from ankigen_core import utils

    try:
        logger = setup_logging()
        assert isinstance(logger, logging.Logger)
        assert logger.name == "ankigen"
        assert len(logger.handlers) == 2
    finally:
        # Reset in ``finally`` so a failing assertion cannot leave the cached
        # logger behind for the tests that run after this one.
        utils._logger_instance = None
|
|
|
|
def test_setup_logging_singleton():
    """Calling ``setup_logging()`` twice must return the same logger object.

    The module-level cached instance is cleared afterwards.
    """
    from ankigen_core import utils

    try:
        logger1 = setup_logging()
        logger2 = setup_logging()
        assert logger1 is logger2
    finally:
        # Reset in ``finally`` so a failing assertion cannot leave the cached
        # logger behind for the tests that run after this one.
        utils._logger_instance = None
|
|
|
|
def test_get_logger_flow():
    """Check that ``get_logger()`` fills the module cache and then reuses it.

    Starting from ``utils._logger_instance = None``, the first call must set
    the cached instance and return it; a second call must return that same
    object. The cache is cleared again afterwards.
    """
    from ankigen_core import utils

    utils._logger_instance = None
    try:
        # First call: nothing is cached yet, so the cache must be filled.
        logger1 = get_logger()
        assert utils._logger_instance is not None
        assert logger1 is utils._logger_instance

        # Second call: the cached object must be handed back unchanged.
        logger2 = get_logger()
        assert logger2 is logger1
    finally:
        # Reset in ``finally`` so a failing assertion cannot leave the cached
        # logger behind for the tests that run after this one.
        utils._logger_instance = None
|
|
|
|
| |
|
|
|
|
@pytest.fixture
def cache():
    """Provide a new ``ResponseCache`` built with ``maxsize=2``."""
    two_slot_cache = ResponseCache(maxsize=2)
    return two_slot_cache
|
|
|
|
def test_response_cache_get_miss(cache):
    """A lookup for a key that was never stored returns ``None``."""
    assert cache.get("non_existent_prompt", "model") is None
|
|
|
|
def test_response_cache_lru_eviction(cache):
    """Store three entries in the size-2 ``cache`` fixture and inspect LRU stats.

    Three prompt/model pairs are written to a cache created with
    ``maxsize=2``, read back, and the counters reported by
    ``cache._lru_cached_get.cache_info()`` are checked. The order of the
    ``get``/``set`` calls matters because those counters accumulate.
    """
    # Fill the cache up to its configured maxsize of 2.
    cache.set("p1", "m1", "r1")
    cache.set("p2", "m2", "r2")

    # Read p1 once before a third entry is added.
    # NOTE(review): presumably this marks p1 as most recently used — confirm
    # against the ResponseCache implementation.
    cache.get("p1", "m1")

    # A third entry goes past maxsize=2.
    cache.set("p3", "m3", "r3")

    # Both the previously read entry and the newest one must be retrievable.
    assert cache.get("p1", "m1") == "r1"
    assert cache.get("p3", "m3") == "r3"

    # Read all three keys once more; the results are not checked here, the
    # calls only feed the hit/miss counters inspected below.
    cache.get("p1", "m1")
    cache.get("p2", "m2")
    cache.get(
        "p3", "m3"
    )

    # Relies on the private ``_lru_cached_get`` attribute exposing
    # ``cache_info()`` with hits, misses and currsize fields.
    # NOTE(review): looks like a functools.lru_cache wrapper — confirm.
    cache_info = cache._lru_cached_get.cache_info()
    assert cache_info.hits >= 1
    assert cache_info.misses >= 1
    assert cache_info.currsize == 2

    # p2 is still expected to be returned although three entries were stored
    # with maxsize=2.
    # NOTE(review): suggests stored values live outside the LRU layer — verify.
    assert cache.get("p2", "m2") == "r2"
| |
| |
|
|
|
|
def test_response_cache_create_key(cache):
    """``_create_key`` must equal the MD5 hex digest of ``"<model>:<prompt>"``."""
    prompt, model = "test prompt", "test_model"
    raw_key = f"{model}:{prompt}".encode("utf-8")
    digest = hashlib.md5(raw_key).hexdigest()
    assert cache._create_key(prompt, model) == digest
|
|
|
|
| |
|
|
|
|
@patch("ankigen_core.utils.requests.get")
def test_fetch_webpage_text_success_main_tag(mock_requests_get):
    """Text placed inside a ``<main>`` element shows up in the result."""
    page_url = "http://example.com"
    mock_requests_get.return_value = MagicMock(
        status_code=200,
        text="<html><body><main> Main content here. </main></body></html>",
    )

    extracted = fetch_webpage_text(page_url)

    assert "Main content here." in extracted
    # Headers are not pinned in this test; only the URL and timeout are.
    mock_requests_get.assert_called_once_with(page_url, headers=ANY, timeout=15)
|
|
|
|
@patch("ankigen_core.utils.requests.get")
def test_fetch_webpage_text_success_article_tag(mock_requests_get):
    """Text placed inside an ``<article>`` element shows up in the result."""
    article_html = "<html><body><article> Article content. </article></body></html>"
    mock_requests_get.return_value = MagicMock(status_code=200, text=article_html)

    extracted = fetch_webpage_text("http://example.com")

    assert "Article content." in extracted
|
|
|
|
@patch("ankigen_core.utils.requests.get")
def test_fetch_webpage_text_success_body_fallback(mock_requests_get):
    """With only a ``<body>``, its text is returned and script content is not."""
    body_html = "<html><body> Body content only. <script>junk</script> </body></html>"
    mock_requests_get.return_value = MagicMock(status_code=200, text=body_html)

    extracted = fetch_webpage_text("http://example.com")

    assert "Body content only." in extracted
    assert "junk" not in extracted
|
|
|
|
@patch("ankigen_core.utils.requests.get")
def test_fetch_webpage_text_no_meaningful_text(mock_requests_get):
    """An empty ``<main>`` element results in an empty string."""
    mock_requests_get.return_value = MagicMock(
        status_code=200,
        text="<html><body><main></main></body></html>",
    )

    assert fetch_webpage_text("http://example.com") == ""
|
|
|
|
@patch("ankigen_core.utils.requests.get")
def test_fetch_webpage_text_http_error(mock_requests_get):
    """An ``HTTPError`` from ``raise_for_status`` surfaces as ``ConnectionError``."""
    not_found = MagicMock()
    not_found.status_code = 404
    http_failure = requests.exceptions.HTTPError(
        "Client Error: Not Found for url", response=not_found
    )
    # The GET itself succeeds; the failure happens on the status check.
    not_found.raise_for_status.side_effect = http_failure
    mock_requests_get.return_value = not_found

    expected_message = "Could not fetch URL: Client Error: Not Found for url"
    with pytest.raises(ConnectionError, match=expected_message):
        fetch_webpage_text("http://example.com")
|
|
|
|
@patch("ankigen_core.utils.BeautifulSoup")
@patch("ankigen_core.utils.requests.get")
def test_fetch_webpage_text_bs_init_error(mock_requests_get, mock_beautiful_soup):
    """A constructor failure in ``BeautifulSoup`` surfaces as ``RuntimeError``."""
    mock_requests_get.return_value = MagicMock(status_code=200, text="<html></html>")
    mock_beautiful_soup.side_effect = Exception("BS failed")

    expected_message = "Failed to parse HTML content for http://example.com."
    with pytest.raises(RuntimeError, match=expected_message):
        fetch_webpage_text("http://example.com")
|
|
|
|
@patch("ankigen_core.utils.requests.get")
def test_fetch_webpage_text_lxml_fallback(mock_requests_get):
    """When the ``lxml`` parser is unavailable, ``html.parser`` is used instead.

    The patched ``BeautifulSoup`` constructor raises ``ImportError`` for
    ``"lxml"`` and delegates to the real class for ``"html.parser"``. The test
    expects the text to be extracted, a warning to be logged, and both parser
    names to have been tried.
    """
    mock_requests_get.return_value = MagicMock(
        status_code=200,
        text="<html><body><main>LXML Test</main></body></html>",
    )

    def bs_side_effect(text, parser_type):
        # Simulate a missing lxml install; only html.parser builds a soup.
        if parser_type == "lxml":
            raise ImportError("lxml not found")
        if parser_type != "html.parser":
            raise ValueError(f"Unexpected parser: {parser_type}")
        from bs4 import BeautifulSoup as RealBeautifulSoup

        return RealBeautifulSoup(text, "html.parser")

    with patch("ankigen_core.utils.BeautifulSoup") as mock_bs_constructor:
        mock_bs_constructor.side_effect = bs_side_effect

        active_logger = get_logger()
        with patch.object(active_logger, "warning") as mock_logger_warning:
            extracted = fetch_webpage_text("http://example.com/lxmltest")
            assert "LXML Test" in extracted
            mock_logger_warning.assert_any_call(
                "lxml not found, using html.parser instead."
            )

        # The second positional argument of each constructor call is the parser name.
        parsers_tried = {
            recorded[0][1] for recorded in mock_bs_constructor.call_args_list
        }
        assert "lxml" in parsers_tried
        assert "html.parser" in parsers_tried
|
|