From 52019c6584345820cd2d993c1a6aa31c298d0aa4 Mon Sep 17 00:00:00 2001 From: GabrielKS <23368820+GabrielKS@users.noreply.github.com> Date: Wed, 16 Jun 2021 15:54:44 -0500 Subject: [PATCH 1/8] Add a label inference pipeline stage + Includes a placeholder prediction algorithm returning dummy data. + Adds inferred labels to confirmed trips. Testing done: Ensured that placeholder label predictions were being added to confirmed trip objects. No client-side work or testing done yet. --- .../inference/labels/__init__.py | 0 .../inference/labels/pipeline.py | 68 +++++++++++++++++++ emission/analysis/userinput/matcher.py | 9 +++ emission/core/wrapper/confirmedtrip.py | 1 + emission/core/wrapper/entry.py | 2 + emission/core/wrapper/labelprediction.py | 16 +++++ emission/core/wrapper/pipelinestate.py | 1 + emission/pipeline/intake_stage.py | 9 +++ .../analysis_timeseries_queries.py | 1 + emission/storage/pipeline_queries.py | 16 +++++ .../storage/timeseries/builtin_timeseries.py | 1 + 11 files changed, 124 insertions(+) create mode 100644 emission/analysis/classification/inference/labels/__init__.py create mode 100644 emission/analysis/classification/inference/labels/pipeline.py create mode 100644 emission/core/wrapper/labelprediction.py diff --git a/emission/analysis/classification/inference/labels/__init__.py b/emission/analysis/classification/inference/labels/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/emission/analysis/classification/inference/labels/pipeline.py b/emission/analysis/classification/inference/labels/pipeline.py new file mode 100644 index 000000000..b3fa70018 --- /dev/null +++ b/emission/analysis/classification/inference/labels/pipeline.py @@ -0,0 +1,68 @@ +# Standard imports +import logging + +# Our imports +import emission.storage.pipeline_queries as epq +import emission.storage.timeseries.abstract_timeseries as esta +import emission.storage.decorations.analysis_timeseries_queries as esda +import emission.core.wrapper.labelprediction as ecwl + +def infer_labels(user_id): + time_query = epq.get_time_range_for_label_inference(user_id) + try: + lip = LabelInferencePipeline() + lip.user_id = user_id + lip.run_prediction_pipeline(user_id, time_query) + if lip.last_trip_done is None: + logging.debug("After run, last_trip_done == None, must be early return") + epq.mark_label_inference_done(user_id, lip.last_trip_done) + except: + logging.exception("Error while inferring labels, timestamp is unchanged") + epq.mark_label_inference_failed(user_id) + +# Code structure based on emission.analysis.classification.inference.mode.pipeline +# and emission.analysis.classification.inference.mode.rule_engine +class LabelInferencePipeline: + def __init__(self): + self._last_trip_done = None + + @property + def last_trip_done(self): + return self._last_trip_done + + def run_prediction_pipeline(self, user_id, time_range): + self.ts = esta.TimeSeries.get_time_series(user_id) + self.toPredictTrips = esda.get_entries( + esda.CLEANED_TRIP_KEY, user_id, time_query=time_range) + for trip in self.toPredictTrips: + prediction = predict_trip(trip) + lp = ecwl.Labelprediction() + lp.trip_id = trip.get_id() + lp.prediction = prediction + lp.start_ts = trip.data.start_ts + lp.end_ts = trip.data.end_ts + # Insert the Labelprediction into the database as its own independent document + self.ts.insert_data(self.user_id, esda.INFERRED_LABELS_KEY, lp) + if self._last_trip_done is None or self._last_trip_done.data.end_ts < trip.data.end_ts: + self._last_trip_done = trip + +# This is where the labels for a given trip are actually predicted. +# Though the only information passed in is the trip object, the trip object can provide the +# user_id and other potentially useful information. +def predict_trip(trip): + return placeholder_prediction(trip) + +# A placeholder predictor to allow pipeline development without a real inference algorithm +def placeholder_prediction(trip): + # For the moment, the system is configured to work with two labels, "mode_confirm" and + # "purpose_confirm", so I'll do that. This placeholder situation represent a case where it is + # hard to distinguish between biking and walking (e.g., because the user is a very slow biker) + # and hard to distinguish between work and shopping at the grocery store (e.g., because the + # user works at the grocery store), but whenever the user bikes to the location it is to work + # and whenever the user walks to the location it is to shop (e.g., because they don't have a + # basket on their bike), and the user bikes to the location four times more than they walk + # there. Obviously, it is a simplification. + return [ + {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8}, + {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2} + ] diff --git a/emission/analysis/userinput/matcher.py b/emission/analysis/userinput/matcher.py index df4875a1f..603e210e5 100644 --- a/emission/analysis/userinput/matcher.py +++ b/emission/analysis/userinput/matcher.py @@ -79,6 +79,7 @@ def create_confirmed_trips(user_id, timerange): confirmed_trip_dict["data"]["cleaned_trip"] = tct.get_id() confirmed_trip_dict["data"]["user_input"] = \ get_user_input_dict(ts, tct, input_key_list) + confirmed_trip_dict["data"]["inferred_labels"] = get_inferred_labels_list(tct) confirmed_trip_entry = ecwe.Entry(confirmed_trip_dict) # save the entry ts.insert(confirmed_trip_entry) @@ -97,3 +98,11 @@ def get_user_input_dict(ts, tct, input_key_list): logging.debug("for trip %s, returning user input dict %s" % (tct.get_id(), tct_userinput)) return tct_userinput +# For a given trip, find the corresponding list of label inferences if it has been generated +def get_inferred_labels_list(trip): + candidates = esdt.get_sections_for_trip(esda.INFERRED_LABELS_KEY, trip.user_id, trip.get_id()) + if len(candidates) == 0: return {} # Perhaps we have not run the inference step for this trip + assert len(candidates) == 1, \ + "Multiple label inference list objects for trip "+str(trip.get_id()) + return candidates[0]["data"]["prediction"] + diff --git a/emission/core/wrapper/confirmedtrip.py b/emission/core/wrapper/confirmedtrip.py index 3d6357e93..f20fa3632 100644 --- a/emission/core/wrapper/confirmedtrip.py +++ b/emission/core/wrapper/confirmedtrip.py @@ -16,6 +16,7 @@ class Confirmedtrip(ecwt.Trip): # https://github.com/e-mission/e-mission-docs/issues/476#issuecomment-738120752 "primary_section": ecwb.WrapperBase.Access.WORM, "inferred_primary_mode": ecwb.WrapperBase.Access.WORM, + "inferred_labels": ecwb.WrapperBase.Access.WORM, # the user input will have all `manual/*` entries # let's make that be somewhat flexible instead of hardcoding into the data model "user_input": ecwb.WrapperBase.Access.WORM diff --git a/emission/core/wrapper/entry.py b/emission/core/wrapper/entry.py index 0986ed657..ca0702d15 100644 --- a/emission/core/wrapper/entry.py +++ b/emission/core/wrapper/entry.py @@ -121,6 +121,8 @@ def _getData2Wrapper(): "mode_inference/model": "modeinfermodel", # the predicted mode for a particular section "inference/prediction": "modeprediction", + # the predicted labels for a particular trip + "inference/labels": "labelprediction", # equivalent of cleaned_section, but with the mode set to the # inferred mode instead of just walk/bike/motorized # used for consistency and to make the client work whether or not we were diff --git a/emission/core/wrapper/labelprediction.py b/emission/core/wrapper/labelprediction.py new file mode 100644 index 000000000..82997458f --- /dev/null +++ b/emission/core/wrapper/labelprediction.py @@ -0,0 +1,16 @@ +# Based on modeprediction.py +import emission.core.wrapper.wrapperbase as ecwb + +class Labelprediction(ecwb.WrapperBase): + props = {"trip_id": ecwb.WrapperBase.Access.WORM, # the trip that this is part of + "prediction": ecwb.WrapperBase.Access.WORM, # What we predict + "start_ts": ecwb.WrapperBase.Access.WORM, # start time for the prediction, so that it can be captured in time-based queries, e.g. to reset the pipeline + "end_ts": ecwb.WrapperBase.Access.WORM, # end time for the prediction, so that it can be captured in time-based queries, e.g. to reset the pipeline + } + + enums = {} + geojson = {} + local_dates = {} + + def _populateDependencies(self): + pass diff --git a/emission/core/wrapper/pipelinestate.py b/emission/core/wrapper/pipelinestate.py index 3f978569b..b0dce4347 100644 --- a/emission/core/wrapper/pipelinestate.py +++ b/emission/core/wrapper/pipelinestate.py @@ -18,6 +18,7 @@ class PipelineStages(enum.Enum): JUMP_SMOOTHING = 3 CLEAN_RESAMPLING = 11 MODE_INFERENCE = 4 + LABEL_INFERENCE = 14 CREATE_CONFIRMED_OBJECTS = 13 TOUR_MODEL = 5 ALTERNATIVES = 10 diff --git a/emission/pipeline/intake_stage.py b/emission/pipeline/intake_stage.py index 9657fb2fe..6c87a96ac 100644 --- a/emission/pipeline/intake_stage.py +++ b/emission/pipeline/intake_stage.py @@ -30,6 +30,7 @@ import emission.analysis.intake.cleaning.location_smoothing as eaicl import emission.analysis.intake.cleaning.clean_and_resample as eaicr import emission.analysis.classification.inference.mode.pipeline as eacimp +import emission.analysis.classification.inference.labels.pipeline as eacilp import emission.net.ext_service.habitica.executor as autocheck import emission.storage.decorations.stats_queries as esds @@ -158,6 +159,14 @@ def run_intake_pipeline_for_user(uuid): esds.store_pipeline_time(uuid, ecwp.PipelineStages.MODE_INFERENCE.name, time.time(), crt.elapsed) + with ect.Timer() as crt: + logging.info("*" * 10 + "UUID %s: inferring labels" % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + "UUID %s: inferring labels" % uuid + "*" * 10) + eacilp.infer_labels(uuid) + + esds.store_pipeline_time(uuid, ecwp.PipelineStages.LABEL_INFERENCE.name, + time.time(), crt.elapsed) + with ect.Timer() as crt: logging.info("*" * 10 + "UUID %s: creating confirmed objects " % uuid + "*" * 10) print(str(arrow.now()) + "*" * 10 + "UUID %s: creating confirmed objects " % uuid + "*" * 10) diff --git a/emission/storage/decorations/analysis_timeseries_queries.py b/emission/storage/decorations/analysis_timeseries_queries.py index c7def2790..fedcfbb4f 100644 --- a/emission/storage/decorations/analysis_timeseries_queries.py +++ b/emission/storage/decorations/analysis_timeseries_queries.py @@ -34,6 +34,7 @@ METRICS_DAILY_MEAN_DURATION = "metrics/daily_mean_duration" METRICS_DAILY_USER_MEDIAN_SPEED = "metrics/daily_user_median_speed" METRICS_DAILY_MEAN_MEDIAN_SPEED = "metrics/daily_mean_median_speed" +INFERRED_LABELS_KEY = "inference/labels" # General methods diff --git a/emission/storage/pipeline_queries.py b/emission/storage/pipeline_queries.py index 56183282d..5477a54d4 100644 --- a/emission/storage/pipeline_queries.py +++ b/emission/storage/pipeline_queries.py @@ -185,6 +185,22 @@ def mark_mode_inference_done(user_id, last_section_done): def mark_mode_inference_failed(user_id): mark_stage_failed(user_id, ps.PipelineStages.MODE_INFERENCE) +def get_time_range_for_label_inference(user_id): + tq = get_time_range_for_stage(user_id, ps.PipelineStages.LABEL_INFERENCE) + tq.timeType = "data.end_ts" + return tq + +# This stage operates on trips, not sections +def mark_label_inference_done(user_id, last_trip_done): + if last_trip_done is None: + mark_stage_done(user_id, ps.PipelineStages.LABEL_INFERENCE, None) + else: + mark_stage_done(user_id, ps.PipelineStages.LABEL_INFERENCE, + last_trip_done.data.end_ts + END_FUZZ_AVOID_LTE) + +def mark_label_inference_failed(user_id): + mark_stage_failed(user_id, ps.PipelineStages.LABEL_INFERENCE) + def get_time_range_for_output_gen(user_id): return get_time_range_for_stage(user_id, ps.PipelineStages.OUTPUT_GEN) diff --git a/emission/storage/timeseries/builtin_timeseries.py b/emission/storage/timeseries/builtin_timeseries.py index f1cf9468b..59ed025f1 100644 --- a/emission/storage/timeseries/builtin_timeseries.py +++ b/emission/storage/timeseries/builtin_timeseries.py @@ -81,6 +81,7 @@ def __init__(self, user_id): "metrics/daily_user_median_speed": self.analysis_timeseries_db, "metrics/daily_mean_median_speed": self.analysis_timeseries_db, "inference/prediction": self.analysis_timeseries_db, + "inference/labels": self.analysis_timeseries_db, "analysis/inferred_section": self.analysis_timeseries_db, "analysis/confirmed_trip": self.analysis_timeseries_db, "analysis/confirmed_section": self.analysis_timeseries_db From e076f4c253561cce5e2d95720b9ef0df27204b3b Mon Sep 17 00:00:00 2001 From: GabrielKS <23368820+GabrielKS@users.noreply.github.com> Date: Sat, 19 Jun 2021 00:26:02 -0500 Subject: [PATCH 2/8] Add more placeholder label inference options + Adds two more placeholder label inference "algorithms" to facilitate front-end testing --- .../inference/labels/pipeline.py | 75 ++++++++++++++++--- 1 file changed, 64 insertions(+), 11 deletions(-) diff --git a/emission/analysis/classification/inference/labels/pipeline.py b/emission/analysis/classification/inference/labels/pipeline.py index b3fa70018..0a18f0505 100644 --- a/emission/analysis/classification/inference/labels/pipeline.py +++ b/emission/analysis/classification/inference/labels/pipeline.py @@ -52,17 +52,70 @@ def run_prediction_pipeline(self, user_id, time_range): def predict_trip(trip): return placeholder_prediction(trip) +# For testing only! +trip_n = 0 +import random + # A placeholder predictor to allow pipeline development without a real inference algorithm -def placeholder_prediction(trip): +def placeholder_prediction(trip, scenario=2): # For the moment, the system is configured to work with two labels, "mode_confirm" and - # "purpose_confirm", so I'll do that. This placeholder situation represent a case where it is - # hard to distinguish between biking and walking (e.g., because the user is a very slow biker) - # and hard to distinguish between work and shopping at the grocery store (e.g., because the - # user works at the grocery store), but whenever the user bikes to the location it is to work - # and whenever the user walks to the location it is to shop (e.g., because they don't have a - # basket on their bike), and the user bikes to the location four times more than they walk - # there. Obviously, it is a simplification. + # "purpose_confirm", so I'll do that. + + # For testing only! + global trip_n + trip_n %= 6 + trip_n += 1 + return [ - {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8}, - {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2} - ] + # The first placeholder scenario represents a case where it is hard to distinguish between + # biking and walking (e.g., because the user is a very slow biker) and hard to distinguish + # between work and shopping at the grocery store (e.g., because the user works at the + # grocery store), but whenever the user bikes to the location it is to work and whenever + # the user walks to the location it is to shop (e.g., because they don't have a basket on + # their bike), and the user bikes to the location four times more than they walk there. + # Obviously, it is a simplification. + [ + {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8}, + {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2} + ], + + # The next placeholder scenario provides that same set of labels in 75% of cases and no + # labels in the rest. + [ + {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8}, + {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2} + ] + if random.random() > 0.25 else [], + + # This third scenario provides labels designed to test the soundness and resilience of + # the client-side inference processing algorithms. + [ + [ + + ], + [ + {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8}, + {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2} + ], + [ + {"labels": {"mode_confirm": "drove_alone"}, "p": 0.8}, + ], + [ + {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8}, + {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2} + ], + [ + {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45}, + {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35}, + {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15}, + {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05} + ], + [ + {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45}, + {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35}, + {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15}, + {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05} + ] + ][6-trip_n] + ][scenario] + From 0f1ed52e7525ea5f41d63642bca1eaeb9d5af8ba Mon Sep 17 00:00:00 2001 From: GabrielKS <23368820+GabrielKS@users.noreply.github.com> Date: Thu, 1 Jul 2021 12:36:57 -0600 Subject: [PATCH 3/8] Add comments explaining prediction data structure --- emission/core/wrapper/labelprediction.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/emission/core/wrapper/labelprediction.py b/emission/core/wrapper/labelprediction.py index 82997458f..523241e14 100644 --- a/emission/core/wrapper/labelprediction.py +++ b/emission/core/wrapper/labelprediction.py @@ -1,9 +1,16 @@ # Based on modeprediction.py import emission.core.wrapper.wrapperbase as ecwb +# The "prediction" data structure is a list of label possibilities, each one consisting of a set of labels and a probability: +# [ +# {"labels": {"labeltype1": "labelvalue1", "labeltype2": "labelvalue2"}, "p": 0.61}, +# {"labels": {"labeltype1": "labelvalue3", "labeltype2": "labelvalue4"}, "p": 0.27}, +# ... +# ] + class Labelprediction(ecwb.WrapperBase): props = {"trip_id": ecwb.WrapperBase.Access.WORM, # the trip that this is part of - "prediction": ecwb.WrapperBase.Access.WORM, # What we predict + "prediction": ecwb.WrapperBase.Access.WORM, # What we predict -- see above "start_ts": ecwb.WrapperBase.Access.WORM, # start time for the prediction, so that it can be captured in time-based queries, e.g. to reset the pipeline "end_ts": ecwb.WrapperBase.Access.WORM, # end time for the prediction, so that it can be captured in time-based queries, e.g. to reset the pipeline } From d0136fc8c041739bf2809f74b3e8b44846957785 Mon Sep 17 00:00:00 2001 From: GabrielKS <23368820+GabrielKS@users.noreply.github.com> Date: Thu, 1 Jul 2021 14:57:35 -0600 Subject: [PATCH 4/8] Restructure label inference pipeline to accommodate multiple algorithms + The label inference pipeline.py now handles multiple algorithms less awkwardly and has a place for combination of multiple predictions into an ensemble. + The pipeline now stores each algorithm's prediction and a final (perhaps ensemble) prediction in different places Tested by confirming that client-side behavior is the same as before. --- .../inference/labels/pipeline.py | 178 ++++++++++-------- emission/core/wrapper/entry.py | 8 +- emission/core/wrapper/labelprediction.py | 20 +- .../storage/timeseries/builtin_timeseries.py | 1 + 4 files changed, 122 insertions(+), 85 deletions(-) diff --git a/emission/analysis/classification/inference/labels/pipeline.py b/emission/analysis/classification/inference/labels/pipeline.py index 0a18f0505..883b393fe 100644 --- a/emission/analysis/classification/inference/labels/pipeline.py +++ b/emission/analysis/classification/inference/labels/pipeline.py @@ -1,5 +1,7 @@ # Standard imports import logging +import random +import copy # Our imports import emission.storage.pipeline_queries as epq @@ -20,6 +22,77 @@ def infer_labels(user_id): logging.exception("Error while inferring labels, timestamp is unchanged") epq.mark_label_inference_failed(user_id) +# A set of placeholder predictors to allow pipeline development without a real inference algorithm. +# For the moment, the system is configured to work with two labels, "mode_confirm" and +# "purpose_confirm", so I'll do that. + +# The first placeholder scenario represents a case where it is hard to distinguish between +# biking and walking (e.g., because the user is a very slow biker) and hard to distinguish +# between work and shopping at the grocery store (e.g., because the user works at the +# grocery store), but whenever the user bikes to the location it is to work and whenever +# the user walks to the location it is to shop (e.g., because they don't have a basket on +# their bike), and the user bikes to the location four times more than they walk there. +# Obviously, it is a simplification. +def placeholder_predictor_0(trip): + return [ + {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8}, + {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2} + ] + + +# The next placeholder scenario provides that same set of labels in 75% of cases and no +# labels in the rest. +def placeholder_predictor_1(trip): + return [ + {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8}, + {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2} + ] if random.random() > 0.25 else [] + + +# This third scenario provides labels designed to test the soundness and resilience of +# the client-side inference processing algorithms. +trip_n = 0 # ugly use of globals for testing only +def placeholder_predictor_2(trip): + global trip_n # ugly use of globals for testing only + trip_n %= 6 + trip_n += 1 + return [ + [ + + ], + [ + {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8}, + {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2} + ], + [ + {"labels": {"mode_confirm": "drove_alone"}, "p": 0.8}, + ], + [ + {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8}, + {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2} + ], + [ + {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45}, + {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35}, + {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15}, + {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05} + ], + [ + {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45}, + {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35}, + {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15}, + {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05} + ] + ][6-trip_n] + +# For each algorithm in ecwl.AlgorithmTypes that runs on a trip (e.g., not the ensemble, which +# runs on the results of other algorithms), primary_algorithms specifies a corresponding +# function to run +primary_algorithms = { + # This can be edited to select a different placeholder predictor + ecwl.AlgorithmTypes.PLACEHOLDER: placeholder_predictor_1 +} + # Code structure based on emission.analysis.classification.inference.mode.pipeline # and emission.analysis.classification.inference.mode.rule_engine class LabelInferencePipeline: @@ -35,87 +108,38 @@ def run_prediction_pipeline(self, user_id, time_range): self.toPredictTrips = esda.get_entries( esda.CLEANED_TRIP_KEY, user_id, time_query=time_range) for trip in self.toPredictTrips: - prediction = predict_trip(trip) + results = self.compute_and_save_algorithms(trip) + self.compute_and_save_ensemble(trip, results) + if self._last_trip_done is None or self._last_trip_done.data.end_ts < trip.data.end_ts: + self._last_trip_done = trip + + # This is where the labels for a given trip are actually predicted. + # Though the only information passed in is the trip object, the trip object can provide the + # user_id and other potentially useful information. + def compute_and_save_algorithms(self, trip): + predictions = [] + for algorithm_id, algorithm_fn in primary_algorithms.items(): + prediction = algorithm_fn(trip) lp = ecwl.Labelprediction() lp.trip_id = trip.get_id() + lp.algorithm_id = algorithm_id lp.prediction = prediction lp.start_ts = trip.data.start_ts lp.end_ts = trip.data.end_ts - # Insert the Labelprediction into the database as its own independent document - self.ts.insert_data(self.user_id, esda.INFERRED_LABELS_KEY, lp) - if self._last_trip_done is None or self._last_trip_done.data.end_ts < trip.data.end_ts: - self._last_trip_done = trip + self.ts.insert_data(self.user_id, "inference/labels", lp) + predictions.append(lp) + return predictions -# This is where the labels for a given trip are actually predicted. -# Though the only information passed in is the trip object, the trip object can provide the -# user_id and other potentially useful information. -def predict_trip(trip): - return placeholder_prediction(trip) + # Combine all our predictions into a single ensemble prediction. + def compute_and_save_ensemble(self, trip, predictions): + il = ecwl.Labelprediction() + il.trip_id = trip.get_id() + il.algorithm_id = ecwl.AlgorithmTypes.ENSEMBLE + il.start_ts = trip.data.start_ts + il.end_ts = trip.data.end_ts -# For testing only! -trip_n = 0 -import random - -# A placeholder predictor to allow pipeline development without a real inference algorithm -def placeholder_prediction(trip, scenario=2): - # For the moment, the system is configured to work with two labels, "mode_confirm" and - # "purpose_confirm", so I'll do that. - - # For testing only! - global trip_n - trip_n %= 6 - trip_n += 1 - - return [ - # The first placeholder scenario represents a case where it is hard to distinguish between - # biking and walking (e.g., because the user is a very slow biker) and hard to distinguish - # between work and shopping at the grocery store (e.g., because the user works at the - # grocery store), but whenever the user bikes to the location it is to work and whenever - # the user walks to the location it is to shop (e.g., because they don't have a basket on - # their bike), and the user bikes to the location four times more than they walk there. - # Obviously, it is a simplification. - [ - {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8}, - {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2} - ], - - # The next placeholder scenario provides that same set of labels in 75% of cases and no - # labels in the rest. - [ - {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8}, - {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2} - ] - if random.random() > 0.25 else [], - - # This third scenario provides labels designed to test the soundness and resilience of - # the client-side inference processing algorithms. - [ - [ - - ], - [ - {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8}, - {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2} - ], - [ - {"labels": {"mode_confirm": "drove_alone"}, "p": 0.8}, - ], - [ - {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8}, - {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2} - ], - [ - {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45}, - {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35}, - {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15}, - {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05} - ], - [ - {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45}, - {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35}, - {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15}, - {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05} - ] - ][6-trip_n] - ][scenario] - + # As a placeholder, we just take the first prediction. + # TODO: implement a real combination algorithm. + il.prediction = copy.copy(predictions[0]["prediction"]) + + self.ts.insert_data(self.user_id, "analysis/inferred_labels", il) diff --git a/emission/core/wrapper/entry.py b/emission/core/wrapper/entry.py index ca0702d15..752541580 100644 --- a/emission/core/wrapper/entry.py +++ b/emission/core/wrapper/entry.py @@ -119,15 +119,17 @@ def _getData2Wrapper(): # the generated model for the random forest based mode inference # saved so that it can be used for prediction without retraining "mode_inference/model": "modeinfermodel", - # the predicted mode for a particular section + # the predicted mode for a particular section (one entry per algorithm) "inference/prediction": "modeprediction", - # the predicted labels for a particular trip + # the predicted labels for a particular trip (one entry per algorithm) "inference/labels": "labelprediction", # equivalent of cleaned_section, but with the mode set to the # inferred mode instead of just walk/bike/motorized # used for consistency and to make the client work whether or not we were - # running the inference step + # the final inferred section mode (possibly an ensemble result) "analysis/inferred_section": "inferredsection", + # the final inferred label data structure (possibly an ensemble result) + "analysis/inferred_labels": "labelprediction", ### ** END: prediction objects ### ** BEGIN: confirmed objects which combine inferred and user input values "analysis/confirmed_trip": "confirmedtrip", diff --git a/emission/core/wrapper/labelprediction.py b/emission/core/wrapper/labelprediction.py index 523241e14..9bf2b12ff 100644 --- a/emission/core/wrapper/labelprediction.py +++ b/emission/core/wrapper/labelprediction.py @@ -1,5 +1,6 @@ # Based on modeprediction.py import emission.core.wrapper.wrapperbase as ecwb +import enum # The "prediction" data structure is a list of label possibilities, each one consisting of a set of labels and a probability: # [ @@ -8,14 +9,23 @@ # ... # ] + +class AlgorithmTypes(enum.Enum): + ENSEMBLE = 0 + PLACEHOLDER = 1 + + class Labelprediction(ecwb.WrapperBase): - props = {"trip_id": ecwb.WrapperBase.Access.WORM, # the trip that this is part of - "prediction": ecwb.WrapperBase.Access.WORM, # What we predict -- see above - "start_ts": ecwb.WrapperBase.Access.WORM, # start time for the prediction, so that it can be captured in time-based queries, e.g. to reset the pipeline - "end_ts": ecwb.WrapperBase.Access.WORM, # end time for the prediction, so that it can be captured in time-based queries, e.g. to reset the pipeline + props = {"trip_id": ecwb.WrapperBase.Access.WORM, # the trip that this is part of + "algorithm_id": ecwb.WrapperBase.Access.WORM, # the algorithm that made this prediction + "prediction": ecwb.WrapperBase.Access.WORM, # What we predict -- see above + "start_ts": ecwb.WrapperBase.Access.WORM, # start time for the prediction, so that it can be captured in time-based queries, e.g. to reset the pipeline + "end_ts": ecwb.WrapperBase.Access.WORM, # end time for the prediction, so that it can be captured in time-based queries, e.g. to reset the pipeline } - enums = {} + enums = { + "algorithm_id": AlgorithmTypes + } geojson = {} local_dates = {} diff --git a/emission/storage/timeseries/builtin_timeseries.py b/emission/storage/timeseries/builtin_timeseries.py index 59ed025f1..94b11c66a 100644 --- a/emission/storage/timeseries/builtin_timeseries.py +++ b/emission/storage/timeseries/builtin_timeseries.py @@ -83,6 +83,7 @@ def __init__(self, user_id): "inference/prediction": self.analysis_timeseries_db, "inference/labels": self.analysis_timeseries_db, "analysis/inferred_section": self.analysis_timeseries_db, + "analysis/inferred_labels": self.analysis_timeseries_db, "analysis/confirmed_trip": self.analysis_timeseries_db, "analysis/confirmed_section": self.analysis_timeseries_db } From dee24619a30582baa8c8699fe7ec7ac7edb9bfde Mon Sep 17 00:00:00 2001 From: GabrielKS <23368820+GabrielKS@users.noreply.github.com> Date: Thu, 1 Jul 2021 22:36:39 -0600 Subject: [PATCH 5/8] Add inferences to confirmed trips directly + The label inference pipeline now reads confirmed trips instead of cleaned trips + It then writes inference data directly to confirmed trips instead of relying on matcher.py to find the data in the database Tested by confirming that client-side behavior is the same as before. --- .../inference/labels/pipeline.py | 24 +++++++++++++------ emission/analysis/userinput/matcher.py | 10 -------- emission/core/wrapper/pipelinestate.py | 2 +- emission/pipeline/intake_stage.py | 16 ++++++------- 4 files changed, 26 insertions(+), 26 deletions(-) diff --git a/emission/analysis/classification/inference/labels/pipeline.py b/emission/analysis/classification/inference/labels/pipeline.py index 883b393fe..54895dfa8 100644 --- a/emission/analysis/classification/inference/labels/pipeline.py +++ b/emission/analysis/classification/inference/labels/pipeline.py @@ -9,6 +9,7 @@ import emission.storage.decorations.analysis_timeseries_queries as esda import emission.core.wrapper.labelprediction as ecwl +# Does all the work necessary for a given user def infer_labels(user_id): time_query = epq.get_time_range_for_label_inference(user_id) try: @@ -87,10 +88,10 @@ def placeholder_predictor_2(trip): # For each algorithm in ecwl.AlgorithmTypes that runs on a trip (e.g., not the ensemble, which # runs on the results of other algorithms), primary_algorithms specifies a corresponding -# function to run +# function to run. This makes it easy to plug in additional algorithms later. primary_algorithms = { # This can be edited to select a different placeholder predictor - ecwl.AlgorithmTypes.PLACEHOLDER: placeholder_predictor_1 + ecwl.AlgorithmTypes.PLACEHOLDER: placeholder_predictor_2 } # Code structure based on emission.analysis.classification.inference.mode.pipeline @@ -103,13 +104,19 @@ def __init__(self): def last_trip_done(self): return self._last_trip_done + # For a given user and time range, runs all the primary algorithms and ensemble, saves results + # to the database, and records progress def run_prediction_pipeline(self, user_id, time_range): self.ts = esta.TimeSeries.get_time_series(user_id) self.toPredictTrips = esda.get_entries( - esda.CLEANED_TRIP_KEY, user_id, time_query=time_range) + esda.CONFIRMED_TRIP_KEY, user_id, time_query=time_range) for trip in self.toPredictTrips: results = self.compute_and_save_algorithms(trip) - self.compute_and_save_ensemble(trip, results) + ensemble = self.compute_and_save_ensemble(trip, results) + + # Add final prediction to the confirmed trip entry in the database + trip["data"]["inferred_labels"] = ensemble["prediction"] + self.ts.update(trip) if self._last_trip_done is None or self._last_trip_done.data.end_ts < trip.data.end_ts: self._last_trip_done = trip @@ -131,15 +138,18 @@ def compute_and_save_algorithms(self, trip): return predictions # Combine all our predictions into a single ensemble prediction. + # As a placeholder, we just take the first prediction. + # TODO: implement a real combination algorithm. def compute_and_save_ensemble(self, trip, predictions): il = ecwl.Labelprediction() il.trip_id = trip.get_id() - il.algorithm_id = ecwl.AlgorithmTypes.ENSEMBLE + # Since this is not a real ensemble yet, we will not mark it as such + # il.algorithm_id = ecwl.AlgorithmTypes.ENSEMBLE + il.algorithm_id = ecwl.AlgorithmTypes(predictions[0]["algorithm_id"]) il.start_ts = trip.data.start_ts il.end_ts = trip.data.end_ts - # As a placeholder, we just take the first prediction. - # TODO: implement a real combination algorithm. il.prediction = copy.copy(predictions[0]["prediction"]) self.ts.insert_data(self.user_id, "analysis/inferred_labels", il) + return il diff --git a/emission/analysis/userinput/matcher.py b/emission/analysis/userinput/matcher.py index 603e210e5..a15ccc027 100644 --- a/emission/analysis/userinput/matcher.py +++ b/emission/analysis/userinput/matcher.py @@ -79,7 +79,6 @@ def create_confirmed_trips(user_id, timerange): confirmed_trip_dict["data"]["cleaned_trip"] = tct.get_id() confirmed_trip_dict["data"]["user_input"] = \ get_user_input_dict(ts, tct, input_key_list) - confirmed_trip_dict["data"]["inferred_labels"] = get_inferred_labels_list(tct) confirmed_trip_entry = ecwe.Entry(confirmed_trip_dict) # save the entry ts.insert(confirmed_trip_entry) @@ -97,12 +96,3 @@ def get_user_input_dict(ts, tct, input_key_list): tct_userinput[ikey_name] = matched_userinput.data.label logging.debug("for trip %s, returning user input dict %s" % (tct.get_id(), tct_userinput)) return tct_userinput - -# For a given trip, find the corresponding list of label inferences if it has been generated -def get_inferred_labels_list(trip): - candidates = esdt.get_sections_for_trip(esda.INFERRED_LABELS_KEY, trip.user_id, trip.get_id()) - if len(candidates) == 0: return {} # Perhaps we have not run the inference step for this trip - assert len(candidates) == 1, \ - "Multiple label inference list objects for trip "+str(trip.get_id()) - return candidates[0]["data"]["prediction"] - diff --git a/emission/core/wrapper/pipelinestate.py b/emission/core/wrapper/pipelinestate.py index b0dce4347..6b39e04e2 100644 --- a/emission/core/wrapper/pipelinestate.py +++ b/emission/core/wrapper/pipelinestate.py @@ -18,8 +18,8 @@ class PipelineStages(enum.Enum): JUMP_SMOOTHING = 3 CLEAN_RESAMPLING = 11 MODE_INFERENCE = 4 - LABEL_INFERENCE = 14 CREATE_CONFIRMED_OBJECTS = 13 + LABEL_INFERENCE = 14 TOUR_MODEL = 5 ALTERNATIVES = 10 USER_MODEL = 7 diff --git a/emission/pipeline/intake_stage.py b/emission/pipeline/intake_stage.py index 6c87a96ac..bf1cda1a7 100644 --- a/emission/pipeline/intake_stage.py +++ b/emission/pipeline/intake_stage.py @@ -159,14 +159,6 @@ def run_intake_pipeline_for_user(uuid): esds.store_pipeline_time(uuid, ecwp.PipelineStages.MODE_INFERENCE.name, time.time(), crt.elapsed) - with ect.Timer() as crt: - logging.info("*" * 10 + "UUID %s: inferring labels" % uuid + "*" * 10) - print(str(arrow.now()) + "*" * 10 + "UUID %s: inferring labels" % uuid + "*" * 10) - eacilp.infer_labels(uuid) - - esds.store_pipeline_time(uuid, ecwp.PipelineStages.LABEL_INFERENCE.name, - time.time(), crt.elapsed) - with ect.Timer() as crt: logging.info("*" * 10 + "UUID %s: creating confirmed objects " % uuid + "*" * 10) print(str(arrow.now()) + "*" * 10 + "UUID %s: creating confirmed objects " % uuid + "*" * 10) @@ -175,6 +167,14 @@ def run_intake_pipeline_for_user(uuid): esds.store_pipeline_time(uuid, ecwp.PipelineStages.CREATE_CONFIRMED_OBJECTS.name, time.time(), crt.elapsed) + with ect.Timer() as crt: + logging.info("*" * 10 + "UUID %s: inferring labels" % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + "UUID %s: inferring labels" % uuid + "*" * 10) + eacilp.infer_labels(uuid) + + esds.store_pipeline_time(uuid, ecwp.PipelineStages.LABEL_INFERENCE.name, + time.time(), crt.elapsed) + with ect.Timer() as ogt: logging.info("*" * 10 + "UUID %s: storing views to cache" % uuid + "*" * 10) print(str(arrow.now()) + "*" * 10 + "UUID %s: storing views to cache" % uuid + "*" * 10) From 49e58346c4f7f3c05adc4ead8fd220876554679c Mon Sep 17 00:00:00 2001 From: GabrielKS <23368820+GabrielKS@users.noreply.github.com> Date: Wed, 7 Jul 2021 00:47:38 -0600 Subject: [PATCH 6/8] Refactor inference pipeline stage to create new trip object The inference pipeline stage now takes cleaned trips and outputs "inferred trips," which the CREATE_CONFIRMED_OBJECTS stage then accepts. Tested by confirming that the client-side behavior is the same. --- .../inference/labels/pipeline.py | 27 ++++++++++++------- emission/analysis/userinput/matcher.py | 14 +++++----- emission/core/wrapper/confirmedtrip.py | 3 ++- emission/core/wrapper/entry.py | 1 + emission/core/wrapper/inferredtrip.py | 19 +++++++++++++ emission/core/wrapper/pipelinestate.py | 2 +- emission/pipeline/intake_stage.py | 16 +++++------ .../analysis_timeseries_queries.py | 1 + .../storage/timeseries/builtin_timeseries.py | 1 + 9 files changed, 57 insertions(+), 27 deletions(-) create mode 100644 emission/core/wrapper/inferredtrip.py diff --git a/emission/analysis/classification/inference/labels/pipeline.py b/emission/analysis/classification/inference/labels/pipeline.py index 54895dfa8..60059236d 100644 --- a/emission/analysis/classification/inference/labels/pipeline.py +++ b/emission/analysis/classification/inference/labels/pipeline.py @@ -8,6 +8,7 @@ import emission.storage.timeseries.abstract_timeseries as esta import emission.storage.decorations.analysis_timeseries_queries as esda import emission.core.wrapper.labelprediction as ecwl +import emission.core.wrapper.entry as ecwe # Does all the work necessary for a given user def infer_labels(user_id): @@ -109,16 +110,22 @@ def last_trip_done(self): def run_prediction_pipeline(self, user_id, time_range): self.ts = esta.TimeSeries.get_time_series(user_id) self.toPredictTrips = esda.get_entries( - esda.CONFIRMED_TRIP_KEY, user_id, time_query=time_range) - for trip in self.toPredictTrips: - results = self.compute_and_save_algorithms(trip) - ensemble = self.compute_and_save_ensemble(trip, results) - - # Add final prediction to the confirmed trip entry in the database - trip["data"]["inferred_labels"] = ensemble["prediction"] - self.ts.update(trip) - if self._last_trip_done is None or self._last_trip_done.data.end_ts < trip.data.end_ts: - self._last_trip_done = trip + esda.CLEANED_TRIP_KEY, user_id, time_query=time_range) + for cleaned_trip in self.toPredictTrips: + results = self.compute_and_save_algorithms(cleaned_trip) + ensemble = self.compute_and_save_ensemble(cleaned_trip, results) + + # Create an inferred trip entry in the database + inferred_trip = copy.copy(cleaned_trip); + del inferred_trip["_id"]; + inferred_trip["metadata"]["key"] = "analysis/inferred_trip" + inferred_trip["data"]["cleaned_trip"] = cleaned_trip.get_id() + inferred_trip["data"]["inferred_labels"] = ensemble["prediction"] + self.ts.insert(ecwe.Entry(inferred_trip)) + + + if self._last_trip_done is None or self._last_trip_done.data.end_ts < cleaned_trip.data.end_ts: + self._last_trip_done = cleaned_trip # This is where the labels for a given trip are actually predicted. # Though the only information passed in is the trip object, the trip object can provide the diff --git a/emission/analysis/userinput/matcher.py b/emission/analysis/userinput/matcher.py index a15ccc027..65d103c22 100644 --- a/emission/analysis/userinput/matcher.py +++ b/emission/analysis/userinput/matcher.py @@ -51,21 +51,21 @@ def match_incoming_inputs(user_id, timerange): def create_confirmed_objects(user_id): time_query = epq.get_time_range_for_confirmed_object_creation(user_id) try: - last_cleaned_trip_done = create_confirmed_trips(user_id, time_query) - if last_cleaned_trip_done is None: - logging.debug("after run, last_cleaned_trip_done == None, must be early return") + last_inferred_trip_done = create_confirmed_trips(user_id, time_query) + if last_inferred_trip_done is None: + logging.debug("after run, last_inferred_trip_done == None, must be early return") epq.mark_confirmed_object_creation_done(user_id, None) else: - epq.mark_confirmed_object_creation_done(user_id, last_cleaned_trip_done.data.end_ts) + epq.mark_confirmed_object_creation_done(user_id, last_inferred_trip_done.data.end_ts) except: logging.exception("Error while creating confirmed objects, timestamp is unchanged") epq.mark_confirmed_object_creation_failed(user_id) def create_confirmed_trips(user_id, timerange): ts = esta.TimeSeries.get_time_series(user_id) - toConfirmTrips = esda.get_entries(esda.CLEANED_TRIP_KEY, user_id, + toConfirmTrips = esda.get_entries(esda.INFERRED_TRIP_KEY, user_id, time_query=timerange) - logging.debug("Converting %d cleaned trips to confirmed ones" % len(toConfirmTrips)) + logging.debug("Converting %d inferred trips to confirmed ones" % len(toConfirmTrips)) lastTripProcessed = None if len(toConfirmTrips) == 0: logging.debug("len(toConfirmTrips) == 0, early return") @@ -76,7 +76,7 @@ def create_confirmed_trips(user_id, timerange): confirmed_trip_dict = copy.copy(tct) del confirmed_trip_dict["_id"] confirmed_trip_dict["metadata"]["key"] = "analysis/confirmed_trip" - confirmed_trip_dict["data"]["cleaned_trip"] = tct.get_id() + confirmed_trip_dict["data"]["inferred_trip"] = tct.get_id() confirmed_trip_dict["data"]["user_input"] = \ get_user_input_dict(ts, tct, input_key_list) confirmed_trip_entry = ecwe.Entry(confirmed_trip_dict) diff --git a/emission/core/wrapper/confirmedtrip.py b/emission/core/wrapper/confirmedtrip.py index f20fa3632..0919861ea 100644 --- a/emission/core/wrapper/confirmedtrip.py +++ b/emission/core/wrapper/confirmedtrip.py @@ -12,11 +12,12 @@ class Confirmedtrip(ecwt.Trip): props = ecwt.Trip.props props.update({"raw_trip": ecwb.WrapperBase.Access.WORM, "cleaned_trip": ecwb.WrapperBase.Access.WORM, + "inferred_labels": ecwb.WrapperBase.Access.WORM, + "inferred_trip": ecwb.WrapperBase.Access.WORM, # the confirmed section that is the "primary" # https://github.com/e-mission/e-mission-docs/issues/476#issuecomment-738120752 "primary_section": ecwb.WrapperBase.Access.WORM, "inferred_primary_mode": ecwb.WrapperBase.Access.WORM, - "inferred_labels": ecwb.WrapperBase.Access.WORM, # the user input will have all `manual/*` entries # let's make that be somewhat flexible instead of hardcoding into the data model "user_input": ecwb.WrapperBase.Access.WORM diff --git a/emission/core/wrapper/entry.py b/emission/core/wrapper/entry.py index 752541580..8e436679e 100644 --- a/emission/core/wrapper/entry.py +++ b/emission/core/wrapper/entry.py @@ -132,6 +132,7 @@ def _getData2Wrapper(): "analysis/inferred_labels": "labelprediction", ### ** END: prediction objects ### ** BEGIN: confirmed objects which combine inferred and user input values + "analysis/inferred_trip": "inferredtrip", "analysis/confirmed_trip": "confirmedtrip", "analysis/confirmed_section": "confirmedsection" ### ** END: confirmed objects which combine inferred and user input values diff --git a/emission/core/wrapper/inferredtrip.py b/emission/core/wrapper/inferredtrip.py new file mode 100644 index 000000000..1f9d0c520 --- /dev/null +++ b/emission/core/wrapper/inferredtrip.py @@ -0,0 +1,19 @@ +from __future__ import unicode_literals +from __future__ import print_function +from __future__ import division +from __future__ import absolute_import +from future import standard_library +standard_library.install_aliases() +from builtins import * +import emission.core.wrapper.trip as ecwt +import emission.core.wrapper.wrapperbase as ecwb + +class Inferredtrip(ecwt.Trip): + props = ecwt.Trip.props + props.update({"raw_trip": ecwb.WrapperBase.Access.WORM, + "cleaned_trip": ecwb.WrapperBase.Access.WORM, + "inferred_labels": ecwb.WrapperBase.Access.WORM + }) + + def _populateDependencies(self): + super(Inferredtrip, self)._populateDependencies() diff --git a/emission/core/wrapper/pipelinestate.py b/emission/core/wrapper/pipelinestate.py index 6b39e04e2..b0dce4347 100644 --- a/emission/core/wrapper/pipelinestate.py +++ b/emission/core/wrapper/pipelinestate.py @@ -18,8 +18,8 @@ class PipelineStages(enum.Enum): JUMP_SMOOTHING = 3 CLEAN_RESAMPLING = 11 MODE_INFERENCE = 4 - CREATE_CONFIRMED_OBJECTS = 13 LABEL_INFERENCE = 14 + CREATE_CONFIRMED_OBJECTS = 13 TOUR_MODEL = 5 ALTERNATIVES = 10 USER_MODEL = 7 diff --git a/emission/pipeline/intake_stage.py b/emission/pipeline/intake_stage.py index bf1cda1a7..6c87a96ac 100644 --- a/emission/pipeline/intake_stage.py +++ b/emission/pipeline/intake_stage.py @@ -159,14 +159,6 @@ def run_intake_pipeline_for_user(uuid): esds.store_pipeline_time(uuid, ecwp.PipelineStages.MODE_INFERENCE.name, time.time(), crt.elapsed) - with ect.Timer() as crt: - logging.info("*" * 10 + "UUID %s: creating confirmed objects " % uuid + "*" * 10) - print(str(arrow.now()) + "*" * 10 + "UUID %s: creating confirmed objects " % uuid + "*" * 10) - eaum.create_confirmed_objects(uuid) - - esds.store_pipeline_time(uuid, ecwp.PipelineStages.CREATE_CONFIRMED_OBJECTS.name, - time.time(), crt.elapsed) - with ect.Timer() as crt: logging.info("*" * 10 + "UUID %s: inferring labels" % uuid + "*" * 10) print(str(arrow.now()) + "*" * 10 + "UUID %s: inferring labels" % uuid + "*" * 10) @@ -175,6 +167,14 @@ def run_intake_pipeline_for_user(uuid): esds.store_pipeline_time(uuid, ecwp.PipelineStages.LABEL_INFERENCE.name, time.time(), crt.elapsed) + with ect.Timer() as crt: + logging.info("*" * 10 + "UUID %s: creating confirmed objects " % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + "UUID %s: creating confirmed objects " % uuid + "*" * 10) + eaum.create_confirmed_objects(uuid) + + esds.store_pipeline_time(uuid, ecwp.PipelineStages.CREATE_CONFIRMED_OBJECTS.name, + time.time(), crt.elapsed) + with ect.Timer() as ogt: logging.info("*" * 10 + "UUID %s: storing views to cache" % uuid + "*" * 10) print(str(arrow.now()) + "*" * 10 + "UUID %s: storing views to cache" % uuid + "*" * 10) diff --git a/emission/storage/decorations/analysis_timeseries_queries.py b/emission/storage/decorations/analysis_timeseries_queries.py index fedcfbb4f..cf6819299 100644 --- a/emission/storage/decorations/analysis_timeseries_queries.py +++ b/emission/storage/decorations/analysis_timeseries_queries.py @@ -25,6 +25,7 @@ CLEANED_STOP_KEY = "analysis/cleaned_stop" CLEANED_UNTRACKED_KEY = "analysis/cleaned_untracked" CLEANED_LOCATION_KEY = "analysis/recreated_location" +INFERRED_TRIP_KEY = "analysis/inferred_trip" CONFIRMED_TRIP_KEY = "analysis/confirmed_trip" METRICS_DAILY_USER_COUNT = "metrics/daily_user_count" METRICS_DAILY_MEAN_COUNT = "metrics/daily_mean_count" diff --git a/emission/storage/timeseries/builtin_timeseries.py b/emission/storage/timeseries/builtin_timeseries.py index 94b11c66a..397438868 100644 --- a/emission/storage/timeseries/builtin_timeseries.py +++ b/emission/storage/timeseries/builtin_timeseries.py @@ -84,6 +84,7 @@ def __init__(self, user_id): "inference/labels": self.analysis_timeseries_db, "analysis/inferred_section": self.analysis_timeseries_db, "analysis/inferred_labels": self.analysis_timeseries_db, + "analysis/inferred_trip": self.analysis_timeseries_db, "analysis/confirmed_trip": self.analysis_timeseries_db, "analysis/confirmed_section": self.analysis_timeseries_db } From 5940c5dc76bad9250c22b4b7aaddeaea57d7a2df Mon Sep 17 00:00:00 2001 From: GabrielKS <23368820+GabrielKS@users.noreply.github.com> Date: Wed, 7 Jul 2021 23:09:27 -0600 Subject: [PATCH 7/8] Write unit tests for the label inference pipeline stage + TestLabelInferencePipeline.py now tests the plumbing of labels.pipeline.py. It does not test the actual inference algorithms (which have not been implemented for real yet). + The pipeline code has been reworked slightly to fix bugs illuminated by the unit tests and to make placeholder_predictor_2 deterministic Tested by running the unit tests and by confirming that client-side behavior is the same --- .../inference/labels/pipeline.py | 42 ++++----- emission/core/wrapper/labelprediction.py | 4 +- .../TestLabelInferencePipeline.py | 85 +++++++++++++++++++ 3 files changed, 109 insertions(+), 22 deletions(-) create mode 100644 emission/tests/pipelineTests/TestLabelInferencePipeline.py diff --git a/emission/analysis/classification/inference/labels/pipeline.py b/emission/analysis/classification/inference/labels/pipeline.py index 60059236d..2a3722d69 100644 --- a/emission/analysis/classification/inference/labels/pipeline.py +++ b/emission/analysis/classification/inference/labels/pipeline.py @@ -53,11 +53,11 @@ def placeholder_predictor_1(trip): # This third scenario provides labels designed to test the soundness and resilience of # the client-side inference processing algorithms. -trip_n = 0 # ugly use of globals for testing only def placeholder_predictor_2(trip): - global trip_n # ugly use of globals for testing only - trip_n %= 6 - trip_n += 1 + # Hardcoded to match "test_july_22" -- clearly, this is just for testing + timestamp2index = {494: 5, 565: 4, 795: 3, 805: 2, 880: 1, 960: 0} + timestamp = trip["data"]["start_local_dt"]["hour"]*60+trip["data"]["start_local_dt"]["minute"] + index = timestamp2index[timestamp] if timestamp in timestamp2index else 0 return [ [ @@ -85,14 +85,13 @@ def placeholder_predictor_2(trip): {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15}, {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05} ] - ][6-trip_n] + ][index] # For each algorithm in ecwl.AlgorithmTypes that runs on a trip (e.g., not the ensemble, which # runs on the results of other algorithms), primary_algorithms specifies a corresponding # function to run. This makes it easy to plug in additional algorithms later. primary_algorithms = { - # This can be edited to select a different placeholder predictor - ecwl.AlgorithmTypes.PLACEHOLDER: placeholder_predictor_2 + ecwl.AlgorithmTypes.PLACEHOLDER_2: placeholder_predictor_2 } # Code structure based on emission.analysis.classification.inference.mode.pipeline @@ -112,19 +111,20 @@ def run_prediction_pipeline(self, user_id, time_range): self.toPredictTrips = esda.get_entries( esda.CLEANED_TRIP_KEY, user_id, time_query=time_range) for cleaned_trip in self.toPredictTrips: - results = self.compute_and_save_algorithms(cleaned_trip) - ensemble = self.compute_and_save_ensemble(cleaned_trip, results) - - # Create an inferred trip entry in the database - inferred_trip = copy.copy(cleaned_trip); - del inferred_trip["_id"]; - inferred_trip["metadata"]["key"] = "analysis/inferred_trip" + # Create an inferred trip + cleaned_trip_dict = copy.copy(cleaned_trip)["data"] + inferred_trip = ecwe.Entry.create_entry(user_id, "analysis/inferred_trip", cleaned_trip_dict) + + # Run the algorithms and the ensemble, store results + results = self.compute_and_save_algorithms(inferred_trip) + ensemble = self.compute_and_save_ensemble(inferred_trip, results) + + # Put final results into the inferred trip and store it inferred_trip["data"]["cleaned_trip"] = cleaned_trip.get_id() inferred_trip["data"]["inferred_labels"] = ensemble["prediction"] - self.ts.insert(ecwe.Entry(inferred_trip)) - + self.ts.insert(inferred_trip) - if self._last_trip_done is None or self._last_trip_done.data.end_ts < cleaned_trip.data.end_ts: + if self._last_trip_done is None or self._last_trip_done["data"]["end_ts"] < cleaned_trip["data"]["end_ts"]: self._last_trip_done = cleaned_trip # This is where the labels for a given trip are actually predicted. @@ -138,8 +138,8 @@ def compute_and_save_algorithms(self, trip): lp.trip_id = trip.get_id() lp.algorithm_id = algorithm_id lp.prediction = prediction - lp.start_ts = trip.data.start_ts - lp.end_ts = trip.data.end_ts + lp.start_ts = trip["data"]["start_ts"] + lp.end_ts = trip["data"]["end_ts"] self.ts.insert_data(self.user_id, "inference/labels", lp) predictions.append(lp) return predictions @@ -153,8 +153,8 @@ def compute_and_save_ensemble(self, trip, predictions): # Since this is not a real ensemble yet, we will not mark it as such # il.algorithm_id = ecwl.AlgorithmTypes.ENSEMBLE il.algorithm_id = ecwl.AlgorithmTypes(predictions[0]["algorithm_id"]) - il.start_ts = trip.data.start_ts - il.end_ts = trip.data.end_ts + il.start_ts = trip["data"]["start_ts"] + il.end_ts = trip["data"]["end_ts"] il.prediction = copy.copy(predictions[0]["prediction"]) diff --git a/emission/core/wrapper/labelprediction.py b/emission/core/wrapper/labelprediction.py index 9bf2b12ff..d25e089c3 100644 --- a/emission/core/wrapper/labelprediction.py +++ b/emission/core/wrapper/labelprediction.py @@ -12,7 +12,9 @@ class AlgorithmTypes(enum.Enum): ENSEMBLE = 0 - PLACEHOLDER = 1 + PLACEHOLDER_0 = 1 + PLACEHOLDER_1 = 2 + PLACEHOLDER_2 = 3 class Labelprediction(ecwb.WrapperBase): diff --git a/emission/tests/pipelineTests/TestLabelInferencePipeline.py b/emission/tests/pipelineTests/TestLabelInferencePipeline.py new file mode 100644 index 000000000..f0e931543 --- /dev/null +++ b/emission/tests/pipelineTests/TestLabelInferencePipeline.py @@ -0,0 +1,85 @@ +# This tests the label inference pipeline. It uses fake data and placeholder inference algorithms +# and thus intentionally does not test the real inference algorithms themselves. +import unittest +import numpy as np +from logging import config # epi.run_intake_pipeline needs this to be done here +import time + +import emission.core.wrapper.user as ecwu +import emission.pipeline.intake_stage as epi +import emission.pipeline.reset as epr +import emission.analysis.classification.inference.labels.pipeline as eacilp +import emission.core.wrapper.labelprediction as ecwl +import emission.storage.decorations.analysis_timeseries_queries as esda +import emission.storage.timeseries.timequery as estt +import emission.storage.decorations.trip_queries as esdt + +class TestLabelInferencePipeline(unittest.TestCase): + # It is important that these functions be deterministic + test_algorithms = { + ecwl.AlgorithmTypes.PLACEHOLDER_0: eacilp.placeholder_predictor_0, + ecwl.AlgorithmTypes.PLACEHOLDER_2: eacilp.placeholder_predictor_2 + } + + def setUp(self): + np.random.seed(61297777) + self.user_id = ecwu.User.fromEmail("test_july_22").uuid + self.reset_pipeline() + self.run_pipeline(self.test_algorithms) + time_range = estt.TimeQuery("metadata.write_ts", None, time.time()) + self.cleaned_trips = esda.get_entries(esda.CLEANED_TRIP_KEY, self.user_id, time_query=time_range) + self.cleaned_id_to_trip = {trip.get_id(): trip for trip in self.cleaned_trips} + self.inferred_trips = esda.get_entries(esda.INFERRED_TRIP_KEY, self.user_id, time_query=time_range) + + def tearDown(self): + self.reset_pipeline() + + def run_pipeline(self, algorithms): + default_primary_algorithms = eacilp.primary_algorithms + eacilp.primary_algorithms = algorithms + epi.run_intake_pipeline("single", [self.user_id]) + eacilp.primary_algorithms = default_primary_algorithms + + def reset_pipeline(self): + epr.reset_user_to_start(self.user_id, False) + + # Tests that the fields from the cleaned trip are carried over into the inferred trip correctly + def testPipelineIntegrity(self): + self.assertEqual(len(self.inferred_trips), len(self.cleaned_trips)) + for inferred_trip in self.inferred_trips: + cleaned_id = inferred_trip["data"]["cleaned_trip"] + self.assertIn(cleaned_id, self.cleaned_id_to_trip.keys()) + cleaned_trip = self.cleaned_id_to_trip[cleaned_id] + self.assertEqual(inferred_trip["data"]["raw_trip"], cleaned_trip["data"]["raw_trip"]) + self.assertTrue(inferred_trip["data"]["inferred_labels"]) # Check for existence here, check for congruence later + + # Tests that each of the (test) algorithms runs and saves to the database correctly + def testIndividualAlgorithms(self): + for trip in self.inferred_trips: + entries = esdt.get_sections_for_trip("inference/labels", self.user_id, trip.get_id()) + self.assertEqual(len(entries), len(self.test_algorithms)) + for entry in entries: + self.assertEqual(entry["data"]["trip_id"], trip.get_id()) + this_algorithm = ecwl.AlgorithmTypes(entry["data"]["algorithm_id"]) + self.assertIn(this_algorithm, self.test_algorithms) + self.assertEqual(entry["data"]["prediction"], self.test_algorithms[this_algorithm](trip)) + self.assertEqual(entry["data"]["start_ts"], trip["data"]["start_ts"]) + self.assertEqual(entry["data"]["end_ts"], trip["data"]["end_ts"]) + + # Tests that the ensemble algorithm runs and saves to the database correctly + def testEnsemble(self): + for trip in self.inferred_trips: + entries = esdt.get_sections_for_trip("analysis/inferred_labels", self.user_id, trip.get_id()) + self.assertEqual(len(entries), 1) + entry = entries[0] + # TODO: when we have a real ensemble implemented: + # self.assertEqual(entry["data"]["algorithm_id"], ecwl.AlgorithmTypes.ENSEMBLE) + # TODO: perhaps assert something about the prediction when we have a real ensemble + self.assertEqual(entry["data"]["start_ts"], trip["data"]["start_ts"]) + self.assertEqual(entry["data"]["end_ts"], trip["data"]["end_ts"]) + + # Tests that the final inferred labels in the inferred trip are the same as those given by the ensemble algorithm + def testInferredTrip(self): + for trip in self.inferred_trips: + entry = esdt.get_sections_for_trip("analysis/inferred_labels", self.user_id, trip.get_id())[0] + self.assertEqual(trip["data"]["inferred_labels"], entry["data"]["prediction"]) From 7095cceedc62c359a644f85be3aa797b474dc77b Mon Sep 17 00:00:00 2001 From: GabrielKS <23368820+GabrielKS@users.noreply.github.com> Date: Thu, 8 Jul 2021 12:52:45 -0600 Subject: [PATCH 8/8] Use existing test procedures for TestLabelInferencePipeline --- emission/tests/common.py | 2 + .../TestLabelInferencePipeline.py | 37 +++++++++++-------- 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/emission/tests/common.py b/emission/tests/common.py index 00ba988dd..72be03cc5 100644 --- a/emission/tests/common.py +++ b/emission/tests/common.py @@ -172,6 +172,7 @@ def runIntakePipeline(uuid): import emission.analysis.intake.cleaning.location_smoothing as eaicl import emission.analysis.intake.cleaning.clean_and_resample as eaicr import emission.analysis.classification.inference.mode.pipeline as eacimp + import emission.analysis.classification.inference.labels.pipeline as eacilp eaum.match_incoming_user_inputs(uuid) eaicf.filter_accuracy(uuid) @@ -180,6 +181,7 @@ def runIntakePipeline(uuid): eaicl.filter_current_sections(uuid) eaicr.clean_and_resample(uuid) eacimp.predict_mode(uuid) + eacilp.infer_labels(uuid) eaum.create_confirmed_objects(uuid) def configLogging(): diff --git a/emission/tests/pipelineTests/TestLabelInferencePipeline.py b/emission/tests/pipelineTests/TestLabelInferencePipeline.py index f0e931543..28d37562f 100644 --- a/emission/tests/pipelineTests/TestLabelInferencePipeline.py +++ b/emission/tests/pipelineTests/TestLabelInferencePipeline.py @@ -2,17 +2,15 @@ # and thus intentionally does not test the real inference algorithms themselves. import unittest import numpy as np -from logging import config # epi.run_intake_pipeline needs this to be done here import time -import emission.core.wrapper.user as ecwu -import emission.pipeline.intake_stage as epi -import emission.pipeline.reset as epr import emission.analysis.classification.inference.labels.pipeline as eacilp import emission.core.wrapper.labelprediction as ecwl import emission.storage.decorations.analysis_timeseries_queries as esda import emission.storage.timeseries.timequery as estt import emission.storage.decorations.trip_queries as esdt +import emission.core.get_database as edb +import emission.tests.common as etc class TestLabelInferencePipeline(unittest.TestCase): # It is important that these functions be deterministic @@ -23,26 +21,26 @@ class TestLabelInferencePipeline(unittest.TestCase): def setUp(self): np.random.seed(61297777) - self.user_id = ecwu.User.fromEmail("test_july_22").uuid - self.reset_pipeline() + self.reset_all() + etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2015-jul-22") self.run_pipeline(self.test_algorithms) time_range = estt.TimeQuery("metadata.write_ts", None, time.time()) - self.cleaned_trips = esda.get_entries(esda.CLEANED_TRIP_KEY, self.user_id, time_query=time_range) + self.cleaned_trips = esda.get_entries(esda.CLEANED_TRIP_KEY, self.testUUID, time_query=time_range) self.cleaned_id_to_trip = {trip.get_id(): trip for trip in self.cleaned_trips} - self.inferred_trips = esda.get_entries(esda.INFERRED_TRIP_KEY, self.user_id, time_query=time_range) + self.inferred_trips = esda.get_entries(esda.INFERRED_TRIP_KEY, self.testUUID, time_query=time_range) def tearDown(self): - self.reset_pipeline() + self.reset_all() def run_pipeline(self, algorithms): default_primary_algorithms = eacilp.primary_algorithms eacilp.primary_algorithms = algorithms - epi.run_intake_pipeline("single", [self.user_id]) + etc.runIntakePipeline(self.testUUID) eacilp.primary_algorithms = default_primary_algorithms - def reset_pipeline(self): - epr.reset_user_to_start(self.user_id, False) - + def reset_all(self): + etc.dropAllCollections(edb._get_current_db()) + # Tests that the fields from the cleaned trip are carried over into the inferred trip correctly def testPipelineIntegrity(self): self.assertEqual(len(self.inferred_trips), len(self.cleaned_trips)) @@ -56,7 +54,7 @@ def testPipelineIntegrity(self): # Tests that each of the (test) algorithms runs and saves to the database correctly def testIndividualAlgorithms(self): for trip in self.inferred_trips: - entries = esdt.get_sections_for_trip("inference/labels", self.user_id, trip.get_id()) + entries = esdt.get_sections_for_trip("inference/labels", self.testUUID, trip.get_id()) self.assertEqual(len(entries), len(self.test_algorithms)) for entry in entries: self.assertEqual(entry["data"]["trip_id"], trip.get_id()) @@ -69,7 +67,7 @@ def testIndividualAlgorithms(self): # Tests that the ensemble algorithm runs and saves to the database correctly def testEnsemble(self): for trip in self.inferred_trips: - entries = esdt.get_sections_for_trip("analysis/inferred_labels", self.user_id, trip.get_id()) + entries = esdt.get_sections_for_trip("analysis/inferred_labels", self.testUUID, trip.get_id()) self.assertEqual(len(entries), 1) entry = entries[0] # TODO: when we have a real ensemble implemented: @@ -81,5 +79,12 @@ def testEnsemble(self): # Tests that the final inferred labels in the inferred trip are the same as those given by the ensemble algorithm def testInferredTrip(self): for trip in self.inferred_trips: - entry = esdt.get_sections_for_trip("analysis/inferred_labels", self.user_id, trip.get_id())[0] + entry = esdt.get_sections_for_trip("analysis/inferred_labels", self.testUUID, trip.get_id())[0] self.assertEqual(trip["data"]["inferred_labels"], entry["data"]["prediction"]) + +def main(): + etc.configLogging() + unittest.main() + +if __name__ == "__main__": + main() \ No newline at end of file