Skip to content

Commit

Permalink
Web: Show timeline of the called_processes (#238)
Browse files Browse the repository at this point in the history
Adds the `called_process` option in the timeline. Switching to this option will show the timeline for all the processes called by this WorkGraph.
  • Loading branch information
superstar54 committed Aug 18, 2024
1 parent 21eb8d0 commit 116b130
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 70 deletions.
82 changes: 48 additions & 34 deletions aiida_workgraph/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ def get_parent_workgraphs(pk: int) -> list:


def get_processes_latest(
pk: int, node_name: str = None
pk: int, node_name: str = None, item_type: str = "task"
) -> Dict[str, Dict[str, Union[int, str]]]:
"""Get the latest info of all tasks from the process."""
import aiida
Expand All @@ -270,39 +270,53 @@ def get_processes_latest(
from aiida_workgraph.engine.workgraph import WorkGraphEngine

tasks = {}
node_names = [node_name] if node_name else []
if node_name:
projections = [
f"extras._task_state_{node_name}",
f"extras._task_process_{node_name}",
]
else:
projections = []
process = aiida.orm.load_node(pk)
node_names = [
key[12:]
for key in process.base.extras.keys()
if key.startswith("_task_state")
]
projections = [f"extras._task_state_{name}" for name in node_names]
projections.extend([f"extras._task_process_{name}" for name in node_names])
qb = QueryBuilder()
qb.append(WorkGraphEngine, filters={"id": pk}, project=projections)
# print("projections: ", projections)
results = qb.all()
# change results to dict
results = dict(zip(projections, results[0]))
# print("results: ", results)
for name in node_names:
state = results[f"extras._task_state_{name}"]
task_process = deserialize_unsafe(results[f"extras._task_process_{name}"])
tasks[name] = {
"pk": task_process.pk if task_process else None,
"process_type": task_process.process_type if task_process else "",
"state": state,
"ctime": task_process.ctime if task_process else None,
"mtime": task_process.mtime if task_process else None,
}
if item_type == "called_process":
# fetch the process that called by the workgraph
node = aiida.orm.load_node(pk)
for link in node.base.links.get_outgoing().all():
if isinstance(link.node, aiida.orm.ProcessNode):
tasks[f"{link.node.process_label}_{link.node.pk}"] = {
"pk": link.node.pk,
"process_type": link.node.process_type,
"state": link.node.process_state.value,
"ctime": link.node.ctime,
"mtime": link.node.mtime,
}
elif item_type == "task":
node_names = [node_name] if node_name else []
if node_name:
projections = [
f"extras._task_state_{node_name}",
f"extras._task_process_{node_name}",
]
else:
projections = []
process = aiida.orm.load_node(pk)
node_names = [
key[12:]
for key in process.base.extras.keys()
if key.startswith("_task_state")
]
projections = [f"extras._task_state_{name}" for name in node_names]
projections.extend([f"extras._task_process_{name}" for name in node_names])
qb = QueryBuilder()
qb.append(WorkGraphEngine, filters={"id": pk}, project=projections)
# print("projections: ", projections)
results = qb.all()
# change results to dict
results = dict(zip(projections, results[0]))
# print("results: ", results)
for name in node_names:
state = results[f"extras._task_state_{name}"]
task_process = deserialize_unsafe(results[f"extras._task_process_{name}"])
tasks[name] = {
"pk": task_process.pk if task_process else None,
"process_type": task_process.process_type if task_process else "",
"state": state,
"ctime": task_process.ctime if task_process else None,
"mtime": task_process.mtime if task_process else None,
}

return tasks


Expand Down
4 changes: 2 additions & 2 deletions aiida_workgraph/web/backend/app/workgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ async def read_workgraph(id: int):


@router.get("/api/workgraph-state/{id}")
async def read_workgraph_tasks_state(id: int):
async def read_workgraph_tasks_state(id: int, item_type: str = "task"):
from aiida_workgraph.utils import get_processes_latest

try:
processes_info = get_processes_latest(id)
processes_info = get_processes_latest(id, item_type=item_type)
return processes_info
except KeyError:
raise HTTPException(status_code=404, detail=f"Workgraph {id} not found")
Expand Down
100 changes: 66 additions & 34 deletions aiida_workgraph/web/frontend/src/components/WorkGraphDuration.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ const NodeDurationGraph = ({ id }) => {
const [timeStart, setTimeStart] = useState(null);
const [timeEnd, setTimeEnd] = useState(null);
const [initialLoad, setInitialLoad] = useState(true);
const [useItemType, setUseItemType] = useState("task");

// Function to fetch data from the backend
const fetchData = async () => {
try {
const response = await fetch(`http://localhost:8000/api/workgraph-state/${id}`);
const response = await fetch(`http://localhost:8000/api/workgraph-state/${id}?item_type=${useItemType}`);
if (!response.ok) {
throw new Error('Network response was not ok');
}
Expand All @@ -26,10 +26,11 @@ const NodeDurationGraph = ({ id }) => {
};

useEffect(() => {
setInitialLoad(true);
fetchData();
const interval = setInterval(fetchData, 5000);
return () => clearInterval(interval);
}, [id]);
}, [id, useItemType]);

useEffect(() => {
if (Object.keys(processesInfo).length) {
Expand All @@ -38,15 +39,13 @@ const NodeDurationGraph = ({ id }) => {
title: key
}));

const newItems = Object.entries(processesInfo).map(([key, { ctime, mtime, process_type }], idx) => {
return process_type ? {
id: idx,
group: idx,
title: key,
start_time: ctime ? moment(ctime) : null,
end_time: mtime ? moment(mtime) : null
} : null;
}).filter(item => item !== null);
const newItems = Object.entries(processesInfo).map(([key, { ctime, mtime, process_type }], idx) => ({
id: idx,
group: idx,
title: key,
start_time: ctime ? moment(ctime) : null,
end_time: mtime ? moment(mtime) : null
}));

setGroups(newGroups);
setItems(newItems);
Expand All @@ -58,20 +57,27 @@ const NodeDurationGraph = ({ id }) => {
setTimeStart(moment.min(validStartTimes).valueOf());
setTimeEnd(moment.max(validEndTimes).valueOf());
}
else {
// use the current time as the start time
setTimeStart(moment().valueOf());
// use the current time + 1 hour as the end time
setTimeEnd(moment().add(1, 'hour').valueOf());
}
setInitialLoad(false);
}
} else {
setGroups([]);
setItems([]);
setTimeStart(null);
setTimeEnd(null);
}
}, [processesInfo]);

const minZoom = 10000; // 10 seconds in milliseconds
const maxZoom = 365.25 * 24 * 60 * 60 * 1000; // 1 year in milliseconds

if (!timeStart || !timeEnd) {
return <div>Loading timeline...</div>;
}

return (
<div style={{ padding: '10px', margin: '20px', border: '1px solid #ccc', }}>
<div style={{ padding: '10px', margin: '20px', border: '1px solid #ccc' }}>
<h1 style={{ textAlign: 'center', color: '#2a3f5f' }}>Node Process Timeline</h1>
<div style={{textAlign: 'left', fontSize: '16px', color: '#555' }}>
<p>
Expand All @@ -80,24 +86,50 @@ const NodeDurationGraph = ({ id }) => {
<p>
Data nodes are shown as rows without bars.
</p>
</div>
<div style={{ margin: '10px' }}>
<label>
Task:
<input
type="radio"
value="task"
checked={useItemType === "task"}
onChange={(e) => setUseItemType(e.target.value)}
/>
</label>
<label style={{ marginLeft: '20px' }}>
Called Process:
<input
type="radio"
value="called_process"
checked={useItemType === "called_process"}
onChange={(e) => setUseItemType(e.target.value)}
/>
</label>
</div>
{items.length > 0 ? (
<Timeline
groups={groups}
items={items}
visibleTimeStart={timeStart}
visibleTimeEnd={timeEnd}
onTimeChange={(visibleTimeStart, visibleTimeEnd, updateScrollCanvas) => {
setTimeStart(visibleTimeStart);
setTimeEnd(visibleTimeEnd);
updateScrollCanvas(visibleTimeStart, visibleTimeEnd);
}}
lineHeight={50}
minZoom={minZoom}
maxZoom={maxZoom}
canMove={false}
canChangeGroup={false}
canResize={'both'}
/>
) : (
<div style={{ textAlign: 'center', color: '#D32F2F', marginTop: '20px', fontSize: '18px', fontWeight: 'bold' }}>
There are no items to display.
</div>
<Timeline
groups={groups}
items={items}
visibleTimeStart={timeStart}
visibleTimeEnd={timeEnd}
onTimeChange={(visibleTimeStart, visibleTimeEnd, updateScrollCanvas) => {
setTimeStart(visibleTimeStart);
setTimeEnd(visibleTimeEnd);
updateScrollCanvas(visibleTimeStart, visibleTimeEnd);
}}
lineHeight={50}
minZoom={minZoom}
maxZoom={maxZoom}
canMove={false}
canChangeGroup={false}
canResize={'both'}
/>
)}
</div>
);
};
Expand Down

0 comments on commit 116b130

Please sign in to comment.