Skip to content

Commit

Permalink
fix: Missing permissions for integration API endpoints (#4530)
Browse files Browse the repository at this point in the history
  • Loading branch information
khvn26 committed Aug 22, 2024
1 parent 968b894 commit cd99a07
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 56 deletions.
96 changes: 44 additions & 52 deletions api/integrations/common/views.py
Original file line number Diff line number Diff line change
@@ -1,84 +1,76 @@
from django.db.models import QuerySet
from django.shortcuts import get_object_or_404
from rest_framework import viewsets
from rest_framework.exceptions import (
NotFound,
PermissionDenied,
ValidationError,
)
from rest_framework.exceptions import ValidationError
from rest_framework.permissions import BasePermission, IsAuthenticated
from rest_framework.request import Request
from rest_framework.serializers import BaseSerializer

from environments.models import Environment
from environments.permissions.constants import VIEW_ENVIRONMENT
from projects.permissions import VIEW_PROJECT
from environments.permissions.permissions import NestedEnvironmentPermissions
from projects.permissions import VIEW_PROJECT, NestedProjectPermissions


class EnvironmentIntegrationCommonViewSet(viewsets.ModelViewSet):
serializer_class = None
pagination_class = None # set here to ensure documentation is correct
model_class = None

def get_queryset(self):
def initial(self, request: Request, *args, **kwargs) -> None:
super().initial(request, *args, **kwargs)
request.environment = get_object_or_404(
Environment,
api_key=self.kwargs["environment_api_key"],
)

def get_queryset(self) -> QuerySet:
if getattr(self, "swagger_fake_view", False):
return self.model_class.objects.none()

environment_api_key = self.kwargs["environment_api_key"]

try:
environment = Environment.objects.get(api_key=environment_api_key)
if not self.request.user.has_environment_permission(
VIEW_ENVIRONMENT, environment
):
raise PermissionDenied(
"User does not have permission to perform action in environment."
)
return self.model_class.objects.filter(environment=self.request.environment)

return self.model_class.objects.filter(environment=environment)
except Environment.DoesNotExist:
raise NotFound("Environment not found.")
def get_permissions(self) -> list[BasePermission]:
return [
IsAuthenticated(),
NestedEnvironmentPermissions(
action_permission_map={"retrieve": VIEW_ENVIRONMENT},
),
]

def perform_create(self, serializer):
environment = self.get_environment_from_request()

if self.model_class.objects.filter(environment=environment).exists():
def perform_create(self, serializer: BaseSerializer) -> None:
if self.get_queryset().exists():
raise ValidationError(
f"{self.model_class.__name__} for environment already exist."
"This integration already exists for this environment."
)
serializer.save(environment=self.request.environment)

serializer.save(environment=environment)

def perform_update(self, serializer):
environment = self.get_environment_from_request()
serializer.save(environment=environment)

def get_environment_from_request(self):
"""
Get environment object from URL parameters in request.
"""
return Environment.objects.get(api_key=self.kwargs["environment_api_key"])
def perform_update(self, serializer: BaseSerializer) -> None:
serializer.save(environment=self.request.environment)


class ProjectIntegrationBaseViewSet(viewsets.ModelViewSet):
serializer_class = None
pagination_class = None
model_class = None

def get_queryset(self):
def get_queryset(self) -> QuerySet:
if getattr(self, "swagger_fake_view", False):
return self.model_class.objects.none()

project = get_object_or_404(
self.request.user.get_permitted_projects(VIEW_PROJECT),
pk=self.kwargs["project_pk"],
)
return self.model_class.objects.filter(project=project)
return self.model_class.objects.filter(project_id=self.kwargs["project_pk"])

def perform_create(self, serializer):
project_id = self.kwargs["project_pk"]
if self.model_class.objects.filter(project_id=project_id).exists():
raise ValidationError(
f"{self.model_class.__name__} for this project already exists."
)
serializer.save(project_id=project_id)
def get_permissions(self) -> list[BasePermission]:
return [
NestedProjectPermissions(
action_permission_map={"retrieve": VIEW_PROJECT},
),
]

def perform_create(self, serializer: BaseSerializer) -> None:
if self.get_queryset().exists():
raise ValidationError("This integration already exists for this project.")
serializer.save(project_id=self.kwargs["project_pk"])

def perform_update(self, serializer):
project_id = self.kwargs["project_pk"]
serializer.save(project_id=project_id)
def perform_update(self, serializer: BaseSerializer) -> None:
serializer.save(project_id=self.kwargs["project_pk"])
17 changes: 13 additions & 4 deletions api/integrations/slack/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from django.shortcuts import redirect
from rest_framework import status
from rest_framework.decorators import action
from rest_framework.permissions import BasePermission
from rest_framework.response import Response
from rest_framework.viewsets import GenericViewSet
from slack_sdk.oauth import AuthorizeUrlGenerator
Expand Down Expand Up @@ -63,26 +64,35 @@ class SlackEnvironmentViewSet(EnvironmentIntegrationCommonViewSet):
pagination_class = None # set here to ensure documentation is correct
model_class = SlackEnvironment

def get_permissions(self) -> list[BasePermission]:
if (action := self.action) in [
"slack_oauth_callback",
"get_temporary_signature",
]:
return []
if action == "slack_oauth_init":
return [OauthInitPermission()]
return super().get_permissions()

@action(detail=False, methods=["GET"], url_path="signature")
def get_temporary_signature(self, request, *args, **kwargs):
return Response({"signature": signer.sign(request.user.id)})

@action(detail=False, methods=["GET"], url_path="callback", permission_classes=[])
@action(detail=False, methods=["GET"], url_path="callback")
def slack_oauth_callback(self, request, environment_api_key):
code = request.GET.get("code")
if not code:
return Response(
"code not found in query params",
status.HTTP_400_BAD_REQUEST,
)
env = self.get_environment_from_request()
validate_state(request.GET.get("state"), request)
bot_token = SlackWrapper().get_bot_token(
code, self._get_slack_callback_url(environment_api_key)
)

SlackConfiguration.objects.update_or_create(
project=env.project, defaults={"api_token": bot_token}
project=request.environment.project, defaults={"api_token": bot_token}
)
return redirect(self._get_front_end_redirect_url())

Expand All @@ -91,7 +101,6 @@ def slack_oauth_callback(self, request, environment_api_key):
methods=["GET"],
url_path="oauth",
authentication_classes=[OauthInitAuthentication],
permission_classes=[OauthInitPermission],
)
def slack_oauth_init(self, request, environment_api_key):
if not settings.SLACK_CLIENT_ID:
Expand Down
3 changes: 3 additions & 0 deletions api/projects/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ def has_permission(self, request, view):
self.action_permission_map[view.action], project
)

if view.action == "create":
return request.user.is_project_admin(project)

return view.detail

def has_object_permission(self, request, view, obj):
Expand Down
24 changes: 24 additions & 0 deletions api/tests/unit/integrations/datadog/test_unit_datadog_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,27 @@ def test_create_datadog_configuration_in_project_with_deleted_configuration(
response_json = response.json()
assert response_json["api_key"] == api_key
assert response_json["base_url"] == base_url


def test_datadog_project_view__no_permissions__return_expected(
test_user_client: APIClient,
project: Project,
) -> None:
# Given
data = {
"base_url": "http://test.com",
"api_key": "abc-123",
"use_custom_source": True,
}
url = reverse("api-v1:projects:integrations-datadog-list", args=[project.id])

# When
response = test_user_client.post(
url,
data=json.dumps(data),
content_type="application/json",
)

# Then
assert response.status_code == status.HTTP_403_FORBIDDEN
assert not DataDogConfiguration.objects.filter(project=project).exists()
27 changes: 27 additions & 0 deletions api/tests/unit/integrations/dynatrace/test_unit_dynatrace_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,30 @@ def test_should_remove_configuration_when_delete(
# Then
assert response.status_code == status.HTTP_204_NO_CONTENT
assert not DynatraceConfiguration.objects.filter(environment=environment).exists()


def test_dynatrace_environment_view__no_permissions__return_expected(
test_user_client: APIClient,
environment: Environment,
) -> None:
# Given
data = {
"base_url": "http://test.com",
"api_key": "abc-123",
"entity_selector": "type(APPLICATION),entityName(docs)",
}
url = reverse(
"api-v1:environments:integrations-dynatrace-list",
args=[environment.api_key],
)

# When
response = test_user_client.post(
url,
data=json.dumps(data),
content_type="application/json",
)

# Then
assert response.status_code == status.HTTP_403_FORBIDDEN
assert not DynatraceConfiguration.objects.filter(environment=environment).exists()

0 comments on commit cd99a07

Please sign in to comment.