Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set up using custom JSON decoders/encoders #96

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 77 additions & 5 deletions app/routers/datafilter.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
"""Data Filter."""
import importlib
import json
from typing import Optional
import logging
from typing import TYPE_CHECKING, Optional

from aioredis import Redis
from fastapi import APIRouter, Depends, status
from fastapi import APIRouter, Depends, Query, status
from fastapi_plugins import depends_redis
from oteapi.models import FilterConfig
from oteapi.plugins import create_strategy
Expand All @@ -17,8 +19,14 @@
from app.models.error import HTTPNotFoundError, httpexception_404_item_id_does_not_exist
from app.routers.session import _update_session, _update_session_list_item

if TYPE_CHECKING:
from typing import Type

ROUTER = APIRouter(prefix=f"/{IDPREFIX}")

LOGGER = logging.getLogger("app.routers")
LOGGER.setLevel(logging.DEBUG)


@ROUTER.post(
"/",
Expand All @@ -35,7 +43,21 @@ async def create_filter(
"""Define a new filter configuration (data operation)"""
new_filter = CreateFilterResponse()

await cache.set(new_filter.filter_id, config.json())
json_encoder = None
if hasattr(config, "json_encoder") and config.json_encoder:
json_encoder = getattr(
importlib.import_module(config.json_encoder[0]),
config.json_encoder[2],
None,
)
if json_encoder is None:
LOGGER.debug(
"Tried to set json_encoder from %s, but failed.", config.json_encoder
)
else:
LOGGER.debug("Set json_encoder to %s", config.json_encoder)

await cache.set(new_filter.filter_id, config.json(encoder=json_encoder))

if session_id:
if not await cache.exists(session_id):
Expand All @@ -60,6 +82,15 @@ async def create_filter(
async def get_filter(
filter_id: str,
session_id: Optional[str] = None,
json_decoder: Optional[str] = Query(
None,
description=(
"An importable path to a JSONDecoder. This JSON decoder is for decoding "
"the strategy configuration model only."
),
regex=r"^[a-zA-Z]+[a-zA-Z0-9_]*(?:\.[a-zA-Z]+[a-zA-Z0-9_]*)*:[a-zA-Z]+[a-zA-Z0-9_]*$",
example="oteapi_plugin.utils.json:JSONDecoder",
),
cache: Redis = Depends(depends_redis),
) -> GetFilterResponse:
"""Run and return data from a filter (data operation)"""
Expand All @@ -68,7 +99,23 @@ async def get_filter(
if session_id and not await cache.exists(session_id):
raise httpexception_404_item_id_does_not_exist(session_id, "session_id")

config = FilterConfig(**json.loads(await cache.get(filter_id)))
json_decoder_cls: "Optional[Type[json.JSONDecoder]]" = None
if json_decoder:
json_decoder_cls = getattr(
importlib.import_module(".".join(json_decoder.split(":")[0])),
json_decoder.split(":")[1],
None,
)
if json_decoder_cls is None:
LOGGER.debug(
"Tried to set json_decoder_cls from %s, but failed.", json_decoder
)
else:
LOGGER.debug("Set json_decoder_cls to %s", json_decoder)

config = FilterConfig(
**json.loads(await cache.get(filter_id), cls=json_decoder_cls)
)

strategy = create_strategy("filter", config)
session_data = None if not session_id else json.loads(await cache.get(session_id))
Expand All @@ -92,6 +139,15 @@ async def get_filter(
async def initialize_filter(
filter_id: str,
session_id: Optional[str] = None,
json_decoder: Optional[str] = Query(
None,
description=(
"An importable path to a JSONDecoder. This JSON decoder is for decoding "
"the strategy configuration model only."
),
regex=r"^[a-zA-Z]+[a-zA-Z0-9_]*(?:\.[a-zA-Z]+[a-zA-Z0-9_]*)*:[a-zA-Z]+[a-zA-Z0-9_]*$",
example="oteapi_plugin.utils.json:JSONDecoder",
),
cache: Redis = Depends(depends_redis),
) -> InitializeFilterResponse:
"""Initialize and return data to update session."""
Expand All @@ -100,7 +156,23 @@ async def initialize_filter(
if session_id and not await cache.exists(session_id):
raise httpexception_404_item_id_does_not_exist(session_id, "session_id")

config = FilterConfig(**json.loads(await cache.get(filter_id)))
json_decoder_cls: "Optional[Type[json.JSONDecoder]]" = None
if json_decoder:
json_decoder_cls = getattr(
importlib.import_module(".".join(json_decoder.split(":")[0])),
json_decoder.split(":")[1],
None,
)
if json_decoder_cls is None:
LOGGER.debug(
"Tried to set json_decoder_cls from %s, but failed.", json_decoder
)
else:
LOGGER.debug("Set json_decoder_cls to %s", json_decoder)

config = FilterConfig(
**json.loads(await cache.get(filter_id), cls=json_decoder_cls)
)

strategy = create_strategy("filter", config)
session_data = None if not session_id else json.loads(await cache.get(session_id))
Expand Down