From ba314f092528ffce6b832e4fa3c71cc3f2c4b3d5 Mon Sep 17 00:00:00 2001 From: Casper Welzel Andersen Date: Wed, 16 Mar 2022 10:38:40 +0100 Subject: [PATCH] Set up using JSON decoders/encoders for `/filter` This is the initial work for the intended implementation of custom JSON decoders/encoders and will act as the basis for a discussion before implementing it throughout the code. --- app/routers/datafilter.py | 82 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 77 insertions(+), 5 deletions(-) diff --git a/app/routers/datafilter.py b/app/routers/datafilter.py index ed3fb17c..89778283 100644 --- a/app/routers/datafilter.py +++ b/app/routers/datafilter.py @@ -1,9 +1,11 @@ """Data Filter.""" +import importlib import json -from typing import Optional +import logging +from typing import TYPE_CHECKING, Optional from aioredis import Redis -from fastapi import APIRouter, Depends, status +from fastapi import APIRouter, Depends, Query, status from fastapi_plugins import depends_redis from oteapi.models import FilterConfig from oteapi.plugins import create_strategy @@ -17,8 +19,14 @@ from app.models.error import HTTPNotFoundError, httpexception_404_item_id_does_not_exist from app.routers.session import _update_session, _update_session_list_item +if TYPE_CHECKING: + from typing import Type + ROUTER = APIRouter(prefix=f"/{IDPREFIX}") +LOGGER = logging.getLogger("app.routers") +LOGGER.setLevel(logging.DEBUG) + @ROUTER.post( "/", @@ -35,7 +43,21 @@ async def create_filter( """Define a new filter configuration (data operation)""" new_filter = CreateFilterResponse() - await cache.set(new_filter.filter_id, config.json()) + json_encoder = None + if hasattr(config, "json_encoder") and config.json_encoder: + json_encoder = getattr( + importlib.import_module(config.json_encoder[0]), + config.json_encoder[2], + None, + ) + if json_encoder is None: + LOGGER.debug( + "Tried to set json_encoder from %s, but failed.", config.json_encoder + ) + else: + LOGGER.debug("Set json_encoder to %s", config.json_encoder) + + await cache.set(new_filter.filter_id, config.json(encoder=json_encoder)) if session_id: if not await cache.exists(session_id): @@ -60,6 +82,15 @@ async def create_filter( async def get_filter( filter_id: str, session_id: Optional[str] = None, + json_decoder: Optional[str] = Query( + None, + description=( + "An importable path to a JSONDecoder. This JSON decoder is for decoding " + "the strategy configuration model only." + ), + regex=r"^[a-zA-Z]+[a-zA-Z0-9_]*(?:\.[a-zA-Z]+[a-zA-Z0-9_]*)*:[a-zA-Z]+[a-zA-Z0-9_]*$", + example="oteapi_plugin.utils.json:JSONDecoder", + ), cache: Redis = Depends(depends_redis), ) -> GetFilterResponse: """Run and return data from a filter (data operation)""" @@ -68,7 +99,23 @@ async def get_filter( if session_id and not await cache.exists(session_id): raise httpexception_404_item_id_does_not_exist(session_id, "session_id") - config = FilterConfig(**json.loads(await cache.get(filter_id))) + json_decoder_cls: "Optional[Type[json.JSONDecoder]]" = None + if json_decoder: + json_decoder_cls = getattr( + importlib.import_module(".".join(json_decoder.split(":")[0])), + json_decoder.split(":")[1], + None, + ) + if json_decoder_cls is None: + LOGGER.debug( + "Tried to set json_decoder_cls from %s, but failed.", json_decoder + ) + else: + LOGGER.debug("Set json_decoder_cls to %s", json_decoder) + + config = FilterConfig( + **json.loads(await cache.get(filter_id), cls=json_decoder_cls) + ) strategy = create_strategy("filter", config) session_data = None if not session_id else json.loads(await cache.get(session_id)) @@ -92,6 +139,15 @@ async def get_filter( async def initialize_filter( filter_id: str, session_id: Optional[str] = None, + json_decoder: Optional[str] = Query( + None, + description=( + "An importable path to a JSONDecoder. This JSON decoder is for decoding " + "the strategy configuration model only." + ), + regex=r"^[a-zA-Z]+[a-zA-Z0-9_]*(?:\.[a-zA-Z]+[a-zA-Z0-9_]*)*:[a-zA-Z]+[a-zA-Z0-9_]*$", + example="oteapi_plugin.utils.json:JSONDecoder", + ), cache: Redis = Depends(depends_redis), ) -> InitializeFilterResponse: """Initialize and return data to update session.""" @@ -100,7 +156,23 @@ async def initialize_filter( if session_id and not await cache.exists(session_id): raise httpexception_404_item_id_does_not_exist(session_id, "session_id") - config = FilterConfig(**json.loads(await cache.get(filter_id))) + json_decoder_cls: "Optional[Type[json.JSONDecoder]]" = None + if json_decoder: + json_decoder_cls = getattr( + importlib.import_module(".".join(json_decoder.split(":")[0])), + json_decoder.split(":")[1], + None, + ) + if json_decoder_cls is None: + LOGGER.debug( + "Tried to set json_decoder_cls from %s, but failed.", json_decoder + ) + else: + LOGGER.debug("Set json_decoder_cls to %s", json_decoder) + + config = FilterConfig( + **json.loads(await cache.get(filter_id), cls=json_decoder_cls) + ) strategy = create_strategy("filter", config) session_data = None if not session_id else json.loads(await cache.get(session_id))