Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement label inference pipeline #825

Merged
merged 8 commits into from
Jul 8, 2021
42 changes: 21 additions & 21 deletions emission/analysis/classification/inference/labels/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ def placeholder_predictor_1(trip):

# This third scenario provides labels designed to test the soundness and resilience of
# the client-side inference processing algorithms.
trip_n = 0 # ugly use of globals for testing only
def placeholder_predictor_2(trip):
global trip_n # ugly use of globals for testing only
trip_n %= 6
trip_n += 1
# Hardcoded to match "test_july_22" -- clearly, this is just for testing
timestamp2index = {494: 5, 565: 4, 795: 3, 805: 2, 880: 1, 960: 0}
shankari marked this conversation as resolved.
Show resolved Hide resolved
timestamp = trip["data"]["start_local_dt"]["hour"]*60+trip["data"]["start_local_dt"]["minute"]
index = timestamp2index[timestamp] if timestamp in timestamp2index else 0
return [
[

Expand Down Expand Up @@ -85,14 +85,13 @@ def placeholder_predictor_2(trip):
{"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
{"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
]
][6-trip_n]
][index]

# For each algorithm in ecwl.AlgorithmTypes that runs on a trip (e.g., not the ensemble, which
# runs on the results of other algorithms), primary_algorithms specifies a corresponding
# function to run. This makes it easy to plug in additional algorithms later.
primary_algorithms = {
# This can be edited to select a different placeholder predictor
ecwl.AlgorithmTypes.PLACEHOLDER: placeholder_predictor_2
ecwl.AlgorithmTypes.PLACEHOLDER_2: placeholder_predictor_2
}

# Code structure based on emission.analysis.classification.inference.mode.pipeline
Expand All @@ -112,19 +111,20 @@ def run_prediction_pipeline(self, user_id, time_range):
self.toPredictTrips = esda.get_entries(
esda.CLEANED_TRIP_KEY, user_id, time_query=time_range)
for cleaned_trip in self.toPredictTrips:
results = self.compute_and_save_algorithms(cleaned_trip)
ensemble = self.compute_and_save_ensemble(cleaned_trip, results)

# Create an inferred trip entry in the database
inferred_trip = copy.copy(cleaned_trip);
del inferred_trip["_id"];
inferred_trip["metadata"]["key"] = "analysis/inferred_trip"
# Create an inferred trip
cleaned_trip_dict = copy.copy(cleaned_trip)["data"]
inferred_trip = ecwe.Entry.create_entry(user_id, "analysis/inferred_trip", cleaned_trip_dict)

# Run the algorithms and the ensemble, store results
results = self.compute_and_save_algorithms(inferred_trip)
ensemble = self.compute_and_save_ensemble(inferred_trip, results)

# Put final results into the inferred trip and store it
inferred_trip["data"]["cleaned_trip"] = cleaned_trip.get_id()
inferred_trip["data"]["inferred_labels"] = ensemble["prediction"]
self.ts.insert(ecwe.Entry(inferred_trip))

self.ts.insert(inferred_trip)

if self._last_trip_done is None or self._last_trip_done.data.end_ts < cleaned_trip.data.end_ts:
if self._last_trip_done is None or self._last_trip_done["data"]["end_ts"] < cleaned_trip["data"]["end_ts"]:
self._last_trip_done = cleaned_trip

# This is where the labels for a given trip are actually predicted.
Expand All @@ -138,8 +138,8 @@ def compute_and_save_algorithms(self, trip):
lp.trip_id = trip.get_id()
lp.algorithm_id = algorithm_id
lp.prediction = prediction
lp.start_ts = trip.data.start_ts
lp.end_ts = trip.data.end_ts
lp.start_ts = trip["data"]["start_ts"]
lp.end_ts = trip["data"]["end_ts"]
self.ts.insert_data(self.user_id, "inference/labels", lp)
predictions.append(lp)
return predictions
Expand All @@ -153,8 +153,8 @@ def compute_and_save_ensemble(self, trip, predictions):
# Since this is not a real ensemble yet, we will not mark it as such
# il.algorithm_id = ecwl.AlgorithmTypes.ENSEMBLE
il.algorithm_id = ecwl.AlgorithmTypes(predictions[0]["algorithm_id"])
il.start_ts = trip.data.start_ts
il.end_ts = trip.data.end_ts
il.start_ts = trip["data"]["start_ts"]
il.end_ts = trip["data"]["end_ts"]

il.prediction = copy.copy(predictions[0]["prediction"])

Expand Down
4 changes: 3 additions & 1 deletion emission/core/wrapper/labelprediction.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@

class AlgorithmTypes(enum.Enum):
ENSEMBLE = 0
PLACEHOLDER = 1
PLACEHOLDER_0 = 1
PLACEHOLDER_1 = 2
PLACEHOLDER_2 = 3


class Labelprediction(ecwb.WrapperBase):
Expand Down
85 changes: 85 additions & 0 deletions emission/tests/pipelineTests/TestLabelInferencePipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# This tests the label inference pipeline. It uses fake data and placeholder inference algorithms
# and thus intentionally does not test the real inference algorithms themselves.
import unittest
import numpy as np
from logging import config # epi.run_intake_pipeline needs this to be done here
import time

import emission.core.wrapper.user as ecwu
import emission.pipeline.intake_stage as epi
import emission.pipeline.reset as epr
import emission.analysis.classification.inference.labels.pipeline as eacilp
import emission.core.wrapper.labelprediction as ecwl
import emission.storage.decorations.analysis_timeseries_queries as esda
import emission.storage.timeseries.timequery as estt
import emission.storage.decorations.trip_queries as esdt

class TestLabelInferencePipeline(unittest.TestCase):
shankari marked this conversation as resolved.
Show resolved Hide resolved
# It is important that these functions be deterministic
test_algorithms = {
ecwl.AlgorithmTypes.PLACEHOLDER_0: eacilp.placeholder_predictor_0,
ecwl.AlgorithmTypes.PLACEHOLDER_2: eacilp.placeholder_predictor_2
}

def setUp(self):
np.random.seed(61297777)
self.user_id = ecwu.User.fromEmail("test_july_22").uuid
shankari marked this conversation as resolved.
Show resolved Hide resolved
self.reset_pipeline()
self.run_pipeline(self.test_algorithms)
time_range = estt.TimeQuery("metadata.write_ts", None, time.time())
self.cleaned_trips = esda.get_entries(esda.CLEANED_TRIP_KEY, self.user_id, time_query=time_range)
self.cleaned_id_to_trip = {trip.get_id(): trip for trip in self.cleaned_trips}
self.inferred_trips = esda.get_entries(esda.INFERRED_TRIP_KEY, self.user_id, time_query=time_range)

def tearDown(self):
self.reset_pipeline()
shankari marked this conversation as resolved.
Show resolved Hide resolved

def run_pipeline(self, algorithms):
default_primary_algorithms = eacilp.primary_algorithms
eacilp.primary_algorithms = algorithms
epi.run_intake_pipeline("single", [self.user_id])
eacilp.primary_algorithms = default_primary_algorithms

def reset_pipeline(self):
epr.reset_user_to_start(self.user_id, False)

# Tests that the fields from the cleaned trip are carried over into the inferred trip correctly
def testPipelineIntegrity(self):
self.assertEqual(len(self.inferred_trips), len(self.cleaned_trips))
for inferred_trip in self.inferred_trips:
cleaned_id = inferred_trip["data"]["cleaned_trip"]
self.assertIn(cleaned_id, self.cleaned_id_to_trip.keys())
cleaned_trip = self.cleaned_id_to_trip[cleaned_id]
self.assertEqual(inferred_trip["data"]["raw_trip"], cleaned_trip["data"]["raw_trip"])
self.assertTrue(inferred_trip["data"]["inferred_labels"]) # Check for existence here, check for congruence later

# Tests that each of the (test) algorithms runs and saves to the database correctly
def testIndividualAlgorithms(self):
for trip in self.inferred_trips:
entries = esdt.get_sections_for_trip("inference/labels", self.user_id, trip.get_id())
self.assertEqual(len(entries), len(self.test_algorithms))
for entry in entries:
self.assertEqual(entry["data"]["trip_id"], trip.get_id())
this_algorithm = ecwl.AlgorithmTypes(entry["data"]["algorithm_id"])
self.assertIn(this_algorithm, self.test_algorithms)
self.assertEqual(entry["data"]["prediction"], self.test_algorithms[this_algorithm](trip))
self.assertEqual(entry["data"]["start_ts"], trip["data"]["start_ts"])
self.assertEqual(entry["data"]["end_ts"], trip["data"]["end_ts"])

# Tests that the ensemble algorithm runs and saves to the database correctly
def testEnsemble(self):
for trip in self.inferred_trips:
entries = esdt.get_sections_for_trip("analysis/inferred_labels", self.user_id, trip.get_id())
self.assertEqual(len(entries), 1)
entry = entries[0]
# TODO: when we have a real ensemble implemented:
# self.assertEqual(entry["data"]["algorithm_id"], ecwl.AlgorithmTypes.ENSEMBLE)
# TODO: perhaps assert something about the prediction when we have a real ensemble
self.assertEqual(entry["data"]["start_ts"], trip["data"]["start_ts"])
self.assertEqual(entry["data"]["end_ts"], trip["data"]["end_ts"])

# Tests that the final inferred labels in the inferred trip are the same as those given by the ensemble algorithm
def testInferredTrip(self):
for trip in self.inferred_trips:
entry = esdt.get_sections_for_trip("analysis/inferred_labels", self.user_id, trip.get_id())[0]
self.assertEqual(trip["data"]["inferred_labels"], entry["data"]["prediction"])