Skip to content

Commit

Permalink
Merge branch 'main' into feature/submission-controller
Browse files Browse the repository at this point in the history
  • Loading branch information
superstar54 committed Aug 19, 2024
2 parents 2774f56 + 116b130 commit adffe35
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 72 deletions.
7 changes: 5 additions & 2 deletions aiida_workgraph/engine/workgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -1079,10 +1079,13 @@ def run_tasks(self, names: t.List[str], continue_workgraph: bool = True) -> None
executor, args, kwargs, var_args, var_kwargs
)
if not isinstance(results, dict):
self.report("The results of the awaitable builder must be a dict.")

self.logger.error(
"The results of the awaitable builder must be a dict."
)
for key, value in results.items():
if not isinstance(value, ProcessNode):
self.report(
self.logger.error(
f"The value of key {key} is not an instance of ProcessNode."
)
self.set_task_state_info(name, "state", "Failed")
Expand Down
82 changes: 48 additions & 34 deletions aiida_workgraph/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ def get_parent_workgraphs(pk: int) -> list:


def get_processes_latest(
pk: int, node_name: str = None
pk: int, node_name: str = None, item_type: str = "task"
) -> Dict[str, Dict[str, Union[int, str]]]:
"""Get the latest info of all tasks from the process."""
import aiida
Expand All @@ -270,39 +270,53 @@ def get_processes_latest(
from aiida_workgraph.engine.workgraph import WorkGraphEngine

tasks = {}
node_names = [node_name] if node_name else []
if node_name:
projections = [
f"extras._task_state_{node_name}",
f"extras._task_process_{node_name}",
]
else:
projections = []
process = aiida.orm.load_node(pk)
node_names = [
key[12:]
for key in process.base.extras.keys()
if key.startswith("_task_state")
]
projections = [f"extras._task_state_{name}" for name in node_names]
projections.extend([f"extras._task_process_{name}" for name in node_names])
qb = QueryBuilder()
qb.append(WorkGraphEngine, filters={"id": pk}, project=projections)
# print("projections: ", projections)
results = qb.all()
# change results to dict
results = dict(zip(projections, results[0]))
# print("results: ", results)
for name in node_names:
state = results[f"extras._task_state_{name}"]
task_process = deserialize_unsafe(results[f"extras._task_process_{name}"])
tasks[name] = {
"pk": task_process.pk if task_process else None,
"process_type": task_process.process_type if task_process else "",
"state": state,
"ctime": task_process.ctime if task_process else None,
"mtime": task_process.mtime if task_process else None,
}
if item_type == "called_process":
# fetch the process that called by the workgraph
node = aiida.orm.load_node(pk)
for link in node.base.links.get_outgoing().all():
if isinstance(link.node, aiida.orm.ProcessNode):
tasks[f"{link.node.process_label}_{link.node.pk}"] = {
"pk": link.node.pk,
"process_type": link.node.process_type,
"state": link.node.process_state.value,
"ctime": link.node.ctime,
"mtime": link.node.mtime,
}
elif item_type == "task":
node_names = [node_name] if node_name else []
if node_name:
projections = [
f"extras._task_state_{node_name}",
f"extras._task_process_{node_name}",
]
else:
projections = []
process = aiida.orm.load_node(pk)
node_names = [
key[12:]
for key in process.base.extras.keys()
if key.startswith("_task_state")
]
projections = [f"extras._task_state_{name}" for name in node_names]
projections.extend([f"extras._task_process_{name}" for name in node_names])
qb = QueryBuilder()
qb.append(WorkGraphEngine, filters={"id": pk}, project=projections)
# print("projections: ", projections)
results = qb.all()
# change results to dict
results = dict(zip(projections, results[0]))
# print("results: ", results)
for name in node_names:
state = results[f"extras._task_state_{name}"]
task_process = deserialize_unsafe(results[f"extras._task_process_{name}"])
tasks[name] = {
"pk": task_process.pk if task_process else None,
"process_type": task_process.process_type if task_process else "",
"state": state,
"ctime": task_process.ctime if task_process else None,
"mtime": task_process.mtime if task_process else None,
}

return tasks


Expand Down
4 changes: 2 additions & 2 deletions aiida_workgraph/web/backend/app/workgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ async def read_workgraph(id: int):


@router.get("/api/workgraph-state/{id}")
async def read_workgraph_tasks_state(id: int):
async def read_workgraph_tasks_state(id: int, item_type: str = "task"):
from aiida_workgraph.utils import get_processes_latest

try:
processes_info = get_processes_latest(id)
processes_info = get_processes_latest(id, item_type=item_type)
return processes_info
except KeyError:
raise HTTPException(status_code=404, detail=f"Workgraph {id} not found")
Expand Down
100 changes: 66 additions & 34 deletions aiida_workgraph/web/frontend/src/components/WorkGraphDuration.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ const NodeDurationGraph = ({ id }) => {
const [timeStart, setTimeStart] = useState(null);
const [timeEnd, setTimeEnd] = useState(null);
const [initialLoad, setInitialLoad] = useState(true);
const [useItemType, setUseItemType] = useState("task");

// Function to fetch data from the backend
const fetchData = async () => {
try {
const response = await fetch(`http://localhost:8000/api/workgraph-state/${id}`);
const response = await fetch(`http://localhost:8000/api/workgraph-state/${id}?item_type=${useItemType}`);
if (!response.ok) {
throw new Error('Network response was not ok');
}
Expand All @@ -26,10 +26,11 @@ const NodeDurationGraph = ({ id }) => {
};

useEffect(() => {
setInitialLoad(true);
fetchData();
const interval = setInterval(fetchData, 5000);
return () => clearInterval(interval);
}, [id]);
}, [id, useItemType]);

useEffect(() => {
if (Object.keys(processesInfo).length) {
Expand All @@ -38,15 +39,13 @@ const NodeDurationGraph = ({ id }) => {
title: key
}));

const newItems = Object.entries(processesInfo).map(([key, { ctime, mtime, process_type }], idx) => {
return process_type ? {
id: idx,
group: idx,
title: key,
start_time: ctime ? moment(ctime) : null,
end_time: mtime ? moment(mtime) : null
} : null;
}).filter(item => item !== null);
const newItems = Object.entries(processesInfo).map(([key, { ctime, mtime, process_type }], idx) => ({
id: idx,
group: idx,
title: key,
start_time: ctime ? moment(ctime) : null,
end_time: mtime ? moment(mtime) : null
}));

setGroups(newGroups);
setItems(newItems);
Expand All @@ -58,20 +57,27 @@ const NodeDurationGraph = ({ id }) => {
setTimeStart(moment.min(validStartTimes).valueOf());
setTimeEnd(moment.max(validEndTimes).valueOf());
}
else {
// use the current time as the start time
setTimeStart(moment().valueOf());
// use the current time + 1 hour as the end time
setTimeEnd(moment().add(1, 'hour').valueOf());
}
setInitialLoad(false);
}
} else {
setGroups([]);
setItems([]);
setTimeStart(null);
setTimeEnd(null);
}
}, [processesInfo]);

const minZoom = 10000; // 10 seconds in milliseconds
const maxZoom = 365.25 * 24 * 60 * 60 * 1000; // 1 year in milliseconds

if (!timeStart || !timeEnd) {
return <div>Loading timeline...</div>;
}

return (
<div style={{ padding: '10px', margin: '20px', border: '1px solid #ccc', }}>
<div style={{ padding: '10px', margin: '20px', border: '1px solid #ccc' }}>
<h1 style={{ textAlign: 'center', color: '#2a3f5f' }}>Node Process Timeline</h1>
<div style={{textAlign: 'left', fontSize: '16px', color: '#555' }}>
<p>
Expand All @@ -80,24 +86,50 @@ const NodeDurationGraph = ({ id }) => {
<p>
Data nodes are shown as rows without bars.
</p>
</div>
<div style={{ margin: '10px' }}>
<label>
Task:
<input
type="radio"
value="task"
checked={useItemType === "task"}
onChange={(e) => setUseItemType(e.target.value)}
/>
</label>
<label style={{ marginLeft: '20px' }}>
Called Process:
<input
type="radio"
value="called_process"
checked={useItemType === "called_process"}
onChange={(e) => setUseItemType(e.target.value)}
/>
</label>
</div>
{items.length > 0 ? (
<Timeline
groups={groups}
items={items}
visibleTimeStart={timeStart}
visibleTimeEnd={timeEnd}
onTimeChange={(visibleTimeStart, visibleTimeEnd, updateScrollCanvas) => {
setTimeStart(visibleTimeStart);
setTimeEnd(visibleTimeEnd);
updateScrollCanvas(visibleTimeStart, visibleTimeEnd);
}}
lineHeight={50}
minZoom={minZoom}
maxZoom={maxZoom}
canMove={false}
canChangeGroup={false}
canResize={'both'}
/>
) : (
<div style={{ textAlign: 'center', color: '#D32F2F', marginTop: '20px', fontSize: '18px', fontWeight: 'bold' }}>
There are no items to display.
</div>
<Timeline
groups={groups}
items={items}
visibleTimeStart={timeStart}
visibleTimeEnd={timeEnd}
onTimeChange={(visibleTimeStart, visibleTimeEnd, updateScrollCanvas) => {
setTimeStart(visibleTimeStart);
setTimeEnd(visibleTimeEnd);
updateScrollCanvas(visibleTimeStart, visibleTimeEnd);
}}
lineHeight={50}
minZoom={minZoom}
maxZoom={maxZoom}
canMove={false}
canChangeGroup={false}
canResize={'both'}
/>
)}
</div>
);
};
Expand Down

0 comments on commit adffe35

Please sign in to comment.