Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add source table #763

Merged
merged 5 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions mirar/database/base_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ def validate_sql(cls, value: Any, info: FieldValidationInfo) -> Any:

def _insert_entry(
self,
duplicate_protocol: str,
returning_key_names: str | list[str] = None,
duplicate_protocol: str = "ignore",
) -> pd.DataFrame:
"""
Insert the pydantic-ified data into the corresponding sql database
Expand Down Expand Up @@ -186,15 +186,21 @@ def get_available_unique_keys(self) -> list[Column]:
return [x for x in self.get_unique_keys() if x.name in self.model_fields]

def insert_entry(
self, returning_key_names: str | list[str] | None = None
self,
duplicate_protocol: str,
returning_key_names: str | list[str] | None = None,
) -> pd.DataFrame:
"""
Insert the pydantic-ified data into the corresponding sql database

:param duplicate_protocol: protocol to follow if duplicate entry is found
:param returning_key_names: names of the keys to return
:return: dataframe of the sequence keys
"""
result = self._insert_entry(returning_key_names=returning_key_names)
result = self._insert_entry(
duplicate_protocol=duplicate_protocol,
returning_key_names=returning_key_names,
)
logger.debug(f"Return result {result}")
return result

Expand All @@ -218,6 +224,9 @@ def _update_entry(self, update_key_names: list[str] | str | None = None):

full_dict = self.model_dump()

if update_key_names is None:
update_key_names = full_dict.keys()

update_dict = {key: full_dict[key] for key in update_key_names}

_update_database_entry(
Expand Down
15 changes: 11 additions & 4 deletions mirar/pipelines/summer/models/_exposures.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,21 +163,28 @@ def validate_fid(cls, nightdate: str) -> datetime:
"""
return datetime.strptime(nightdate, SUMMER_NIGHT_FORMAT)

def insert_entry(self, returning_key_names=None) -> pd.DataFrame:
def insert_entry(
self, duplicate_protocol: str, returning_key_names=None
) -> pd.DataFrame:
"""
Insert the pydantic-ified data into the corresponding sql database

:return: None
:param duplicate_protocol: protocol to follow if duplicate entry is found
:param returning_key_names: names of keys to return
:return: DataFrame of returning keys
"""
night = Night(nightdate=self.nightdate)
if not night.exists():
night.insert_entry()
night.insert_entry(duplicate_protocol="ignore")

logger.debug(f"puid: {self.puid}")
if not Program._exists(values=self.puid, keys="puid"):
self.puid = 1

return self._insert_entry()
return self._insert_entry(
duplicate_protocol=duplicate_protocol,
returning_key_names=returning_key_names,
)

def exists(self) -> bool:
"""
Expand Down
2 changes: 1 addition & 1 deletion mirar/pipelines/summer/models/_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,4 @@ def populate_filters(filter_map: dict = None):
for filter_name, fid in filter_map.items():
summer_filter = Filter(fid=fid, filtername=filter_name)
if not summer_filter.exists():
summer_filter.insert_entry()
summer_filter.insert_entry(duplicate_protocol="ignore")
2 changes: 1 addition & 1 deletion mirar/pipelines/summer/models/_img_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,4 @@ def populate_itid():
for i, img_type in enumerate(ALL_ITID):
itid = ImgType(itid=i + 1, imgtype=img_type)
if not itid.exists():
itid.insert_entry()
itid.insert_entry(duplicate_protocol="ignore")
2 changes: 1 addition & 1 deletion mirar/pipelines/summer/models/_programs.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,4 @@ def populate_programs():
"""

if not default_program.exists():
default_program.insert_entry()
default_program.insert_entry(duplicate_protocol="ignore")
2 changes: 1 addition & 1 deletion mirar/pipelines/summer/models/_subdets.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,4 @@ def populate_subdets(ndetectors: int = 1, nxtot: int = 1, nytot: int = 1):
nxtot=nxtot,
nytot=nytot,
)
new.insert_entry()
new.insert_entry(duplicate_protocol="fail")
49 changes: 36 additions & 13 deletions mirar/pipelines/winter/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
MAX_DITHER_KEY,
OBSCLASS_KEY,
RAW_IMG_KEY,
SOURCE_HISTORY_KEY,
SOURCE_NAME_KEY,
TARGET_KEY,
ZP_KEY,
Expand Down Expand Up @@ -44,12 +45,14 @@
winter_candidate_quality_filterer,
winter_fourier_filtered_image_generator,
winter_history_deprecated_constraint,
winter_new_source_updater,
winter_photometric_catalog_generator,
winter_photometric_ref_catalog_namer,
winter_reference_generator,
winter_reference_image_resampler_for_zogy,
winter_reference_psfex,
winter_reference_sextractor,
winter_source_entry_updater,
winter_stackid_annotator,
)
from mirar.pipelines.winter.load_winter_image import (
Expand All @@ -61,14 +64,15 @@
load_winter_stack,
)
from mirar.pipelines.winter.models import (
CANDIDATE_PREFIX,
DEFAULT_FIELD,
NAME_START,
SOURCE_PREFIX,
AstrometryStat,
Candidate,
Diff,
Exposure,
Raw,
Source,
Stack,
)
from mirar.pipelines.winter.validator import (
Expand All @@ -94,8 +98,8 @@
DatabaseSourceInserter,
)
from mirar.processors.database.database_selector import (
CrossmatchSourceWithDatabase,
DatabaseHistorySelector,
SelectSourcesWithMetadata,
SingleSpatialCrossmatchSource,
)
from mirar.processors.database.database_updater import ImageDatabaseMultiEntryUpdater
from mirar.processors.flat import FlatCalibrator
Expand Down Expand Up @@ -550,28 +554,47 @@
XMatch(catalog=TMASS(num_sources=3, search_radius_arcmin=0.5)),
XMatch(catalog=PS1(num_sources=3, search_radius_arcmin=0.5)),
SourceWriter(output_dir_name="kowalski"),
CrossmatchSourceWithDatabase(
db_table=Candidate,
db_output_columns=[SOURCE_NAME_KEY],
# Check if the source is already in the source table
SingleSpatialCrossmatchSource(
db_table=Source,
db_output_columns=["sourceid", SOURCE_NAME_KEY],
crossmatch_radius_arcsec=2.0,
max_num_results=1,
ra_field_name="average_ra",
dec_field_name="average_dec",
),
# Assign names to the new sources
CandidateNamer(
db_table=Candidate,
base_name=CANDIDATE_PREFIX,
db_table=Source,
base_name=SOURCE_PREFIX,
name_start=NAME_START,
db_name_field=SOURCE_NAME_KEY,
db_order_field="sourceid",
),
DatabaseHistorySelector(
crossmatch_radius_arcsec=2.0,
time_field_name="jd",
history_duration_days=500.0,
# Add the new sources to the source table
CustomSourceTableModifier(modifier_function=winter_new_source_updater),
DatabaseSourceInserter(
db_table=Source,
duplicate_protocol="ignore",
),
# Get all candidates associated with source
SelectSourcesWithMetadata(
db_query_columns=["sourceid"],
db_table=Candidate,
db_output_columns=prv_candidate_cols + [SOURCE_NAME_KEY],
base_output_column=SOURCE_HISTORY_KEY,
additional_query_constraints=winter_history_deprecated_constraint,
),
CustomSourceTableModifier(
modifier_function=winter_candidate_avro_fields_calculator
),
# Update average ra and dec for source
CustomSourceTableModifier(modifier_function=winter_source_entry_updater),
# Update sources in the source table
DatabaseSourceInserter(
db_table=Source,
duplicate_protocol="replace",
),
# Add candidates in the candidate table
DatabaseSourceInserter(
db_table=Candidate,
duplicate_protocol="fail",
Expand Down
68 changes: 63 additions & 5 deletions mirar/pipelines/winter/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
OBSCLASS_KEY,
REF_CAT_PATH_KEY,
SATURATE_KEY,
SOURCE_HISTORY_KEY,
TIME_KEY,
ZP_KEY,
ZP_STD_KEY,
Expand Down Expand Up @@ -419,6 +420,62 @@ def winter_candidate_annotator_filterer(source_batch: SourceBatch) -> SourceBatc
return new_batch


def winter_new_source_updater(source_table: SourceBatch) -> SourceBatch:
    """
    Initialise the bookkeeping columns for brand-new sources.

    Every row is marked as having a single detection, and the running
    average position columns are seeded from the current ra/dec values.

    :param source_table: batch of new sources to annotate
    :return: the same batch, with ndet/average_ra/average_dec populated
    """
    for new_source in source_table:
        data = new_source.get_data()

        # First detection: the rolling averages start at the current position
        data["ndet"] = 1
        data["average_ra"] = data["ra"]
        data["average_dec"] = data["dec"]

        new_source.set_data(data)

    return source_table


def winter_source_entry_updater(source_table: SourceBatch) -> SourceBatch:
    """
    Refresh the per-source detection count and average position using the
    attached detection history.

    For each row, ndet is the history length plus the current detection; the
    average ra/dec are the mean over the historical positions together with
    the current one (or just the current position when there is no history).

    :param source_table: batch of sources carrying a SOURCE_HISTORY_KEY column
    :return: the same batch, with ndet/average_ra/average_dec updated
    """
    for source in source_table:
        data = source.get_data()

        n_dets, mean_ras, mean_decs = [], [], []

        # NOTE: .loc (label-based) over range(len(...)) mirrors how the
        # history column is stored, with integer row labels starting at 0
        for row_num in range(len(data)):
            history = pd.DataFrame(data[SOURCE_HISTORY_KEY].loc[row_num])

            # Current detection counts in addition to the history
            n_dets.append(len(history) + 1)

            row_ra = data["ra"].iloc[row_num]
            row_dec = data["dec"].iloc[row_num]

            if len(history) == 0:
                mean_ras.append(row_ra)
                mean_decs.append(row_dec)
            else:
                mean_ras.append(np.mean(history["ra"].tolist() + [row_ra]))
                mean_decs.append(np.mean(history["dec"].tolist() + [row_dec]))

        data["ndet"] = n_dets
        data["average_ra"] = mean_ras
        data["average_dec"] = mean_decs
        source.set_data(data)

    return source_table


def winter_candidate_avro_fields_calculator(source_table: SourceBatch) -> SourceBatch:
"""
Function to calculate the AVRO fields for WINTER
Expand All @@ -433,7 +490,7 @@ def winter_candidate_avro_fields_calculator(source_table: SourceBatch) -> Source
src_df["magfromlim"] = source["diffmaglim"] - src_df["magpsf"]

hist_dfs = [
pd.DataFrame(src_df["prv_candidates"].loc[x]) for x in range(len(src_df))
pd.DataFrame(src_df[SOURCE_HISTORY_KEY].loc[x]) for x in range(len(src_df))
]

jdstarthists, jdendhists = [], []
Expand Down Expand Up @@ -591,10 +648,11 @@ def winter_reference_generator(image: Image):
stack_image_annotator=winter_reference_stack_annotator,
)

if filtername == "Y":
# Use PS1 references for Y-band
logger.debug("Will query reference image from PS1")
return PS1Ref(filter_name=filtername)
assert filtername == "Y", f"Filter {filtername} not recognized for WINTER"

# Use PS1 references for Y-band
logger.debug("Will query reference image from PS1")
return PS1Ref(filter_name=filtername)


winter_history_deprecated_constraint = DBQueryConstraints(
Expand Down
23 changes: 16 additions & 7 deletions mirar/pipelines/winter/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,7 @@
AstrometryStat,
AstrometryStatsTable,
)
from mirar.pipelines.winter.models._candidates import (
CANDIDATE_PREFIX,
NAME_START,
Candidate,
CandidatesTable,
)
from mirar.pipelines.winter.models._candidates import Candidate, CandidatesTable
from mirar.pipelines.winter.models._diff import Diff, DiffsTable
from mirar.pipelines.winter.models._exposures import Exposure, ExposuresTable
from mirar.pipelines.winter.models._fields import (
Expand Down Expand Up @@ -55,6 +50,13 @@
)
from mirar.pipelines.winter.models._ref_queries import RefQueriesTable, RefQuery
from mirar.pipelines.winter.models._ref_stacks import RefStack, RefStacksTable
from mirar.pipelines.winter.models._sources import (
MIN_NAME_LENGTH,
NAME_START,
SOURCE_PREFIX,
Source,
SourcesTable,
)
from mirar.pipelines.winter.models._stack import Stack, StacksTable
from mirar.pipelines.winter.models._subdets import (
Subdet,
Expand Down Expand Up @@ -85,7 +87,14 @@ def set_up_q3c(db_name: str, db_table: BaseTable):
if DB_USER is not None:
setup_database(db_base=WinterBase)

for table in [ExposuresTable, CandidatesTable, RefQueriesTable, StacksTable]:
for table in [
ExposuresTable,
AstrometryStatsTable,
CandidatesTable,
RefQueriesTable,
StacksTable,
SourcesTable,
]:
set_up_q3c(db_name=WinterBase.db_name, db_table=table)

populate_fields()
Expand Down
20 changes: 16 additions & 4 deletions mirar/pipelines/winter/models/_candidates.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,13 @@ class CandidatesTable(WinterBase): # pylint: disable=too-few-public-methods

# Image properties

diffid: Mapped[int] = mapped_column(ForeignKey("diffs.diffid")) # FIXME
sourceid: Mapped[int] = mapped_column(ForeignKey("sources.sourceid"))
source: Mapped["SourcesTable"] = relationship(back_populates="candidates")

diffid: Mapped[int] = mapped_column(ForeignKey("diffs.diffid"))
diff_id: Mapped["DiffsTable"] = relationship(back_populates="candidates")

stackid: Mapped[int] = mapped_column(ForeignKey("stacks.stackid")) # FIXME
stackid: Mapped[int] = mapped_column(ForeignKey("stacks.stackid"))
stack_id: Mapped["StacksTable"] = relationship(back_populates="candidates")

fid: Mapped[int] = mapped_column(ForeignKey("filters.fid"))
Expand Down Expand Up @@ -212,6 +215,8 @@ class Candidate(BaseDB):
objectid: str = Field(min_length=MIN_NAME_LENGTH)
deprecated: bool = Field(default=False)

sourceid: int = Field(ge=0)

jd: float = Field(ge=0)

diffid: int | None = Field(ge=0, default=None)
Expand Down Expand Up @@ -327,10 +332,14 @@ class Candidate(BaseDB):
maggaia: float | None = Field(default=None)
maggaiabright: float | None = Field(default=None)

def insert_entry(self, returning_key_names=None) -> pd.DataFrame:
def insert_entry(
self, duplicate_protocol, returning_key_names=None
) -> pd.DataFrame:
"""
Insert the pydantic-ified data into the corresponding sql database

:param duplicate_protocol: protocol to follow if duplicate entry is found
:param returning_key_names: names of the keys to return
:return: None
"""

Expand All @@ -345,4 +354,7 @@ def insert_entry(self, returning_key_names=None) -> pd.DataFrame:
)
self.progname = default_program.progname

return self._insert_entry(returning_key_names=returning_key_names)
return self._insert_entry(
duplicate_protocol=duplicate_protocol,
returning_key_names=returning_key_names,
)
Loading
Loading