Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#6 breakdown pt2 - add common module #17

Merged
merged 3 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions cow_py/common/api/api_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from abc import ABC
from typing import Any, Optional

import httpx

from cow_py.common.api.decorators import rate_limitted, with_backoff
from cow_py.common.config import SupportedChainId

Context = dict[str, Any]


class APIConfig(ABC):
"""Base class for API configuration with common functionality."""

config_map = {}

def __init__(
self, chain_id: SupportedChainId, base_context: Optional[Context] = None
):
self.chain_id = chain_id
self.context = base_context or {}

def get_base_url(self) -> str:
return self.config_map.get(
self.chain_id, "default URL if chain_id is not found"
)

def get_context(self) -> Context:
return {"base_url": self.get_base_url(), **self.context}


class RequestStrategy:
async def make_request(self, client, url, method, **request_kwargs):
headers = {
"accept": "application/json",
"content-type": "application/json",
}

return await client.request(
url=url, headers=headers, method=method, **request_kwargs
)


class ResponseAdapter:
async def adapt_response(self, _response):
raise NotImplementedError()


class RequestBuilder:
def __init__(self, strategy, response_adapter):
self.strategy = strategy
self.response_adapter = response_adapter

async def execute(self, client, url, method, **kwargs):
response = await self.strategy.make_request(client, url, method, **kwargs)
return self.response_adapter.adapt_response(response)


class JsonResponseAdapter(ResponseAdapter):
def adapt_response(self, response):
if response.headers.get("content-type") == "application/json":
return response.json()
else:
return response.text


class ApiBase:
"""Base class for APIs utilizing configuration and request execution."""

def __init__(self, config: APIConfig):
self.config = config

@with_backoff()
@rate_limitted()
async def _fetch(self, path, method="GET", **kwargs):
url = self.config.get_base_url() + path

del kwargs["context_override"]

async with httpx.AsyncClient() as client:
builder = RequestBuilder(
RequestStrategy(),
JsonResponseAdapter(),
)
return await builder.execute(client, url, method, **kwargs)
58 changes: 58 additions & 0 deletions cow_py/common/api/decorators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import backoff
import httpx
from aiolimiter import AsyncLimiter

DEFAULT_LIMITER_OPTIONS = {"rate": 5, "per": 1.0}

DEFAULT_BACKOFF_OPTIONS = {
"max_tries": 10,
"max_time": None,
"jitter": None,
}


def dig(self, *keys):
try:
for key in keys:
self = self[key]
return self
except KeyError:
return None


def with_backoff():
def decorator(func):
async def wrapper(*args, **kwargs):
backoff_opts = dig(kwargs, "context_override", "backoff_opts")

if backoff_opts is None:
internal_backoff_opts = DEFAULT_BACKOFF_OPTIONS
else:
internal_backoff_opts = backoff_opts

@backoff.on_exception(
backoff.expo, httpx.HTTPStatusError, **internal_backoff_opts
)
async def closure():
return await func(*args, **kwargs)

return await closure()

return wrapper

return decorator


def rate_limitted(
rate=DEFAULT_LIMITER_OPTIONS["rate"], per=DEFAULT_LIMITER_OPTIONS["per"]
):
limiter = AsyncLimiter(rate, per)

def decorator(func):
async def wrapper(*args, **kwargs):
async with limiter:
return await func(*args, **kwargs)

return wrapper

return decorator
32 changes: 17 additions & 15 deletions cow_py/common/chains/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,26 @@ class Chain(Enum):
Supported chains and their respective `chainId` for the SDK.
"""

MAINNET = 1
GNOSIS = 100
SEPOLIA = 11155111
MAINNET = (1, "ethereum", "https://etherscan.io")
GNOSIS = (100, "gnosis", "https://gnosisscan.io")
SEPOLIA = (11155111, "sepolia", "https://sepolia.etherscan.io")

def __init__(self, id) -> None:
def __init__(self, id: int, network_name: str, explorer_url: str) -> None:
self.id = id
self.network_name = network_name
self.explorer_url = explorer_url

@property
def name(self) -> str:
return self.network_name

SUPPORTED_CHAINS = {Chain.MAINNET, Chain.GNOSIS, Chain.SEPOLIA}
@property
def explorer(self) -> str:
return self.explorer_url

CHAIN_NAMES = {
Chain.MAINNET: "ethereum",
Chain.GNOSIS: "gnosis",
Chain.SEPOLIA: "sepolia",
}
@property
def chain_id(self) -> int:
return self.id

CHAIN_SCANNER_MAP = {
Chain.MAINNET: "https://etherscan.io",
Chain.GNOSIS: "https://gnosisscan.io",
Chain.SEPOLIA: "https://sepolia.etherscan.io/",
}

SUPPORTED_CHAINS = {chain for chain in Chain}
4 changes: 2 additions & 2 deletions cow_py/common/chains/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from cow_py.common.chains import CHAIN_SCANNER_MAP, Chain
from cow_py.common.chains import Chain


def get_explorer_link(chain: Chain, tx_hash: str) -> str:
"""Return the scan link for the provided transaction hash."""
return f"{CHAIN_SCANNER_MAP[chain]}/tx/{tx_hash}"
return f"{chain.explorer_url}/tx/{tx_hash}"
31 changes: 31 additions & 0 deletions cow_py/common/config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,35 @@
from dataclasses import dataclass
from enum import Enum
from typing import Dict, Optional


class SupportedChainId(Enum):
MAINNET = 1
GNOSIS_CHAIN = 100
SEPOLIA = 11155111


class CowEnv(Enum):
PROD = "prod"
STAGING = "staging"


ApiBaseUrls = Dict[SupportedChainId, str]


@dataclass
class ApiContext:
chain_id: SupportedChainId
env: CowEnv
base_urls: Optional[ApiBaseUrls] = None
max_tries: Optional[int] = 5


# Define the list of available environments.
ENVS_LIST = [CowEnv.PROD, CowEnv.STAGING]

# Define the default CoW Protocol API context.
DEFAULT_COW_API_CONTEXT = ApiContext(env=CowEnv.PROD, chain_id=SupportedChainId.MAINNET)


class IPFSConfig(Enum):
Expand Down
5 changes: 3 additions & 2 deletions cow_py/common/constants.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from enum import Enum
from typing import Dict

from .chains import Chain

"""
Expand All @@ -17,14 +18,14 @@ class CowContractAddress(Enum):
EXTENSIBLE_FALLBACK_HANDLER = "0x2f55e8b20D0B9FEFA187AA7d00B6Cbe563605bF5"


def map_address_to_supported_networks(address) -> Dict[Chain, str]:
def map_address_to_supported_networks(address) -> Dict[int, str]:
"""
Maps a given address to all supported networks.

:param address: The address to be mapped.
:return: A dictionary mapping the address to each supported chain.
"""
return {chain_id: address for chain_id in Chain}
return {chain.chain_id: address for chain in Chain}


COW_PROTOCOL_SETTLEMENT_CONTRACT_CHAIN_ADDRESS_MAP = map_address_to_supported_networks(
Expand Down
Empty file removed cow_py/subgraphs/__init__.py
Empty file.
52 changes: 0 additions & 52 deletions cow_py/subgraphs/base/client.py

This file was deleted.

23 changes: 0 additions & 23 deletions cow_py/subgraphs/base/query.py

This file was deleted.

14 changes: 0 additions & 14 deletions cow_py/subgraphs/client.py

This file was deleted.

36 changes: 0 additions & 36 deletions cow_py/subgraphs/deployments.py

This file was deleted.

Loading
Loading