Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inheriting ephemeral models' deps for lineage. #1677

Merged
merged 1 commit into from
Aug 20, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 51 additions & 21 deletions elementary/monitor/fetchers/lineage/lineage.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import json
from typing import List
from typing import Dict, List, Optional, Set

from elementary.clients.fetcher.fetcher import FetcherClient
from elementary.monitor.api.lineage.schema import NodeDependsOnNodesSchema
Expand All @@ -15,31 +15,61 @@ class LineageFetcher(FetcherClient):
def get_nodes_depends_on_nodes(
self, exclude_elementary_models: bool = False
) -> List[NodeDependsOnNodesSchema]:
nodes_depends_on_nodes = []
nodes_depends_on_nodes_results = self.dbt_runner.run_operation(
results = self.dbt_runner.run_operation(
macro_name="elementary_cli.get_nodes_depends_on_nodes",
macro_args={"exclude_elementary": exclude_elementary_models},
)
if nodes_depends_on_nodes_results:
for node_depends_on_nodes_result in json.loads(
nodes_depends_on_nodes_results[0]
):
nodes_depends_on_nodes.append(
NodeDependsOnNodesSchema(
unique_id=node_depends_on_nodes_result.get("unique_id"),
depends_on_nodes=json.loads(
node_depends_on_nodes_result.get("depends_on_nodes")
)
if node_depends_on_nodes_result.get("depends_on_nodes")
else None,
type=node_depends_on_nodes_result.get("type"),
sub_type=self.get_node_sub_type(node_depends_on_nodes_result),
)
)
return nodes_depends_on_nodes

nodes = json.loads(results[0]) if results else []
nodes = [self._normalize_result_dict(result) for result in nodes]
id_to_node_map = {node["unique_id"]: node for node in nodes}
return [
NodeDependsOnNodesSchema(
unique_id=node["unique_id"],
depends_on_nodes=list(self._resolve_node_deps(node, id_to_node_map)),
type=node["type"],
sub_type=self.get_node_sub_type(node),
)
for node in nodes
]

@staticmethod
def get_node_sub_type(node_depends_on_nodes_result: dict):
materialization = node_depends_on_nodes_result.get("materialization")
materialization = node_depends_on_nodes_result["materialization"]
if materialization:
return _MATERIALIZATION_TO_SUB_TYPE.get(materialization)

@staticmethod
def _normalize_result_dict(result_dict: dict) -> dict:
return {
**result_dict,
"depends_on_nodes": (
json.loads(result_dict["depends_on_nodes"])
if result_dict["depends_on_nodes"]
else []
),
}

@classmethod
def _resolve_node_deps(
cls,
node: dict,
id_to_node_map: Dict[str, Dict],
agg_deps: Optional[set] = None,
) -> Set[str]:
agg_deps = agg_deps or set()
dep_ids = node["depends_on_nodes"]
for dep_id in dep_ids:
dep_node = id_to_node_map.get(dep_id)
if dep_node and dep_node["materialization"] == "ephemeral":
agg_deps.update(
cls._resolve_node_deps(
dep_node,
id_to_node_map,
agg_deps,
)
Copy link
Collaborator

@haritamar haritamar Aug 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't handle the case where dep_node["depends_on_nodes"] by itself contains ephemeral nodes.

Something I can suggest (other than making this function recursive) is to first sort by dependencies (toposort is a good library) and then handle nodes in reverse order.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also - new_dep_ids may contain duplicates, probably better to make it a set

)
else:
agg_deps.add(dep_id)

return agg_deps
Loading