class AsyncAnmoku(BaseClient):
    """
    Asynchronous anmoku client.

    HTTP is done through aiohttp and requests are rate-limited with `slowstack`_.

    .. _slowstack: https://github.com/TAG-Epic/slowstack
    """

    __slots__ = ("_session", "jikan_url", "_rate_limiter")

    def __init__(
        self,
        debug: Optional[bool] = False,
        jikan_url: Optional[str] = None,
        session: Optional[ClientSession] = None,
        rate_limits: Optional[Tuple[Tuple[int, int], Tuple[int, int]]] = None,
    ) -> None:
        """Set up the client.

        Args:
            debug: Forwarded unchanged to ``BaseClient.__init__``.
            jikan_url: Base URL of the Jikan API. Any falsy value (``None``
                or an empty string) selects ``https://api.jikan.moe/v4``.
            session: Session object to store on the client; may be ``None``.
            rate_limits: ``(limit, per)`` pairs, one rate limiter is built
                per pair. ``None`` selects ``((3, 3), (60, 60))``.
                NOTE(review): ``per`` is presumably in seconds -- confirm
                against slowstack's ``TimesPerRateLimiter``.
        """
        super().__init__(debug)

        self._session = session

        if not jikan_url:
            jikan_url = "https://api.jikan.moe/v4"
        self.jikan_url = jikan_url

        # Defaults follow Jikan's published limits:
        # https://docs.api.jikan.moe/#section/Information/Rate-Limiting
        limit_pairs = ((3, 3), (60, 60)) if rate_limits is None else rate_limits

        # All limiters are combined so a request has to satisfy every one of them.
        limiters = set()
        for limit, per in limit_pairs:
            limiters.add(TimesPerRateLimiter(limit, per))

        self._rate_limiter = AllRateLimiter(limiters)
async def get(self, resource: Type[ResourceGenericT], id: SnowflakeT) -> ResourceGenericT:
    """Get the exact resource by id.

    The resource class' ``_get_endpoint`` template is filled in with ``id``,
    the resulting URL is requested through ``self._request`` and the returned
    JSON is wrapped in ``resource``.

    Args:
        resource: The resource class to fetch and to build the result with.
        id: Identifier substituted for ``{id}`` in the endpoint template.

    Returns:
        An instance of ``resource`` constructed from the response JSON.

    Raises:
        ResourceNotSupportedError: If ``resource`` has no get endpoint.
    """
    endpoint = resource._get_endpoint

    # Same guard ``search`` applies to ``_search_endpoint``: fail with the
    # library's own error instead of an AttributeError from ``None.format``.
    # NOTE(review): assumes ``_get_endpoint`` can be None for resources that
    # cannot be fetched by id -- confirm against the resource definitions.
    if endpoint is None:
        raise ResourceNotSupportedError(resource, "getting")

    json_data = await self._request(endpoint.format(id=id))

    return resource(json_data)
async def search(self, resource: Type[SearchResourceGenericT], query: str) -> SearchResult[SearchResourceGenericT]:
    """Search for ``resource`` entries matching ``query``.

    Args:
        resource: The resource class to search for; its ``_search_endpoint``
            is the URL that gets requested.
        query: Search text, sent as the ``q`` request parameter.

    Returns:
        A ``SearchResult`` built from the response JSON and ``resource``.

    Raises:
        ResourceNotSupportedError: If ``resource._search_endpoint`` is ``None``.
    """
    endpoint = resource._search_endpoint

    if endpoint is not None:
        payload: SearchResultData[Any] = await self._request(
            endpoint,
            params={"q": query},
        )
        return SearchResult(payload, resource)

    # No search endpoint declared on this resource class.
    raise ResourceNotSupportedError(resource, "searching")