Skip to content

Commit

Permalink
feat: create data element along with syncing table (pinterest#1193)
Browse files Browse the repository at this point in the history
* feat: create data element along with syncing table

* comment
  • Loading branch information
jczhong84 authored and rohan-sh1 committed Apr 11, 2023
1 parent fd972a0 commit 08575e3
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 16 deletions.
8 changes: 4 additions & 4 deletions querybook/server/const/data_element.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ class DataElementTuple(NamedTuple):
class DataElementAssociationTuple(NamedTuple):
# association type
type: DataElementAssociationType
# data element name. required for all association types
value: str
# data element tuple or name. required for all association types
value_data_element: Union[DataElementTuple, str]
# for map association, value can either be a data element or a primitive type, e.g. i32
value_primitive_type: str = None
# data element name. required if associtaion type is map
key: str = None
# data element tuple or name. required if associtaion type is map
key_data_element: Union[DataElementTuple, str] = None
# for map association, key can either be a data element or a primitive type, e.g. i32
key_primitive_type: str = None
35 changes: 33 additions & 2 deletions querybook/server/lib/metastore/base_metastore_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import gevent
from app.db import DBSession, with_session
from const.data_element import DataElementTuple, DataElementAssociationTuple
from const.metastore import (
DataColumn,
DataOwnerType,
Expand All @@ -16,6 +17,7 @@
from lib.logger import get_logger
from lib.utils import json
from lib.utils.utils import with_exception
from logic.data_element import create_column_data_element_association
from logic.elasticsearch import delete_es_table_by_id, update_table_by_id
from logic.metastore import (
create_column,
Expand All @@ -33,7 +35,6 @@
iterate_data_schema,
)
from logic.tag import create_column_tags, create_table_tags
from logic.data_element import create_column_data_element_association

from .utils import MetastoreTableACLChecker

Expand Down Expand Up @@ -405,9 +406,15 @@ def _create_table_table(
if self.loader_config.can_load_external_metadata(
MetadataType.DATA_ELEMENT
):
data_element_association = (
self._populate_column_data_element_association(
column.data_element
)
)
create_column_data_element_association(
metastore_id=self.metastore_id,
column_id=column_id,
data_element_association=column.data_element,
data_element_association=data_element_association,
commit=False,
session=session,
)
Expand Down Expand Up @@ -436,6 +443,26 @@ def _create_table_table(
session.rollback()
LOG.error(traceback.format_exc())

def _populate_column_data_element_association(
self, data_element_association: DataElementAssociationTuple
) -> DataElementAssociationTuple:
"""If the value_data_element or key_data_element is a name instead of DataElementTuple,
this function will help to replace the name with the actual DataElementTuple"""
if data_element_association is None:
return None

value_data_element = data_element_association.value_data_element
if type(value_data_element) is str:
value_data_element = self.get_data_element(value_data_element)

key_data_element = data_element_association.key_data_element
if type(key_data_element) is str:
key_data_element = self.get_data_element(key_data_element)

return data_element_association._replace(
value_data_element=value_data_element, key_data_element=key_data_element
)

@with_exception
def _get_all_filtered_schema_names(self) -> List[str]:
return [
Expand Down Expand Up @@ -507,6 +534,10 @@ def get_table_and_columns(
"""
pass

def get_data_element(self, data_element_name: str) -> Optional[DataElementTuple]:
"""Override this to get data element by name"""
pass

@abstractclassmethod
def get_metastore_params_template(self) -> AllFormField:
"""Override this to get the form field required for the metastore
Expand Down
33 changes: 23 additions & 10 deletions querybook/server/logic/data_element.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from typing import Union
from app.db import with_session
from const.data_element import (
DataElementAssociationProperty,
Expand Down Expand Up @@ -61,23 +62,32 @@ def create_or_update_data_element(
else:
session.flush()

return data_element


@with_session
def create_data_element_association_by_name(
data_element_name: str,
def create_data_element_association(
metastore_id: int,
data_element_tuple: Union[DataElementTuple, str],
column_id: int,
association_type: DataElementAssociationType,
property_name: DataElementAssociationProperty,
primitive_type: str = None,
session=None,
):
data_element = None
if data_element_name:
data_element = get_data_element_by_name(data_element_name, session=session)
if data_element_tuple is None:
return None

if type(data_element_tuple) is str:
data_element = get_data_element_by_name(data_element_tuple, session=session)
else:
data_element = create_or_update_data_element(
metastore_id, data_element_tuple, session=session
)

if not data_element and not primitive_type:
raise Exception(
f"Can not create DataElementAssociation: {data_element_name} is not a valid data element name and primitive type is empty"
f"Can not create DataElementAssociation: {data_element_tuple} is not a valid data element and primitive type is empty"
)

return DataElementAssociation(
Expand All @@ -91,6 +101,7 @@ def create_data_element_association_by_name(

@with_session
def create_column_data_element_association(
metastore_id: int,
column_id: int,
data_element_association: DataElementAssociationTuple,
commit=True,
Expand All @@ -102,8 +113,9 @@ def create_column_data_element_association(

if data_element_association is not None:
try:
value_association = create_data_element_association_by_name(
data_element_name=data_element_association.value,
value_association = create_data_element_association(
metastore_id=metastore_id,
data_element_tuple=data_element_association.value_data_element,
column_id=column_id,
association_type=data_element_association.type,
property_name=DataElementAssociationProperty.VALUE,
Expand All @@ -112,8 +124,9 @@ def create_column_data_element_association(
)

if data_element_association.type == DataElementAssociationType.MAP:
key_association = create_data_element_association_by_name(
data_element_name=data_element_association.key,
key_association = create_data_element_association(
metastore_id=metastore_id,
data_element_tuple=data_element_association.key_data_element,
column_id=column_id,
association_type=data_element_association.type,
property_name=DataElementAssociationProperty.KEY,
Expand Down

0 comments on commit 08575e3

Please sign in to comment.