def fetch_and_compare_with_workflow(
    entry, workflow_steps, arxiv_fetcher, crossref_fetcher,
    semantic_scholar_fetcher, openalex_fetcher, dblp_fetcher, comparator
):
    """Fetch metadata for ``entry`` from online sources and compare it.

    Sources are tried in a fixed priority order: DBLP, Semantic Scholar,
    OpenAlex, CrossRef, arXiv. A source is skipped when its fetcher is falsy
    or when the entry lacks the field that source is keyed on. The first
    comparison whose ``is_match`` is truthy is returned immediately, so later
    sources are not queried. A failure inside one source (lookup or
    comparison) is logged at debug level and does not stop the remaining
    sources from being tried.

    Args:
        entry: Bibliography entry; ``title``, ``doi``, ``has_arxiv`` and
            ``arxiv_id`` are read from it.
        workflow_steps: Accepted for interface compatibility. It is not
            consulted here; the source order above is always used.
        arxiv_fetcher: Provides ``fetch_by_id`` and ``search_by_title``.
        crossref_fetcher: Provides ``search_by_doi``.
        semantic_scholar_fetcher: Provides ``fetch_by_doi`` and
            ``search_by_title``.
        openalex_fetcher: Provides ``fetch_by_doi`` and ``search_by_title``.
        dblp_fetcher: Provides ``search_by_title``.
        comparator: Provides the ``compare_with_<source>`` methods and
            ``create_unable_result``.

    Returns:
        The first matching comparison result; otherwise the non-matching
        result with the highest ``confidence`` (the earliest source wins a
        tie); otherwise ``comparator.create_unable_result(...)`` when no
        source produced any metadata.
    """
    # Function-scope import keeps this block self-contained.
    import logging

    logger = logging.getLogger(__name__)

    def by_doi_then_title(fetcher):
        """Prefer a DOI lookup; fall back to a title search if it finds nothing."""
        found = fetcher.fetch_by_doi(entry.doi) if entry.doi else None
        return found or fetcher.search_by_title(entry.title)

    def from_arxiv():
        """Fetch by arXiv id when the entry has one, else take the top title hit."""
        if entry.has_arxiv:
            return arxiv_fetcher.fetch_by_id(entry.arxiv_id)
        if entry.title:
            hits = arxiv_fetcher.search_by_title(entry.title, max_results=1)
            if hits:
                return hits[0]
        return None

    # (label, guard, lookup, comparator method name), in priority order.
    # Guards and lookups are callables so nothing is evaluated for a source
    # that an earlier match makes unnecessary.
    # NOTE(review): Semantic Scholar and OpenAlex are skipped for entries that
    # have a DOI but no title, even though they could be looked up by DOI --
    # confirm whether that is intended.
    steps = (
        ("DBLP",
         lambda: dblp_fetcher and entry.title,
         lambda: dblp_fetcher.search_by_title(entry.title),
         "compare_with_dblp"),
        ("Semantic Scholar",
         lambda: semantic_scholar_fetcher and entry.title,
         lambda: by_doi_then_title(semantic_scholar_fetcher),
         "compare_with_semantic_scholar"),
        ("OpenAlex",
         lambda: openalex_fetcher and entry.title,
         lambda: by_doi_then_title(openalex_fetcher),
         "compare_with_openalex"),
        ("CrossRef",
         lambda: crossref_fetcher and entry.doi,
         lambda: crossref_fetcher.search_by_doi(entry.doi),
         "compare_with_crossref"),
        ("arXiv",
         lambda: arxiv_fetcher,
         from_arxiv,
         "compare_with_arxiv"),
    )

    candidates = []
    for label, applies, lookup, compare_name in steps:
        if not applies():
            continue
        try:
            metadata = lookup()
            if not metadata:
                continue
            comparison = getattr(comparator, compare_name)(entry, metadata)
            if comparison.is_match:
                return comparison
            candidates.append(comparison)
        except Exception:
            # Best-effort: one failing source must not abort the others, but
            # the failure is recorded instead of being silently discarded.
            logger.debug(
                "%s lookup/comparison failed; trying next source",
                label,
                exc_info=True,
            )

    if candidates:
        # max() returns the first maximal element, so the earliest source
        # wins when confidences tie.
        return max(candidates, key=lambda result: result.confidence)

    return comparator.create_unable_result(entry, "No metadata found in any source")
|
|