diff --git a/superset-frontend/spec/javascripts/views/CRUD/data/savedquery/SavedQueryList_spec.jsx b/superset-frontend/spec/javascripts/views/CRUD/data/savedquery/SavedQueryList_spec.jsx index daff76b826368..0b32bb0470856 100644 --- a/superset-frontend/spec/javascripts/views/CRUD/data/savedquery/SavedQueryList_spec.jsx +++ b/superset-frontend/spec/javascripts/views/CRUD/data/savedquery/SavedQueryList_spec.jsx @@ -70,7 +70,7 @@ const mockqueries = [...new Array(3)].map((_, i) => ({ })); fetchMock.get(queriesInfoEndpoint, { - permissions: ['can_delete'], + permissions: ['can_write'], }); fetchMock.get(queriesEndpoint, { result: mockqueries, diff --git a/superset-frontend/src/views/CRUD/data/savedquery/SavedQueryList.tsx b/superset-frontend/src/views/CRUD/data/savedquery/SavedQueryList.tsx index 7215565c098f5..49a9627dc7fc4 100644 --- a/superset-frontend/src/views/CRUD/data/savedquery/SavedQueryList.tsx +++ b/superset-frontend/src/views/CRUD/data/savedquery/SavedQueryList.tsx @@ -95,8 +95,8 @@ function SavedQueryList({ setSavedQueryCurrentlyPreviewing, ] = useState(null); - const canEdit = hasPerm('can_edit'); - const canDelete = hasPerm('can_delete'); + const canEdit = hasPerm('can_write'); + const canDelete = hasPerm('can_write'); const openNewQuery = () => { window.open(`${window.location.origin}/superset/sqllab?new=true`); diff --git a/superset/constants.py b/superset/constants.py index 65ef2b13a90ed..167e128676177 100644 --- a/superset/constants.py +++ b/superset/constants.py @@ -63,3 +63,36 @@ class RouteMethod: # pylint: disable=too-few-public-methods CRUD_SET = {ADD, LIST, EDIT, DELETE, ACTION_POST, SHOW} RELATED_VIEW_SET = {ADD, LIST, EDIT, DELETE} REST_MODEL_VIEW_CRUD_SET = {DELETE, GET, GET_LIST, POST, PUT, INFO} + + +MODEL_VIEW_RW_METHOD_PERMISSION_MAP = { + "add": "write", + "api": "read", + "api_column_add": "write", + "api_column_edit": "write", + "api_create": "write", + "api_delete": "write", + "api_get": "read", + "api_read": "read", + "api_readvalues": "read", + "api_update": "write", + "delete": "write", + "download": "read", + "edit": "write", + "list": "read", + "muldelete": "write", + "show": "read", +} + +MODEL_API_RW_METHOD_PERMISSION_MAP = { + "bulk_delete": "write", + "delete": "write", + "distinct": "read", + "export": "read", + "get": "read", + "get_list": "read", + "info": "read", + "post": "write", + "put": "write", + "related": "read", +} diff --git a/superset/migrations/shared/__init__.py b/superset/migrations/shared/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/superset/migrations/shared/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/superset/migrations/shared/security_converge.py b/superset/migrations/shared/security_converge.py new file mode 100644 index 0000000000000..973952653ca13 --- /dev/null +++ b/superset/migrations/shared/security_converge.py @@ -0,0 +1,273 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import logging +from dataclasses import dataclass +from typing import Dict, List, Tuple + +from sqlalchemy import ( + Column, + ForeignKey, + Integer, + Sequence, + String, + Table, + UniqueConstraint, +) +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import Load, relationship, Session + +logger = logging.getLogger(__name__) + +Base = declarative_base() + + +@dataclass(frozen=True) +class Pvm: + view: str + permission: str + + +PvmMigrationMapType = Dict[Pvm, Tuple[Pvm, ...]] + +# Partial freeze of the current metadata db schema + + +class Permission(Base): # type: ignore + __tablename__ = "ab_permission" + id = Column(Integer, Sequence("ab_permission_id_seq"), primary_key=True) + name = Column(String(100), unique=True, nullable=False) + + def __repr__(self) -> str: + return f"{self.name}" + + +class ViewMenu(Base): # type: ignore + __tablename__ = "ab_view_menu" + id = Column(Integer, Sequence("ab_view_menu_id_seq"), primary_key=True) + name = Column(String(250), unique=True, nullable=False) + + def __repr__(self) -> str: + return f"{self.name}" + + def __eq__(self, other: object) -> bool: + return (isinstance(other, self.__class__)) and (self.name == other.name) + + def __neq__(self, other: object) -> bool: + return (isinstance(other, self.__class__)) and self.name != other.name + + +assoc_permissionview_role = Table( + "ab_permission_view_role", + Base.metadata, + Column("id", Integer, Sequence("ab_permission_view_role_id_seq"), primary_key=True), + Column("permission_view_id", Integer, ForeignKey("ab_permission_view.id")), + Column("role_id", Integer, ForeignKey("ab_role.id")), + UniqueConstraint("permission_view_id", "role_id"), +) + + +class Role(Base): # type: ignore + __tablename__ = "ab_role" + + id = Column(Integer, Sequence("ab_role_id_seq"), primary_key=True) + name = Column(String(64), unique=True, nullable=False) + permissions = relationship( + "PermissionView", secondary=assoc_permissionview_role, backref="role" + ) + + def __repr__(self) -> str: + return f"{self.name}" + + +class PermissionView(Base): # type: ignore + __tablename__ = "ab_permission_view" + __table_args__ = (UniqueConstraint("permission_id", "view_menu_id"),) + id = Column(Integer, Sequence("ab_permission_view_id_seq"), primary_key=True) + permission_id = Column(Integer, ForeignKey("ab_permission.id")) + permission = relationship("Permission") + view_menu_id = Column(Integer, ForeignKey("ab_view_menu.id")) + view_menu = relationship("ViewMenu") + + def __repr__(self) -> str: + return f"{self.permission} {self.view_menu}" + + +def _add_view_menu(session: Session, view_name: str) -> ViewMenu: + """ + Check and add the new view menu + """ + new_view = session.query(ViewMenu).filter(ViewMenu.name == view_name).one_or_none() + if not new_view: + new_view = ViewMenu(name=view_name) + session.add(new_view) + return new_view + + +def _add_permission(session: Session, permission_name: str) -> Permission: + """ + Check and add the new Permission + """ + new_permission = ( + session.query(Permission) + .filter(Permission.name == permission_name) + .one_or_none() + ) + if not new_permission: + new_permission = Permission(name=permission_name) + session.add(new_permission) + return new_permission + + +def _add_permission_view( + session: Session, permission: Permission, view_menu: ViewMenu +) -> PermissionView: + """ + Check and add the new Permission View + """ + new_pvm = ( + session.query(PermissionView) + .filter( + PermissionView.view_menu_id == view_menu.id, + PermissionView.permission_id == permission.id, + ) + .one_or_none() + ) + if not new_pvm: + new_pvm = PermissionView(view_menu=view_menu, permission=permission) + session.add(new_pvm) + return new_pvm + + +def _find_pvm(session: Session, view_name: str, permission_name: str) -> PermissionView: + return ( + session.query(PermissionView) + .join(Permission) + .join(ViewMenu) + .filter(ViewMenu.name == view_name, Permission.name == permission_name) + ).one_or_none() + + +def add_pvms( + session: Session, pvm_data: Dict[str, Tuple[str, ...]], commit: bool = False +) -> List[PermissionView]: + """ + Checks if exists and adds new Permissions, Views and PermissionView's + """ + pvms = [] + for view_name, permissions in pvm_data.items(): + # Check and add the new View + new_view = _add_view_menu(session, view_name) + for permission_name in permissions: + new_permission = _add_permission(session, permission_name) + # Check and add the new PVM + pvms.append(_add_permission_view(session, new_permission, new_view)) + if commit: + session.commit() + return pvms + + +def _delete_old_permissions( + session: Session, pvm_map: Dict[PermissionView, List[PermissionView]] +) -> None: + """ + Delete old permissions: + - Delete the PermissionView + - Deletes the Permission if it's an orphan now + - Deletes the ViewMenu if it's an orphan now + """ + # Delete old permissions + for old_pvm, new_pvms in pvm_map.items(): + old_permission_name = old_pvm.permission.name + old_view_name = old_pvm.view_menu.name + logger.info(f"Going to delete pvm: {old_pvm}") + session.delete(old_pvm) + pvms_with_permission = ( + session.query(PermissionView) + .join(Permission) + .filter(Permission.name == old_permission_name) + ).first() + if not pvms_with_permission: + logger.info(f"Going to delete permission: {old_pvm.permission}") + session.delete(old_pvm.permission) + pvms_with_view_menu = ( + session.query(PermissionView) + .join(ViewMenu) + .filter(ViewMenu.name == old_view_name) + ).first() + if not pvms_with_view_menu: + logger.info(f"Going to delete view_menu: {old_pvm.view_menu}") + session.delete(old_pvm.view_menu) + + +def migrate_roles( + session: Session, pvm_key_map: PvmMigrationMapType, commit: bool = False, +) -> None: + """ + Migrates all existing roles that have the permissions to be migrated + """ + # Collect a map of PermissionView objects for migration + pvm_map: Dict[PermissionView, List[PermissionView]] = {} + for old_pvm_key, new_pvms_ in pvm_key_map.items(): + old_pvm = _find_pvm(session, old_pvm_key.view, old_pvm_key.permission) + if old_pvm: + for new_pvm_key in new_pvms_: + new_pvm = _find_pvm(session, new_pvm_key.view, new_pvm_key.permission) + if old_pvm not in pvm_map: + pvm_map[old_pvm] = [new_pvm] + else: + pvm_map[old_pvm].append(new_pvm) + + # Replace old permissions by the new ones on all existing roles + roles = session.query(Role).options(Load(Role).joinedload(Role.permissions)).all() + for role in roles: + for old_pvm, new_pvms in pvm_map.items(): + if old_pvm in role.permissions: + logger.info(f"Removing {old_pvm} from {role}") + role.permissions.remove(old_pvm) + for new_pvm in new_pvms: + if new_pvm not in role.permissions: + logger.info(f"Add {new_pvm} to {role}") + role.permissions.append(new_pvm) + session.merge(role) + + # Delete old permissions + _delete_old_permissions(session, pvm_map) + if commit: + session.commit() + + +def get_reversed_new_pvms(pvm_map: PvmMigrationMapType) -> Dict[str, Tuple[str, ...]]: + reversed_pvms: Dict[str, Tuple[str, ...]] = {} + for old_pvm, new_pvms in pvm_map.items(): + if old_pvm.view not in reversed_pvms: + reversed_pvms[old_pvm.view] = (old_pvm.view,) + else: + reversed_pvms[old_pvm.permission] = reversed_pvms[old_pvm.permission] + ( + old_pvm.permission, + ) + return reversed_pvms + + +def get_reversed_pvm_map(pvm_map: PvmMigrationMapType) -> PvmMigrationMapType: + reversed_pvm_map: PvmMigrationMapType = {} + for old_pvm, new_pvms in pvm_map.items(): + for new_pvm in new_pvms: + if new_pvm not in reversed_pvm_map: + reversed_pvm_map[new_pvm] = (old_pvm,) + else: + reversed_pvm_map[new_pvm] = reversed_pvm_map[new_pvm] + (old_pvm,) + return reversed_pvm_map diff --git a/superset/migrations/versions/e38177dbf641_security_converge_saved_queries.py b/superset/migrations/versions/e38177dbf641_security_converge_saved_queries.py new file mode 100644 index 0000000000000..85ce431758dd4 --- /dev/null +++ b/superset/migrations/versions/e38177dbf641_security_converge_saved_queries.py @@ -0,0 +1,85 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""security converge saved queries + +Revision ID: e38177dbf641 +Revises: a8173232b786 +Create Date: 2020-11-20 14:24:03.643031 + +""" + +# revision identifiers, used by Alembic. +revision = "e38177dbf641" +down_revision = "a8173232b786" + + +from alembic import op +from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.orm import Session + +from superset.migrations.shared.security_converge import ( + add_pvms, + get_reversed_new_pvms, + get_reversed_pvm_map, + migrate_roles, + Pvm, +) + +NEW_PVMS = {"SavedQuery": ("can_read", "can_write",)} +PVM_MAP = { + Pvm("SavedQueryView", "can_list"): (Pvm("SavedQuery", "can_read"),), + Pvm("SavedQueryView", "can_show"): (Pvm("SavedQuery", "can_read"),), + Pvm("SavedQueryView", "can_add",): (Pvm("SavedQuery", "can_write"),), + Pvm("SavedQueryView", "can_edit",): (Pvm("SavedQuery", "can_write"),), + Pvm("SavedQueryView", "can_delete",): (Pvm("SavedQuery", "can_write"),), + Pvm("SavedQueryView", "muldelete",): (Pvm("SavedQuery", "can_write"),), + Pvm("SavedQueryView", "can_mulexport",): (Pvm("SavedQuery", "can_read"),), + Pvm("SavedQueryViewApi", "can_show",): (Pvm("SavedQuery", "can_read"),), + Pvm("SavedQueryViewApi", "can_edit",): (Pvm("SavedQuery", "can_write"),), + Pvm("SavedQueryViewApi", "can_list",): (Pvm("SavedQuery", "can_read"),), + Pvm("SavedQueryViewApi", "can_add",): (Pvm("SavedQuery", "can_write"),), + Pvm("SavedQueryViewApi", "muldelete",): (Pvm("SavedQuery", "can_write"),), +} + + +def upgrade(): + bind = op.get_bind() + session = Session(bind=bind) + + # Add the new permissions on the migration itself + add_pvms(session, NEW_PVMS) + migrate_roles(session, PVM_MAP) + try: + session.commit() + except SQLAlchemyError as ex: + print(f"An error occurred while upgrading permissions: {ex}") + session.rollback() + + +def downgrade(): + bind = op.get_bind() + session = Session(bind=bind) + + # Add the old permissions on the migration itself + add_pvms(session, get_reversed_new_pvms(PVM_MAP)) + migrate_roles(session, get_reversed_pvm_map(PVM_MAP)) + try: + session.commit() + except SQLAlchemyError as ex: + print(f"An error occurred while downgrading permissions: {ex}") + session.rollback() + pass diff --git a/superset/queries/saved_queries/api.py b/superset/queries/saved_queries/api.py index cb578976a9d5a..37e82f46d0d67 100644 --- a/superset/queries/saved_queries/api.py +++ b/superset/queries/saved_queries/api.py @@ -25,7 +25,7 @@ from flask_appbuilder.models.sqla.interface import SQLAInterface from flask_babel import ngettext -from superset.constants import RouteMethod +from superset.constants import MODEL_API_RW_METHOD_PERMISSION_MAP, RouteMethod from superset.databases.filters import DatabaseFilter from superset.models.sql_lab import SavedQuery from superset.queries.saved_queries.commands.bulk_delete import ( @@ -60,7 +60,9 @@ class SavedQueryRestApi(BaseSupersetModelRestApi): RouteMethod.DISTINCT, "bulk_delete", # not using RouteMethod since locally defined } - class_permission_name = "SavedQueryView" + class_permission_name = "SavedQuery" + method_permission_name = MODEL_API_RW_METHOD_PERMISSION_MAP + resource_name = "saved_query" allow_browser_login = True diff --git a/superset/views/sql_lab.py b/superset/views/sql_lab.py index c0217687cd579..53a32c63e82c7 100644 --- a/superset/views/sql_lab.py +++ b/superset/views/sql_lab.py @@ -22,7 +22,7 @@ from flask_babel import lazy_gettext as _ from superset import db, is_feature_enabled -from superset.constants import RouteMethod +from superset.constants import MODEL_VIEW_RW_METHOD_PERMISSION_MAP, RouteMethod from superset.models.sql_lab import Query, SavedQuery, TableSchema, TabState from superset.typing import FlaskResponse from superset.utils import core as utils @@ -36,6 +36,8 @@ class SavedQueryView( datamodel = SQLAInterface(SavedQuery) include_route_methods = RouteMethod.CRUD_SET + class_permission_name = "SavedQuery" + method_permission_name = MODEL_VIEW_RW_METHOD_PERMISSION_MAP list_title = _("List Saved Query") show_title = _("Show Saved Query") add_title = _("Add Saved Query") @@ -97,6 +99,10 @@ class SavedQueryViewApi(SavedQueryView): # pylint: disable=too-many-ancestors RouteMethod.API_UPDATE, RouteMethod.API_GET, } + + class_permission_name = "SavedQuery" + method_permission_name = MODEL_VIEW_RW_METHOD_PERMISSION_MAP + list_columns = [ "id", "label", diff --git a/tests/queries/saved_queries/api_tests.py b/tests/queries/saved_queries/api_tests.py index ce55f2413a443..82b82f9bd50e2 100644 --- a/tests/queries/saved_queries/api_tests.py +++ b/tests/queries/saved_queries/api_tests.py @@ -414,10 +414,24 @@ def test_info_saved_query(self): SavedQuery API: Test info """ self.login(username="admin") - uri = f"api/v1/saved_query/_info" + uri = "api/v1/saved_query/_info" rv = self.get_assert_metric(uri, "info") assert rv.status_code == 200 + def test_info_security_saved_query(self): + """ + SavedQuery API: Test info security + """ + self.login(username="admin") + params = {"keys": ["permissions"]} + uri = f"api/v1/saved_query/_info?q={prison.dumps(params)}" + rv = self.get_assert_metric(uri, "info") + data = json.loads(rv.data.decode("utf-8")) + assert rv.status_code == 200 + assert "can_read" in data["permissions"] + assert "can_write" in data["permissions"] + assert len(data["permissions"]) == 2 + def test_related_saved_query(self): """ SavedQuery API: Test related databases diff --git a/tests/security/migrate_roles_tests.py b/tests/security/migrate_roles_tests.py new file mode 100644 index 0000000000000..81f067a48e4c5 --- /dev/null +++ b/tests/security/migrate_roles_tests.py @@ -0,0 +1,237 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Unit tests for alerting in Superset""" +import json +import logging +from unittest.mock import patch + +import pytest +from contextlib2 import contextmanager +from flask_appbuilder.security.sqla.models import Role + +from superset.extensions import db, security_manager +from superset.migrations.shared.security_converge import ( + add_pvms, + migrate_roles, + Pvm, + PvmMigrationMapType, +) +from tests.test_app import app + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +@contextmanager +def create_old_role(pvm_map: PvmMigrationMapType, external_pvms): + with app.app_context(): + pvms = [] + for old_pvm, new_pvms in pvm_map.items(): + pvms.append( + security_manager.add_permission_view_menu( + old_pvm.permission, old_pvm.view + ) + ) + for external_pvm in external_pvms: + pvms.append( + security_manager.find_permission_view_menu( + external_pvm.permission, external_pvm.view + ) + ) + + new_role = Role(name="Dummy Role", permissions=pvms) + db.session.add(new_role) + db.session.commit() + + yield new_role + + new_role = ( + db.session.query(Role).filter(Role.name == "Dummy Role").one_or_none() + ) + new_role.permissions = [] + db.session.merge(new_role) + for old_pvm, new_pvms in pvm_map.items(): + security_manager.del_permission_view_menu(old_pvm.permission, old_pvm.view) + for new_pvm in new_pvms: + security_manager.del_permission_view_menu( + new_pvm.permission, new_pvm.view + ) + + db.session.delete(new_role) + db.session.commit() + + +@pytest.mark.parametrize( + "descriptiom, new_pvms, pvm_map, external_pvms, deleted_views, deleted_permissions", + [ + ( + "Many to one readonly", + {"NewDummy": ("can_read",)}, + { + Pvm("DummyView", "can_list"): (Pvm("NewDummy", "can_read"),), + Pvm("DummyView", "can_show"): (Pvm("NewDummy", "can_read"),), + }, + (), + ("DummyView",), + (), + ), + ( + "Many to one with new permission", + {"NewDummy": ("can_new_perm", "can_write")}, + { + Pvm("DummyView", "can_list"): (Pvm("NewDummy", "can_new_perm"),), + Pvm("DummyView", "can_show"): (Pvm("NewDummy", "can_write"),), + }, + (), + ("DummyView",), + (), + ), + ( + "Many to one with multiple permissions", + {"NewDummy": ("can_read", "can_write",)}, + { + Pvm("DummyView", "can_list"): (Pvm("NewDummy", "can_read"),), + Pvm("DummyView", "can_show"): (Pvm("NewDummy", "can_read"),), + Pvm("DummyView", "can_add"): (Pvm("NewDummy", "can_write"),), + Pvm("DummyView", "can_delete"): (Pvm("NewDummy", "can_write"),), + }, + (), + ("DummyView",), + (), + ), + ( + "Many to one with multiple views", + {"NewDummy": ("can_read", "can_write",)}, + { + Pvm("DummyView", "can_list"): (Pvm("NewDummy", "can_read"),), + Pvm("DummyView", "can_show"): (Pvm("NewDummy", "can_read"),), + Pvm("DummyView", "can_add"): (Pvm("NewDummy", "can_write"),), + Pvm("DummyView", "can_delete"): (Pvm("NewDummy", "can_write"),), + Pvm("DummySecondView", "can_list"): (Pvm("NewDummy", "can_read"),), + Pvm("DummySecondView", "can_show"): (Pvm("NewDummy", "can_read"),), + Pvm("DummySecondView", "can_add"): (Pvm("NewDummy", "can_write"),), + Pvm("DummySecondView", "can_delete"): (Pvm("NewDummy", "can_write"),), + }, + (), + ("DummyView", "DummySecondView"), + (), + ), + ( + "Many to one with existing permission-view (pvm)", + {"NewDummy": ("can_read", "can_write",)}, + { + Pvm("DummyView", "can_list"): (Pvm("NewDummy", "can_read"),), + Pvm("DummyView", "can_add"): (Pvm("NewDummy", "can_write"),), + }, + (Pvm("UserDBModelView", "can_list"),), + ("DummyView",), + (), + ), + ( + "Many to one with existing multiple permission-view (pvm)", + {"NewDummy": ("can_read", "can_write",)}, + { + Pvm("DummyView", "can_list"): (Pvm("NewDummy", "can_read"),), + Pvm("DummyView", "can_add"): (Pvm("NewDummy", "can_write"),), + Pvm("DummySecondView", "can_list"): (Pvm("NewDummy", "can_read"),), + Pvm("DummySecondView", "can_add"): (Pvm("NewDummy", "can_write"),), + }, + (Pvm("UserDBModelView", "can_list"), Pvm("UserDBModelView", "can_add"),), + ("DummyView",), + (), + ), + ( + "Many to one with with old permission that gets deleted", + {"NewDummy": ("can_read", "can_write",)}, + { + Pvm("DummyView", "can_new_perm"): (Pvm("NewDummy", "can_read"),), + Pvm("DummyView", "can_add"): (Pvm("NewDummy", "can_write"),), + }, + (), + ("DummyView",), + ("can_new_perm",), + ), + ( + "Many to Many (normally should be a downgrade)", + {"DummyView": ("can_list", "can_show", "can_add",)}, + { + Pvm("NewDummy", "can_read"): ( + Pvm("DummyView", "can_list"), + Pvm("DummyView", "can_show"), + ), + Pvm("NewDummy", "can_write"): (Pvm("DummyView", "can_add"),), + }, + (), + ("NewDummy",), + (), + ), + ( + "Many to Many delete old permissions", + {"DummyView": ("can_list", "can_show", "can_add",)}, + { + Pvm("NewDummy", "can_new_perm1"): ( + Pvm("DummyView", "can_list"), + Pvm("DummyView", "can_show"), + ), + Pvm("NewDummy", "can_new_perm2",): (Pvm("DummyView", "can_add"),), + }, + (), + ("NewDummy",), + ("can_new_perm1", "can_new_perm2"), + ), + ], +) +def test_migrate_role( + descriptiom, new_pvms, pvm_map, external_pvms, deleted_views, deleted_permissions +): + """ + Permission migration: generic tests + """ + logger.info(descriptiom) + with create_old_role(pvm_map, external_pvms) as old_role: + role_name = old_role.name + session = db.session + + # Run migrations + add_pvms(session, new_pvms) + migrate_roles(session, pvm_map) + + role = db.session.query(Role).filter(Role.name == role_name).one_or_none() + for old_pvm, new_pvms in pvm_map.items(): + old_pvm_model = security_manager.find_permission_view_menu( + old_pvm.permission, old_pvm.view + ) + assert old_pvm_model is None + new_pvm_model = security_manager.find_permission_view_menu( + new_pvms[0].permission, new_pvms[0].view + ) + assert new_pvm_model is not None + assert new_pvm_model in role.permissions + # assert deleted view menus + for deleted_view in deleted_views: + assert security_manager.find_view_menu(deleted_view) is None + # assert deleted permissions + for deleted_permission in deleted_permissions: + assert security_manager.find_permission(deleted_permission) is None + # assert externals are still there + for external_pvm in external_pvms: + assert ( + security_manager.find_permission_view_menu( + external_pvm.permission, external_pvm.view + ) + is not None + )