| """Module contains the async interface to match needle against haystack in batch.""" | |
| import asyncio | |
| import heapq | |
| from typing import Any, Callable, Dict, List, Union, cast | |
| from pfzy.score import SCORE_INDICES, fzy_scorer | |
| from pfzy.types import HAYSTACKS | |
| async def _rank_task( | |
| scorer: Callable[[str, str], SCORE_INDICES], | |
| needle: str, | |
| haystacks: List[Union[str, Dict[str, Any]]], | |
| key: str, | |
| ) -> List[Dict[str, Any]]: | |
| """Calculate the score for needle against given list of haystacks and rank them. | |
| Args: | |
| scorer: Scorer to be used to do the calculation. | |
| needle: Substring to search. | |
| haystacks: List of dictionary containing the haystack to be searched. | |
| key: The key within the `haystacks` dictionary that contains the actual string value. | |
| Return: | |
| Sorted list of haystacks based on the needle score with additional keys for the `score` | |
| and `indices`. | |
| """ | |
| result = [] | |
| for haystack in haystacks: | |
| score, indices = scorer(needle, cast(Dict, haystack)[key]) | |
| if indices is None: | |
| continue | |
| result.append( | |
| { | |
| "score": score, | |
| "indices": indices, | |
| "haystack": haystack, | |
| } | |
| ) | |
| result.sort(key=lambda x: x["score"], reverse=True) | |
| return result | |
| async def fuzzy_match( | |
| needle: str, | |
| haystacks: HAYSTACKS, | |
| key: str = "", | |
| batch_size: int = 4096, | |
| scorer: Callable[[str, str], SCORE_INDICES] = None, | |
| ) -> List[Dict[str, Any]]: | |
| """Fuzzy find the needle within list of haystacks and get matched results with matching index. | |
| Note: | |
| The `key` argument is optional when the provided `haystacks` argument is a list of :class:`str`. | |
| It will be given a default key `value` if not present. | |
| Warning: | |
| The `key` argument is required when provided `haystacks` argument is a list of :class:`dict`. | |
| If not present, :class:`TypeError` will be raised. | |
| Args: | |
| needle: String to search within the `haystacks`. | |
| haystacks: List of haystack/longer strings to be searched. | |
| key: If `haystacks` is a list of dictionary, provide the key that | |
| can obtain the haystack value to search. | |
| batch_size: Number of entry to be processed together. | |
| scorer (Callable[[str, str], SCORE_indices]): Desired scorer to use. Currently only :func:`~pfzy.score.fzy_scorer` and :func:`~pfzy.score.substr_scorer` is supported. | |
| Raises: | |
| TypeError: When the argument `haystacks` is :class:`list` of :class:`dict` and the `key` argument | |
| is missing, :class:`TypeError` will be raised. | |
| Returns: | |
| List of matching `haystacks` with additional key indices and score. | |
| Examples: | |
| >>> import asyncio | |
| >>> asyncio.run(fuzzy_match("ab", ["acb", "acbabc"])) | |
| [{'value': 'acbabc', 'indices': [3, 4]}, {'value': 'acb', 'indices': [0, 2]}] | |
| """ | |
| if scorer is None: | |
| scorer = fzy_scorer | |
| for index, haystack in enumerate(haystacks): | |
| if not isinstance(haystack, dict): | |
| if not key: | |
| key = "value" | |
| haystacks[index] = {key: haystack} | |
| if not key: | |
| raise TypeError( | |
| f"${fuzzy_match.__name__} missing 1 required argument: 'key', 'key' is required when haystacks is an instance of dict" | |
| ) | |
| batches = await asyncio.gather( | |
| *( | |
| _rank_task( | |
| scorer, | |
| needle, | |
| haystacks[offset : offset + batch_size], | |
| key, | |
| ) | |
| for offset in range(0, len(haystacks), batch_size) | |
| ) | |
| ) | |
| results = heapq.merge(*batches, key=lambda x: x["score"], reverse=True) | |
| choices = [] | |
| for candidate in results: | |
| candidate["haystack"]["indices"] = candidate["indices"] | |
| choices.append(candidate["haystack"]) | |
| return choices | |