Use concurrent requests for WMS clients

This commit is contained in:
Jules 2024-06-30 18:16:23 +02:00
parent 3b2e88213e
commit 5b899da28e
Signed by: jdejaegh
GPG key ID: 99D6D184CA66933A
6 changed files with 46 additions and 106 deletions

View file

@ -1,6 +1,6 @@
from .api import IrcelineApiError from .api import IrcelineApiError
from .rio import IrcelineRioClient from .data import RioFeature, ForecastFeature, FeatureValue, RioIfdmFeature
from .forecast import IrcelineForecastClient from .forecast import IrcelineForecastClient
from .data import RioFeature, ForecastFeature, FeatureValue from .rio import IrcelineRioClient, IrcelineRioIfdmClient
__version__ = '2.0.0' __version__ = '2.0.0'

View file

@ -9,7 +9,6 @@ import async_timeout
from aiohttp import ClientResponse from aiohttp import ClientResponse
from .data import IrcelineFeature from .data import IrcelineFeature
from .utils import SizedDict
_rio_wfs_base_url = 'https://geo.irceline.be/wfs' _rio_wfs_base_url = 'https://geo.irceline.be/wfs'
_forecast_wms_base_url = 'https://geo.irceline.be/forecast/wms' _forecast_wms_base_url = 'https://geo.irceline.be/forecast/wms'
@ -22,9 +21,8 @@ class IrcelineApiError(Exception):
class IrcelineBaseClient(ABC): class IrcelineBaseClient(ABC):
def __init__(self, session: aiohttp.ClientSession, cache_size: int = 20) -> None: def __init__(self, session: aiohttp.ClientSession) -> None:
self._session = session self._session = session
self._cache = SizedDict(cache_size)
@abstractmethod @abstractmethod
async def get_data(self, async def get_data(self,

View file

@ -1,3 +1,4 @@
import asyncio
from datetime import date, timedelta, datetime from datetime import date, timedelta, datetime
from itertools import product from itertools import product
from typing import List, Tuple, Dict from typing import List, Tuple, Dict
@ -28,16 +29,27 @@ class IrcelineForecastClient(IrcelineBaseWmsClient):
base_querystring = (self._default_querystring | base_querystring = (self._default_querystring |
{"bbox": f"{lon},{lat},{lon + self._epsilon},{lat + self._epsilon}"}) {"bbox": f"{lon},{lat},{lon + self._epsilon},{lat + self._epsilon}"})
for feature, d in product(features, range(4)): tasks = [asyncio.create_task(self._get_single_feature(base_querystring, d, feature, timestamp))
querystring = base_querystring | {"layers": f"{feature}_d{d}", for feature, d in product(features, range(4))]
"query_layers": f"{feature}_d{d}"} results = await asyncio.gather(*tasks)
try:
r: ClientResponse = await self._api_wrapper(self._base_url, querystring) for r in results:
r: dict = await r.json() result |= r
result[(feature, timestamp + timedelta(days=d))] = FeatureValue(
value=r.get('features', [{}])[0].get('properties', {}).get('GRAY_INDEX'),
timestamp=datetime.fromisoformat(r.get('timeStamp')) if 'timeStamp' in r else None)
except (IrcelineApiError, ClientResponseError, IndexError):
result[(feature, timestamp + timedelta(days=d))] = FeatureValue(value=None, timestamp=None)
return result return result
async def _get_single_feature(self, base_querystring: dict, d: int, feature: ForecastFeature,
timestamp: date) -> dict:
result = dict()
querystring = base_querystring | {"layers": f"{feature}_d{d}",
"query_layers": f"{feature}_d{d}"}
try:
r: ClientResponse = await self._api_wrapper(self._base_url, querystring)
r: dict = await r.json()
result[(feature, timestamp + timedelta(days=d))] = FeatureValue(
value=r.get('features', [{}])[0].get('properties', {}).get('GRAY_INDEX'),
timestamp=datetime.fromisoformat(r.get('timeStamp')) if 'timeStamp' in r else None)
except (IrcelineApiError, ClientResponseError, IndexError):
result[(feature, timestamp + timedelta(days=d))] = FeatureValue(value=None, timestamp=None)
return result

View file

@ -1,3 +1,4 @@
import asyncio
from datetime import datetime, date, UTC, timedelta from datetime import datetime, date, UTC, timedelta
from typing import List, Tuple, Dict, Set from typing import List, Tuple, Dict, Set
from xml.etree import ElementTree from xml.etree import ElementTree
@ -152,17 +153,24 @@ class IrcelineRioIfdmClient(IrcelineBaseWmsClient):
lat, lon = position lat, lon = position
base_querystring = (self._default_querystring | base_querystring = (self._default_querystring |
{"bbox": f"{lon},{lat},{lon + self._epsilon},{lat + self._epsilon}"}) {"bbox": f"{lon},{lat},{lon + self._epsilon},{lat + self._epsilon}"})
print({"bbox": f"{lon},{lat},{lon + self._epsilon},{lat + self._epsilon}"})
for feature in features: tasks = [asyncio.create_task(self._get_single_feature(base_querystring, feature)) for feature in features]
querystring = base_querystring | {"layers": f"{feature}", "query_layers": f"{feature}"} results = await asyncio.gather(*tasks)
try:
r: ClientResponse = await self._api_wrapper(self._base_url, querystring) for r in results:
r: dict = await r.json() result |= r
result[feature] = FeatureValue(
value=r.get('features', [{}])[0].get('properties', {}).get('GRAY_INDEX'),
timestamp=datetime.fromisoformat(r.get('timeStamp')) if 'timeStamp' in r else None)
except (IrcelineApiError, ClientResponseError, IndexError):
result[feature] = FeatureValue(value=None, timestamp=None)
return result return result
async def _get_single_feature(self, base_querystring: dict, feature: RioIfdmFeature) -> dict:
result = dict()
querystring = base_querystring | {"layers": f"{feature}", "query_layers": f"{feature}"}
try:
r: ClientResponse = await self._api_wrapper(self._base_url, querystring)
r: dict = await r.json()
result[feature] = FeatureValue(
value=r.get('features', [{}])[0].get('properties', {}).get('GRAY_INDEX'),
timestamp=datetime.fromisoformat(r.get('timeStamp')) if 'timeStamp' in r else None)
except (IrcelineApiError, ClientResponseError, IndexError):
result[feature] = FeatureValue(value=None, timestamp=None)
return result

View file

@ -1,4 +1,3 @@
from collections import OrderedDict
from typing import Tuple from typing import Tuple
from pyproj import Transformer from pyproj import Transformer
@ -6,31 +5,6 @@ from pyproj import Transformer
_project_transform = Transformer.from_crs('EPSG:4326', 'EPSG:31370', always_xy=False) _project_transform = Transformer.from_crs('EPSG:4326', 'EPSG:31370', always_xy=False)
class SizedDict(OrderedDict):
"""Dictionary with a maximum size. When more items are added, the least recently accessed element is evicted"""
def __init__(self, size: int):
super().__init__()
self._size = size
def __setitem__(self, key, value):
super().__setitem__(key, value)
self.move_to_end(key)
if len(self) > self._size:
self.popitem(False)
def __getitem__(self, key):
self.move_to_end(key)
return super().__getitem__(key)
def get(self, __key, __default=None):
self.move_to_end(__key)
return super().get(__key, __default)
def update(self, __m, **kwargs):
raise NotImplementedError()
def epsg_transform(position: Tuple[float, float]) -> Tuple[int, int]: def epsg_transform(position: Tuple[float, float]) -> Tuple[int, int]:
""" """
Convert 'EPSG:4326' coordinates to 'EPSG:31370' coordinates Convert 'EPSG:4326' coordinates to 'EPSG:31370' coordinates
@ -39,15 +13,3 @@ def epsg_transform(position: Tuple[float, float]) -> Tuple[int, int]:
""" """
result = _project_transform.transform(position[0], position[1]) result = _project_transform.transform(position[0], position[1])
return round(result[0]), round(result[1]) return round(result[0]), round(result[1])
def round_coordinates(x: float, y: float, step=.05) -> Tuple[float, float]:
"""
Round the coordinate to the precision given by step
:param x: latitude
:param y: longitude
:param step: precision of the rounding
:return: x and y round to the closest step increment
"""
n = 1 / step
return round(x * n) / n, round(y * n) / n

View file

@ -1,44 +1,4 @@
import pytest from src.open_irceline.utils import epsg_transform
from src.open_irceline.utils import SizedDict, round_coordinates, epsg_transform
def test_sized_dict():
s_dict = SizedDict(5)
assert len(s_dict) == 0
s_dict['a'] = 1
s_dict['b'] = 2
s_dict['c'] = 3
s_dict['d'] = 4
s_dict['e'] = 5
assert len(s_dict) == 5
s_dict['f'] = 6
assert 'a' not in s_dict
assert s_dict['f'] == 6
assert len(s_dict) == 5
s_dict['b'] = 42
s_dict['g'] = 7
assert s_dict.get('f') == 6
assert s_dict['g'] == 7
assert s_dict['b'] == 42
assert 'c' not in s_dict
assert len(s_dict) == 5
del s_dict['b']
assert len(s_dict) == 4
assert 'b' not in s_dict
with pytest.raises(NotImplementedError):
s_dict.update({'a': 1})
def test_round_coord():
x, y = round_coordinates(50.4657, 4.8647)
assert x == 50.45
assert y == 4.85
def test_epsg_transform(): def test_epsg_transform():