From 5b899da28ef48002021245a41e6389b08e431e71 Mon Sep 17 00:00:00 2001 From: Jules Dejaeghere Date: Sun, 30 Jun 2024 18:16:23 +0200 Subject: [PATCH] Use concurrent requests for WMS clients --- src/open_irceline/__init__.py | 4 ++-- src/open_irceline/api.py | 4 +--- src/open_irceline/forecast.py | 34 +++++++++++++++++++--------- src/open_irceline/rio.py | 30 ++++++++++++++++--------- src/open_irceline/utils.py | 38 ------------------------------- tests/test_utils.py | 42 +---------------------------------- 6 files changed, 46 insertions(+), 106 deletions(-) diff --git a/src/open_irceline/__init__.py b/src/open_irceline/__init__.py index d830345..5188fe4 100644 --- a/src/open_irceline/__init__.py +++ b/src/open_irceline/__init__.py @@ -1,6 +1,6 @@ from .api import IrcelineApiError -from .rio import IrcelineRioClient +from .data import RioFeature, ForecastFeature, FeatureValue, RioIfdmFeature from .forecast import IrcelineForecastClient -from .data import RioFeature, ForecastFeature, FeatureValue +from .rio import IrcelineRioClient, IrcelineRioIfdmClient __version__ = '2.0.0' diff --git a/src/open_irceline/api.py b/src/open_irceline/api.py index a89c165..110ce46 100644 --- a/src/open_irceline/api.py +++ b/src/open_irceline/api.py @@ -9,7 +9,6 @@ import async_timeout from aiohttp import ClientResponse from .data import IrcelineFeature -from .utils import SizedDict _rio_wfs_base_url = 'https://geo.irceline.be/wfs' _forecast_wms_base_url = 'https://geo.irceline.be/forecast/wms' @@ -22,9 +21,8 @@ class IrcelineApiError(Exception): class IrcelineBaseClient(ABC): - def __init__(self, session: aiohttp.ClientSession, cache_size: int = 20) -> None: + def __init__(self, session: aiohttp.ClientSession) -> None: self._session = session - self._cache = SizedDict(cache_size) @abstractmethod async def get_data(self, diff --git a/src/open_irceline/forecast.py b/src/open_irceline/forecast.py index c2b5efd..c29168e 100644 --- a/src/open_irceline/forecast.py +++ b/src/open_irceline/forecast.py @@ -1,3 +1,4 @@ +import asyncio from datetime import date, timedelta, datetime from itertools import product from typing import List, Tuple, Dict @@ -28,16 +29,27 @@ class IrcelineForecastClient(IrcelineBaseWmsClient): base_querystring = (self._default_querystring | {"bbox": f"{lon},{lat},{lon + self._epsilon},{lat + self._epsilon}"}) - for feature, d in product(features, range(4)): - querystring = base_querystring | {"layers": f"{feature}_d{d}", - "query_layers": f"{feature}_d{d}"} - try: - r: ClientResponse = await self._api_wrapper(self._base_url, querystring) - r: dict = await r.json() - result[(feature, timestamp + timedelta(days=d))] = FeatureValue( - value=r.get('features', [{}])[0].get('properties', {}).get('GRAY_INDEX'), - timestamp=datetime.fromisoformat(r.get('timeStamp')) if 'timeStamp' in r else None) - except (IrcelineApiError, ClientResponseError, IndexError): - result[(feature, timestamp + timedelta(days=d))] = FeatureValue(value=None, timestamp=None) + tasks = [asyncio.create_task(self._get_single_feature(base_querystring, d, feature, timestamp)) + for feature, d in product(features, range(4))] + results = await asyncio.gather(*tasks) + + for r in results: + result |= r return result + + async def _get_single_feature(self, base_querystring: dict, d: int, feature: ForecastFeature, + timestamp: date) -> dict: + result = dict() + + querystring = base_querystring | {"layers": f"{feature}_d{d}", + "query_layers": f"{feature}_d{d}"} + try: + r: ClientResponse = await self._api_wrapper(self._base_url, querystring) + r: dict = await r.json() + result[(feature, timestamp + timedelta(days=d))] = FeatureValue( + value=r.get('features', [{}])[0].get('properties', {}).get('GRAY_INDEX'), + timestamp=datetime.fromisoformat(r.get('timeStamp')) if 'timeStamp' in r else None) + except (IrcelineApiError, ClientResponseError, IndexError): + result[(feature, timestamp + timedelta(days=d))] = FeatureValue(value=None, timestamp=None) + return result diff --git a/src/open_irceline/rio.py b/src/open_irceline/rio.py index 3cea0b5..42dfd7e 100644 --- a/src/open_irceline/rio.py +++ b/src/open_irceline/rio.py @@ -1,3 +1,4 @@ +import asyncio from datetime import datetime, date, UTC, timedelta from typing import List, Tuple, Dict, Set from xml.etree import ElementTree @@ -152,17 +153,24 @@ class IrcelineRioIfdmClient(IrcelineBaseWmsClient): lat, lon = position base_querystring = (self._default_querystring | {"bbox": f"{lon},{lat},{lon + self._epsilon},{lat + self._epsilon}"}) - print({"bbox": f"{lon},{lat},{lon + self._epsilon},{lat + self._epsilon}"}) - for feature in features: - querystring = base_querystring | {"layers": f"{feature}", "query_layers": f"{feature}"} - try: - r: ClientResponse = await self._api_wrapper(self._base_url, querystring) - r: dict = await r.json() - result[feature] = FeatureValue( - value=r.get('features', [{}])[0].get('properties', {}).get('GRAY_INDEX'), - timestamp=datetime.fromisoformat(r.get('timeStamp')) if 'timeStamp' in r else None) - except (IrcelineApiError, ClientResponseError, IndexError): - result[feature] = FeatureValue(value=None, timestamp=None) + tasks = [asyncio.create_task(self._get_single_feature(base_querystring, feature)) for feature in features] + results = await asyncio.gather(*tasks) + + for r in results: + result |= r return result + + async def _get_single_feature(self, base_querystring: dict, feature: RioIfdmFeature) -> dict: + result = dict() + querystring = base_querystring | {"layers": f"{feature}", "query_layers": f"{feature}"} + try: + r: ClientResponse = await self._api_wrapper(self._base_url, querystring) + r: dict = await r.json() + result[feature] = FeatureValue( + value=r.get('features', [{}])[0].get('properties', {}).get('GRAY_INDEX'), + timestamp=datetime.fromisoformat(r.get('timeStamp')) if 'timeStamp' in r else None) + except (IrcelineApiError, ClientResponseError, IndexError): + result[feature] = FeatureValue(value=None, timestamp=None) + return result diff --git a/src/open_irceline/utils.py b/src/open_irceline/utils.py index 65b573a..952bbf9 100644 --- a/src/open_irceline/utils.py +++ b/src/open_irceline/utils.py @@ -1,4 +1,3 @@ -from collections import OrderedDict from typing import Tuple from pyproj import Transformer @@ -6,31 +5,6 @@ from pyproj import Transformer _project_transform = Transformer.from_crs('EPSG:4326', 'EPSG:31370', always_xy=False) -class SizedDict(OrderedDict): - """Dictionary with a maximum size. When more items are added, the least recently accessed element is evicted""" - - def __init__(self, size: int): - super().__init__() - self._size = size - - def __setitem__(self, key, value): - super().__setitem__(key, value) - self.move_to_end(key) - if len(self) > self._size: - self.popitem(False) - - def __getitem__(self, key): - self.move_to_end(key) - return super().__getitem__(key) - - def get(self, __key, __default=None): - self.move_to_end(__key) - return super().get(__key, __default) - - def update(self, __m, **kwargs): - raise NotImplementedError() - - def epsg_transform(position: Tuple[float, float]) -> Tuple[int, int]: """ Convert 'EPSG:4326' coordinates to 'EPSG:31370' coordinates @@ -39,15 +13,3 @@ def epsg_transform(position: Tuple[float, float]) -> Tuple[int, int]: """ result = _project_transform.transform(position[0], position[1]) return round(result[0]), round(result[1]) - - -def round_coordinates(x: float, y: float, step=.05) -> Tuple[float, float]: - """ - Round the coordinate to the precision given by step - :param x: latitude - :param y: longitude - :param step: precision of the rounding - :return: x and y round to the closest step increment - """ - n = 1 / step - return round(x * n) / n, round(y * n) / n diff --git a/tests/test_utils.py b/tests/test_utils.py index 1423bc2..49dff05 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,44 +1,4 @@ -import pytest - -from src.open_irceline.utils import SizedDict, round_coordinates, epsg_transform - - -def test_sized_dict(): - s_dict = SizedDict(5) - assert len(s_dict) == 0 - - s_dict['a'] = 1 - s_dict['b'] = 2 - s_dict['c'] = 3 - s_dict['d'] = 4 - s_dict['e'] = 5 - assert len(s_dict) == 5 - - s_dict['f'] = 6 - assert 'a' not in s_dict - assert s_dict['f'] == 6 - assert len(s_dict) == 5 - - s_dict['b'] = 42 - s_dict['g'] = 7 - assert s_dict.get('f') == 6 - assert s_dict['g'] == 7 - assert s_dict['b'] == 42 - assert 'c' not in s_dict - assert len(s_dict) == 5 - - del s_dict['b'] - assert len(s_dict) == 4 - assert 'b' not in s_dict - - with pytest.raises(NotImplementedError): - s_dict.update({'a': 1}) - - -def test_round_coord(): - x, y = round_coordinates(50.4657, 4.8647) - assert x == 50.45 - assert y == 4.85 +from src.open_irceline.utils import epsg_transform def test_epsg_transform():