From a1d4d74f7fec368783bafd3aa083abb244fb1d3a Mon Sep 17 00:00:00 2001
From: John Bodley
Date: Thu, 2 Feb 2023 20:59:22 +1300
Subject: [PATCH] chore(datasets): Refactor DatasetDAO with bulk operations

---
 superset/datasets/dao.py | 115 +++++++++++++++++++++++----------------
 1 file changed, 68 insertions(+), 47 deletions(-)

diff --git a/superset/datasets/dao.py b/superset/datasets/dao.py
index d260df3610002..1c55723b47a23 100644
--- a/superset/datasets/dao.py
+++ b/superset/datasets/dao.py
@@ -187,39 +187,50 @@ def update_columns(
         then we delete.
         """
 
-        column_by_id = {column.id: column for column in model.columns}
-        seen = set()
-        original_cols = {obj.id for obj in model.columns}
-
         if override_columns:
-            for id_ in original_cols:
-                DatasetDAO.delete_column(column_by_id[id_], commit=False)
+            db.session.query(TableColumn).filter(
+                TableColumn.table_id == model.id
+            ).delete(synchronize_session="fetch")
 
-            db.session.flush()
+            db.session.bulk_insert_mappings(
+                TableColumn,
+                [
+                    {**properties, "table_id": model.id}
+                    for properties in property_columns
+                ],
+            )
+        else:
+            columns_by_id = {column.id: column for column in model.columns}
+
+            property_columns_by_id = {
+                properties["id"]: properties
+                for properties in property_columns
+                if "id" in properties
+            }
+
+            db.session.bulk_insert_mappings(
+                TableColumn,
+                [
+                    {**properties, "table_id": model.id}
+                    for properties in property_columns
+                    if "id" not in properties
+                ],
+            )
+
+            db.session.bulk_update_mappings(
+                TableColumn,
+                [
+                    {**columns_by_id[properties["id"]].__dict__, **properties}
+                    for properties in property_columns_by_id.values()
+                ],
+            )
 
-            for properties in property_columns:
-                DatasetDAO.create_column(
-                    {**properties, "table_id": model.id},
-                    commit=False,
+            db.session.query(TableColumn).filter(
+                TableColumn.id.in_(
+                    {column.id for column in model.columns}
+                    - property_columns_by_id.keys()
                 )
-        else:
-            for properties in property_columns:
-                if "id" in properties:
-                    seen.add(properties["id"])
-
-                    DatasetDAO.update_column(
-                        column_by_id[properties["id"]],
-                        properties,
-                        commit=False,
-                    )
-                else:
-                    DatasetDAO.create_column(
-                        {**properties, "table_id": model.id},
-                        commit=False,
-                    )
-
-            for id_ in {obj.id for obj in model.columns} - seen:
-                DatasetDAO.delete_column(column_by_id[id_], commit=False)
+            ).delete(synchronize_session="fetch")
 
         if commit:
             db.session.commit()
@@ -241,26 +252,36 @@ def update_metrics(
         then we delete.
         """
 
-        metric_by_id = {metric.id: metric for metric in model.metrics}
-        seen = set()
-
-        for properties in property_metrics:
-            if "id" in properties:
-                seen.add(properties["id"])
+        metrics_by_id = {metric.id: metric for metric in model.metrics}
+
+        property_metrics_by_id = {
+            properties["id"]: properties
+            for properties in property_metrics
+            if "id" in properties
+        }
+
+        db.session.bulk_insert_mappings(
+            SqlMetric,
+            [
+                {**properties, "table_id": model.id}
+                for properties in property_metrics
+                if "id" not in properties
+            ],
+        )
 
-                DatasetDAO.update_metric(
-                    metric_by_id[properties["id"]],
-                    properties,
-                    commit=False,
-                )
-            else:
-                DatasetDAO.create_metric(
-                    {**properties, "table_id": model.id},
-                    commit=False,
-                )
+        db.session.bulk_update_mappings(
+            SqlMetric,
+            [
+                {**metrics_by_id[properties["id"]].__dict__, **properties}
+                for properties in property_metrics_by_id.values()
+            ],
+        )
 
-        for id_ in {obj.id for obj in model.metrics} - seen:
-            DatasetDAO.delete_column(metric_by_id[id_], commit=False)
+        db.session.query(SqlMetric).filter(
+            SqlMetric.id.in_(
+                {metric.id for metric in model.metrics} - property_metrics_by_id.keys()
+            )
+        ).delete(synchronize_session="fetch")
 
         if commit:
             db.session.commit()