class Anmoku(BaseClient):
    """
    The normal synchronous Anmoku client. Uses requests for http and `slowstack`_ for rate limiting.

    .. _slowstack: https://github.com/TAG-Epic/slowstack
    """

    __slots__ = (
        "_session",
        "jikan_url",
        "_second_rate_limiter",
        "_minute_rate_limiter",
    )

    def __init__(
        self,
        debug: Optional[bool] = False,
        jikan_url: Optional[str] = None,
        session: Optional[Session] = None,
        rate_limits: Optional[Tuple[Tuple[int, int], Tuple[int, int]]] = None,
    ) -> None:
        """Create the client.

        :param debug: Forwarded unchanged to ``BaseClient.__init__``.
        :param jikan_url: Base URL every route is appended to. Falls back to
            ``https://api.jikan.moe/v4`` when omitted (or otherwise falsy).
        :param session: Optional pre-built session to use for requests; stored as-is.
        :param rate_limits: Two ``(int, int)`` pairs, the first for the per-second
            limiter and the second for the per-minute limiter. Each pair is passed
            positionally to ``TimesPerRateLimiter``. Defaults to ``((3, 3), (60, 60))``.
        """
        super().__init__(debug)

        self.jikan_url = jikan_url or "https://api.jikan.moe/v4"
        self._session = session

        if rate_limits is None:
            # https://docs.api.jikan.moe/#section/Information/Rate-Limiting
            rate_limits = ((3, 3), (60, 60))

        # NOTE(review): presumably each pair is (limit, per) as slowstack's
        # TimesPerRateLimiter expects -- confirm against the slowstack docs.
        second_rate_limits, minute_rate_limits = rate_limits

        self._second_rate_limiter = TimesPerRateLimiter(second_rate_limits[0], second_rate_limits[1])
        self._minute_rate_limiter = TimesPerRateLimiter(minute_rate_limits[0], minute_rate_limits[1])

    def get(self, resource: Type[ResourceGenericT], id: SnowflakeT) -> ResourceGenericT:
        """Gets the exact resource by id.

        :param resource: Resource class whose ``_get_endpoint`` template is formatted with ``id``.
        :param id: Identifier substituted into the endpoint template.
        :returns: ``resource`` instantiated with the JSON returned by the API.
        """
        url = resource._get_endpoint.format(id=id)
        json_data = self._request(url)

        return resource(json_data)

    def search(self, resource: Type[SearchResourceGenericT], query: str) -> SearchResult[SearchResourceGenericT]:
        """Searches for the resource and returns a list of the results.

        :param resource: Resource class providing ``_search_endpoint``.
        :param query: Search text, sent as the ``q`` query parameter.
        :returns: A ``SearchResult`` built from the response JSON and ``resource``.
        :raises ResourceNotSupportedError: If ``resource._search_endpoint`` is ``None``.
        """
        url = resource._search_endpoint

        if url is None:
            raise ResourceNotSupportedError(resource, "searching")

        json_data: SearchResultData[Any] = self._request(url, params={"q": query})

        return SearchResult(json_data, resource)

    def _request(
        self,
        route: str,
        *,
        params: Optional[dict[str, Any]] = None,
        headers: Optional[dict[str, str]] = None,
    ) -> dict[str, Any]:
        """Perform a rate-limited GET against ``jikan_url + route`` and return the decoded JSON.

        :param route: Path appended verbatim to ``self.jikan_url``.
        :param params: Optional query-string parameters.
        :param headers: Optional request headers; an empty dict is used when omitted.
        :returns: The JSON body of the response.
        :raises: Whatever ``self._raise_http_error`` raises for a status code of
            400 or above (that helper is not defined in this section; presumably
            it is inherited from ``BaseClient``).
        """
        headers = headers or {}

        # ``__get_session`` is not defined in this section of the file; it is
        # expected to be a private method of this class.
        session = self.__get_session()
        url = self.jikan_url + route

        # 'AllRateLimiter' doesn't exist yet for the synchronous portion of slowstack
        # so I '.acquire' for both rate-limiters instead.
        with self._minute_rate_limiter.acquire():
            with self._second_rate_limiter.acquire():
                self.logger.debug(f"{Colours.GREEN.apply('GET')} --> {url}")

                with session.get(url, params=params, headers=headers) as resp:
                    content = resp.json()

                    # 400 (Bad Request) is itself an error status, so the check
                    # must be inclusive; otherwise the error payload would be
                    # handed back to callers as if it were resource data.
                    if resp.status_code >= 400:
                        self._raise_http_error(content, resp.status_code)

                    return content