Skip to content

Commit

Permalink
Avoid long-lasting HTTP pools since they leak
Browse files · Browse the repository at this point in the history
  • Loading branch information
Zaczero committed Jun 20, 2023
1 parent 6d13284 commit 9b658fc
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 34 deletions.
2 changes: 1 addition & 1 deletion — web/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
from concurrent.futures import ProcessPoolExecutor
from dataclasses import replace
from typing import Optional, Sequence
from typing import Optional

from authlib.integrations.starlette_client import OAuth
from cachetools import TTLCache
Expand Down
54 changes: 29 additions & 25 deletions — web/openstreetmap.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from dataclasses import dataclass
from typing import Iterable

import httpx
import xmltodict
from asyncache import cached
from authlib.integrations.httpx_client import OAuth1Auth
Expand All @@ -24,22 +25,24 @@ def __init__(self, *,
username: str = None, password: str = None,
oauth_token: str = None, oauth_token_secret: str = None):
if oauth_token and oauth_token_secret:
auth = OAuth1Auth(
self.auth = OAuth1Auth(
client_id=CONSUMER_KEY,
client_secret=CONSUMER_SECRET,
token=oauth_token,
token_secret=oauth_token_secret,
force_include_body=True)
elif username and password:
auth = (username, password)
self.auth = (username, password)
else:
auth = None
self.auth = None

self.http = get_http_client('https://api.openstreetmap.org/api', auth=auth)
def _get_http_client(self) -> httpx.AsyncClient:
return get_http_client('https://api.openstreetmap.org/api', auth=self.auth)

async def get_changeset_maxsize(self) -> int:
r = await self.http.get('/capabilities')
r.raise_for_status()
async with self._get_http_client() as http:
r = await http.get('/capabilities')
r.raise_for_status()

caps = xmltodict.parse(r.text)

Expand All @@ -65,22 +68,23 @@ async def get_nodes(self, node_ids: list[str | int], *, json: bool = True) -> li

@cached(TTLCache(maxsize=1024, ttl=60))
async def _get_elements(self, elements_type: str, element_ids: Iterable[str], json: bool) -> list[dict]:
r = await self.http.get(f'/0.6/{elements_type}{".json" if json else ""}', params={
elements_type: ','.join(map(str, element_ids))
})
r.raise_for_status()
async with self._get_http_client() as http:
r = await http.get(f'/0.6/{elements_type}{".json" if json else ""}', params={
elements_type: ','.join(map(str, element_ids))})
r.raise_for_status()

if json:
return r.json()['elements']
else:
return ensure_list(xmltodict.parse(r.text)['osm'][elements_type[:-1]])

async def get_authorized_user(self) -> dict | None:
if not self.http.auth:
if self.auth is None:
return None

r = await self.http.get('/0.6/user/details.json')
r.raise_for_status()
async with self._get_http_client() as http:
r = await http.get('/0.6/user/details.json')
r.raise_for_status()

return r.json()['user']

Expand Down Expand Up @@ -112,21 +116,21 @@ async def upload_osm_change(self, osm_change: str, tags: dict[str, str]) -> Uplo

changeset = xmltodict.unparse(changeset_dict)

r = await self.http.put('/0.6/changeset/create', content=changeset, headers={
'Content-Type': 'text/xml; charset=utf-8',
}, follow_redirects=False)
r.raise_for_status()
changeset_id_raw = r.text
changeset_id = int(changeset_id_raw)
async with self._get_http_client() as http:
r = await http.put('/0.6/changeset/create', content=changeset, headers={
'Content-Type': 'text/xml; charset=utf-8'}, follow_redirects=False)
r.raise_for_status()

osm_change = osm_change.replace(CHANGESET_ID_PLACEHOLDER, changeset_id_raw)
changeset_id_raw = r.text
changeset_id = int(changeset_id_raw)

upload_resp = await self.http.post(f'/0.6/changeset/{changeset_id_raw}/upload', content=osm_change, headers={
'Content-Type': 'text/xml; charset=utf-8',
}, timeout=150)
osm_change = osm_change.replace(CHANGESET_ID_PLACEHOLDER, changeset_id_raw)

r = await self.http.put(f'/0.6/changeset/{changeset_id_raw}/close')
r.raise_for_status()
upload_resp = await http.post(f'/0.6/changeset/{changeset_id_raw}/upload', content=osm_change, headers={
'Content-Type': 'text/xml; charset=utf-8'}, timeout=150)

r = await http.put(f'/0.6/changeset/{changeset_id_raw}/close')
r.raise_for_status()

if not upload_resp.is_success:
return UploadResult(ok=False, error_code=upload_resp.status_code, error_message=upload_resp.text, changeset_id=changeset_id)
Expand Down
24 changes: 16 additions & 8 deletions — web/overpass.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from math import radians
from typing import Iterable, NamedTuple, Sequence

import httpx
import xmltodict
from asyncache import cached
from cachetools import TTLCache
Expand Down Expand Up @@ -565,16 +566,19 @@ def get_download_triggers(bbc: BoundingBoxCollection, cells: Sequence[Cell], way
return dict(result)


# TODO: check data freshness
class Overpass:
def __init__(self):
self.http = get_http_client(OVERPASS_API_INTERPRETER)
pass

# TODO: check data freshness
def _get_http_client(self) -> httpx.AsyncClient:
return get_http_client(OVERPASS_API_INTERPRETER)

@cached(TTLCache(maxsize=1024, ttl=7200)) # 2 hours
async def _query_relation_history_post(self, session: str, query: str, timeout: float) -> list[list[dict]]:
r = await self.http.post('', data={'data': query}, timeout=timeout * 2)
r.raise_for_status()
async with self._get_http_client() as http:
r = await http.post('', data={'data': query}, timeout=timeout * 2)
r.raise_for_status()

elements: list[dict] = r.json()['elements']
return split_by_count(elements)
Expand Down Expand Up @@ -613,8 +617,10 @@ async def query_relation(self, relation_id: int, download_hist: DownloadHistory
if download_targets is None:
timeout = 60
query = build_bb_query(relation_id, timeout)
r = await self.http.post('', data={'data': query}, timeout=timeout * 2)
r.raise_for_status()

async with self._get_http_client() as http:
r = await http.post('', data={'data': query}, timeout=timeout * 2)
r.raise_for_status()

elements: list[dict] = r.json()['elements']

Expand Down Expand Up @@ -711,8 +717,10 @@ async def query_relation(self, relation_id: int, download_hist: DownloadHistory
async def query_parents(self, way_ids_set: frozenset[int]) -> QueryParentsResult:
timeout = 60
query = build_parents_query(way_ids_set, timeout)
r = await self.http.post('', data={'data': query}, timeout=timeout * 2)
r.raise_for_status()

async with self._get_http_client() as http:
r = await http.post('', data={'data': query}, timeout=timeout * 2)
r.raise_for_status()

data: dict[str, list[dict]] = xmltodict.parse(
r.text,
Expand Down

0 comments on commit 9b658fc

Please sign in to comment.