Skip to content

Commit

Permalink
fix(deps): Migrate MFA code to our codebase and bump djangorestframew…
Browse files Browse the repository at this point in the history
…ork (#3988)
  • Loading branch information
gagantrivedi committed Jun 7, 2024
1 parent 6e77054 commit e217df7
Show file tree
Hide file tree
Showing 39 changed files with 1,163 additions and 489 deletions.
4 changes: 2 additions & 2 deletions api/app/settings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@
"api_keys",
"features.feature_external_resources",
# 2FA
"trench",
"custom_auth.mfa.trench",
# health check plugins
"health_check",
"health_check.db",
Expand Down Expand Up @@ -750,7 +750,6 @@
}

TRENCH_AUTH = {
"FROM_EMAIL": DEFAULT_FROM_EMAIL,
"BACKUP_CODES_QUANTITY": 5,
"BACKUP_CODES_LENGTH": 10, # keep (quantity * length) under 200
"BACKUP_CODES_CHARACTERS": (
Expand All @@ -759,6 +758,7 @@
"DEFAULT_VALIDITY_PERIOD": 30,
"CONFIRM_BACKUP_CODES_REGENERATION_WITH_CODE": True,
"APPLICATION_ISSUER_NAME": "app.bullet-train.io",
"ENCRYPT_BACKUP_CODES": True,
"MFA_METHODS": {
"app": {
"VERBOSE_NAME": "TOTP App",
Expand Down
30 changes: 25 additions & 5 deletions api/custom_auth/mfa/backends/application.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,28 @@
from trench.backends.application import ApplicationBackend
from typing import Any, Dict

from django.conf import settings
from pyotp import TOTP
from rest_framework.response import Response

from custom_auth.mfa.trench.models import MFAMethod


class CustomApplicationBackend:
def __init__(self, mfa_method: MFAMethod, config: Dict[str, Any]) -> None:
self._mfa_method = mfa_method
self._config = config
self._totp = TOTP(self._mfa_method.secret)

class CustomApplicationBackend(ApplicationBackend):
def dispatch_message(self):
original_message = super(CustomApplicationBackend, self).dispatch_message()
data = {**original_message, "secret": self.obj.secret}
return data
qr_link = self._totp.provisioning_uri(
self._mfa_method.user.email, settings.TRENCH_AUTH["APPLICATION_ISSUER_NAME"]
)
data = {
"qr_link": qr_link,
"secret": self._mfa_method.secret,
}
return Response(data)

def validate_code(self, code: str) -> bool:
validity_period = settings.TRENCH_AUTH["MFA_METHODS"]["app"]["VALIDITY_PERIOD"]
return self._totp.verify(otp=code, valid_window=int(validity_period / 20))
Empty file.
8 changes: 8 additions & 0 deletions api/custom_auth/mfa/trench/admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from django.contrib import admin

from custom_auth.mfa.trench.models import MFAMethod


@admin.register(MFAMethod)
class MFAMethodAdmin(admin.ModelAdmin):
pass
6 changes: 6 additions & 0 deletions api/custom_auth/mfa/trench/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from django.apps import AppConfig


class TrenchConfig(AppConfig):
name = "custom_auth.mfa.trench"
verbose_name = "django-trench"
Empty file.
Empty file.
37 changes: 37 additions & 0 deletions api/custom_auth/mfa/trench/command/activate_mfa_method.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from typing import Callable, Set, Type

from custom_auth.mfa.trench.command.generate_backup_codes import (
generate_backup_codes_command,
)
from custom_auth.mfa.trench.command.replace_mfa_method_backup_codes import (
regenerate_backup_codes_for_mfa_method_command,
)
from custom_auth.mfa.trench.models import MFAMethod
from custom_auth.mfa.trench.utils import get_mfa_model


class ActivateMFAMethodCommand:
def __init__(
self, mfa_model: Type[MFAMethod], backup_codes_generator: Callable
) -> None:
self._mfa_model = mfa_model
self._backup_codes_generator = backup_codes_generator

def execute(self, user_id: int, name: str, code: str) -> Set[str]:
self._mfa_model.objects.filter(user_id=user_id, name=name).update(
is_active=True,
is_primary=not self._mfa_model.objects.primary_exists(user_id=user_id),
)

backup_codes = regenerate_backup_codes_for_mfa_method_command(
user_id=user_id,
name=name,
)

return backup_codes


activate_mfa_method_command = ActivateMFAMethodCommand(
mfa_model=get_mfa_model(),
backup_codes_generator=generate_backup_codes_command,
).execute
36 changes: 36 additions & 0 deletions api/custom_auth/mfa/trench/command/authenticate_second_factor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from custom_auth.mfa.trench.command.remove_backup_code import (
remove_backup_code_command,
)
from custom_auth.mfa.trench.command.validate_backup_code import (
validate_backup_code_command,
)
from custom_auth.mfa.trench.exceptions import (
InvalidCodeError,
InvalidTokenError,
)
from custom_auth.mfa.trench.models import MFAMethod
from custom_auth.mfa.trench.utils import get_mfa_handler, user_token_generator
from users.models import FFAdminUser


def is_authenticated(user_id: int, code: str) -> None:
for auth_method in MFAMethod.objects.list_active(user_id=user_id):
validated_backup_code = validate_backup_code_command(
value=code, backup_codes=auth_method.backup_codes
)
if get_mfa_handler(mfa_method=auth_method).validate_code(code=code):
return
if validated_backup_code:
remove_backup_code_command(
user_id=auth_method.user_id, method_name=auth_method.name, code=code
)
return
raise InvalidCodeError()


def authenticate_second_step_command(code: str, ephemeral_token: str) -> FFAdminUser:
user = user_token_generator.check_token(user=None, token=ephemeral_token)
if user is None:
raise InvalidTokenError()
is_authenticated(user_id=user.id, code=code)
return user
30 changes: 30 additions & 0 deletions api/custom_auth/mfa/trench/command/create_mfa_method.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from typing import Callable, Type

from custom_auth.mfa.trench.command.create_secret import create_secret_command
from custom_auth.mfa.trench.exceptions import MFAMethodAlreadyActiveError
from custom_auth.mfa.trench.models import MFAMethod
from custom_auth.mfa.trench.utils import get_mfa_model


class CreateMFAMethodCommand:
def __init__(self, secret_generator: Callable, mfa_model: Type[MFAMethod]) -> None:
self._mfa_model = mfa_model
self._create_secret = secret_generator

def execute(self, user_id: int, name: str) -> MFAMethod:
mfa, created = self._mfa_model.objects.get_or_create(
user_id=user_id,
name=name,
defaults={
"secret": self._create_secret,
"is_active": False,
},
)
if not created and mfa.is_active:
raise MFAMethodAlreadyActiveError()
return mfa


create_mfa_method_command = CreateMFAMethodCommand(
secret_generator=create_secret_command, mfa_model=get_mfa_model()
).execute
7 changes: 7 additions & 0 deletions api/custom_auth/mfa/trench/command/create_secret.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from django.conf import settings
from pyotp import random_base32


def create_secret_command() -> str:
generator = random_base32
return generator(length=settings.TRENCH_AUTH["SECRET_KEY_LENGTH"])
27 changes: 27 additions & 0 deletions api/custom_auth/mfa/trench/command/deactivate_mfa_method.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from typing import Type

from django.db.transaction import atomic

from custom_auth.mfa.trench.exceptions import MFANotEnabledError
from custom_auth.mfa.trench.models import MFAMethod
from custom_auth.mfa.trench.utils import get_mfa_model


class DeactivateMFAMethodCommand:
def __init__(self, mfa_model: Type[MFAMethod]) -> None:
self._mfa_model = mfa_model

@atomic
def execute(self, mfa_method_name: str, user_id: int) -> None:
mfa = self._mfa_model.objects.get_by_name(user_id=user_id, name=mfa_method_name)
if not mfa.is_active:
raise MFANotEnabledError()

self._mfa_model.objects.filter(user_id=user_id, name=mfa_method_name).update(
is_active=False, is_primary=False
)


deactivate_mfa_method_command = DeactivateMFAMethodCommand(
mfa_model=get_mfa_model()
).execute
38 changes: 38 additions & 0 deletions api/custom_auth/mfa/trench/command/generate_backup_codes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from typing import Callable, Set

from django.conf import settings
from django.utils.crypto import get_random_string


class GenerateBackupCodesCommand:
def __init__(self, random_string_generator: Callable) -> None:
self._random_string_generator = random_string_generator

def execute(
self,
quantity: int = settings.TRENCH_AUTH["BACKUP_CODES_QUANTITY"],
length: int = settings.TRENCH_AUTH["BACKUP_CODES_LENGTH"],
allowed_chars: str = settings.TRENCH_AUTH["BACKUP_CODES_CHARACTERS"],
) -> Set[str]:
"""
Generates random encrypted backup codes.
:param quantity: How many codes should be generated
:type quantity: int
:param length: How long codes should be
:type length: int
:param allowed_chars: Characters to create backup codes from
:type allowed_chars: str
:returns: Encrypted backup codes
:rtype: set[str]
"""
return {
self._random_string_generator(length, allowed_chars)
for _ in range(quantity)
}


generate_backup_codes_command = GenerateBackupCodesCommand(
random_string_generator=get_random_string,
).execute
29 changes: 29 additions & 0 deletions api/custom_auth/mfa/trench/command/remove_backup_code.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from typing import Any, Optional, Set

from django.contrib.auth.hashers import check_password

from custom_auth.mfa.trench.models import MFAMethod


def remove_backup_code_command(user_id: Any, method_name: str, code: str) -> None:
serialized_codes = (
MFAMethod.objects.filter(user_id=user_id, name=method_name)
.values_list("_backup_codes", flat=True)
.first()
)
codes = MFAMethod._BACKUP_CODES_DELIMITER.join(
_remove_code_from_set(
backup_codes=set(serialized_codes.split(MFAMethod._BACKUP_CODES_DELIMITER)),
code=code,
)
)
MFAMethod.objects.filter(user_id=user_id, name=method_name).update(
_backup_codes=codes
)


def _remove_code_from_set(backup_codes: Set[str], code: str) -> Optional[Set[str]]:
for backup_code in backup_codes:
if check_password(code, backup_code):
backup_codes.remove(backup_code)
return backup_codes
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from typing import Callable, Set, Type

from django.contrib.auth.hashers import make_password

from custom_auth.mfa.trench.command.generate_backup_codes import (
generate_backup_codes_command,
)
from custom_auth.mfa.trench.models import MFAMethod
from custom_auth.mfa.trench.utils import get_mfa_model


class RegenerateBackupCodesForMFAMethodCommand:
def __init__(
self,
mfa_model: Type[MFAMethod],
code_hasher: Callable,
codes_generator: Callable,
) -> None:
self._mfa_model = mfa_model
self._code_hasher = code_hasher
self._codes_generator = codes_generator

def execute(self, user_id: int, name: str) -> Set[str]:
backup_codes = self._codes_generator()
self._mfa_model.objects.filter(user_id=user_id, name=name).update(
_backup_codes=MFAMethod._BACKUP_CODES_DELIMITER.join(
[self._code_hasher(backup_code) for backup_code in backup_codes]
),
)
return backup_codes


regenerate_backup_codes_for_mfa_method_command = (
RegenerateBackupCodesForMFAMethodCommand(
mfa_model=get_mfa_model(),
code_hasher=make_password,
codes_generator=generate_backup_codes_command,
).execute
)
10 changes: 10 additions & 0 deletions api/custom_auth/mfa/trench/command/validate_backup_code.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from typing import Iterable, Optional

from django.contrib.auth.hashers import check_password


def validate_backup_code_command(value: str, backup_codes: Iterable) -> Optional[str]:
for backup_code in backup_codes:
if check_password(value, backup_code):
return backup_code
return None
46 changes: 46 additions & 0 deletions api/custom_auth/mfa/trench/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from django.utils.translation import gettext_lazy as _
from rest_framework.serializers import ValidationError


class MFAValidationError(ValidationError):
def __str__(self) -> str:
return ", ".join(detail for detail in self.detail)


class CodeInvalidOrExpiredError(MFAValidationError):
def __init__(self) -> None:
super().__init__(
detail=_("Code invalid or expired."),
code="code_invalid_or_expired",
)


class MFAMethodDoesNotExistError(MFAValidationError):
def __init__(self) -> None:
super().__init__(
detail=_("Requested MFA method does not exist."),
code="mfa_method_does_not_exist",
)


class MFAMethodAlreadyActiveError(MFAValidationError):
def __init__(self) -> None:
super().__init__(
detail=_("MFA method already active."),
code="method_already_active",
)


class MFANotEnabledError(MFAValidationError):
def __init__(self) -> None:
super().__init__(detail=_("2FA is not enabled."), code="not_enabled")


class InvalidTokenError(MFAValidationError):
def __init__(self) -> None:
super().__init__(detail=_("Invalid or expired token."), code="invalid_token")


class InvalidCodeError(MFAValidationError):
def __init__(self) -> None:
super().__init__(detail=_("Invalid or expired code."), code="invalid_code")
Loading

0 comments on commit e217df7

Please sign in to comment.