ActivityWatch stats

ActivityWatch is FOSS time-tracking software.

Agent

Usage:

  • Run python -m sqrt_data_agent.aw once a day

Saving (Desktop)

The agent exports ActivityWatch “buckets” into CSV files.

Some imports:

import socket
import argparse
import json
import logging
import os
from collections import deque
from datetime import datetime

import pandas as pd
import requests
import furl

from sqrt_data_agent.api import settings

Buckets have a lot of data, so I need to store the position in each bucket. A JSON file works fine for that.

def get_last_updated():
    data = {}
    if os.path.exists(os.path.expanduser(settings['aw']['last_updated'])):
        with open(os.path.expanduser(settings['aw']['last_updated']), 'r') as f:
            data = json.load(f)
    # return data.get(f'last_updated-{get_hostname()}', None)
    return data


def save_last_updated(data):
    os.makedirs(
        os.path.dirname(os.path.expanduser(settings['aw']['last_updated'])),
        exist_ok=True
    )
    hostname = socket.gethostname()
    data[f'last_updated-{hostname}'] = datetime.now().isoformat()
    with open(os.path.expanduser(settings['aw']['last_updated']), 'w') as f:
        json.dump(data, f)
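
For reference, the file ends up holding a per-host marker plus one entry per bucket; a hypothetical example of its contents (hostname, bucket keys and dates are made up):

# Hypothetical contents of the last_updated JSON file; the actual keys depend
# on the buckets present on the machine.
EXAMPLE_LAST_UPDATED = {
    'last_updated-my-laptop': '2024-01-01T09:00:00.000000',
    'aw-watcher-afk_my-laptop': '2024-01-01T08:59:58.123000+00:00',
    'aw-watcher-window_my-laptop': '2024-01-01T08:59:59.456000+00:00',
}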

Next, make a DataFrame from the bucket:

def get_data(bucket_id, last_updated=None):
    params = {}
    api = settings['aw']['api']
    if last_updated:
        params['start'] = last_updated
    r = requests.get(f'{api}/0/buckets/{bucket_id}')
    bucket = r.json()
    r = requests.get(f'{api}/0/buckets/{bucket_id}/events', params=params)
    data = deque()
    # Fall back to the local hostname if the bucket doesn't report one
    hostname = bucket['hostname']
    if hostname == 'unknown':
        hostname = socket.gethostname()
    for event in r.json():
        data.append(
            {
                'id': f"{bucket_id}-{event['id']}",
                'bucket_id': bucket['id'],
                'hostname': hostname,
                'duration': event['duration'],
                'timestamp': pd.Timestamp(event['timestamp']),
                **event['data']
            }
        )
    if len(data) > 0:
        df = pd.DataFrame(data)
        df = df.set_index('id')
        return df
    return None

And perform this operation on all the required buckets.

def save_buckets(force=False):
    last_updated = get_last_updated()
    hostname = socket.gethostname()
    last_updated_time = last_updated.get(f'last_updated-{hostname}', None)
    if last_updated_time is not None:
        last_updated_date = datetime.fromisoformat(last_updated_time).date()
        if (datetime.now().date() == last_updated_date and not force):
            logging.info('Already loaded AW today')
            return
    r = requests.get(f'{settings["aw"]["api"]}/0/buckets')
    buckets = r.json()

    os.makedirs(
        os.path.expanduser(settings['aw']['logs_folder']), exist_ok=True
    )
    for bucket in buckets.values():
        if not bucket['type'] in settings['aw']['types']:
            continue
        if 'aw-watcher-web' in bucket['id']:
            last_updated_id = f'{bucket["id"]}-{socket.gethostname()}'
        else:
            last_updated_id = bucket['id']
        if bucket['last_updated'] == last_updated.get(last_updated_id, None):
            logging.info('Bucket %s already saved', bucket['id'])
            continue
        df = get_data(bucket['id'], last_updated.get(last_updated_id, None))
        last_updated[last_updated_id] = bucket['last_updated']
        if df is None:
            logging.info('Bucket %s is empty', bucket['id'])
            continue
        bucket_type = bucket['type'].replace('.', '_')
        hostname = bucket['hostname']
        if hostname == 'unknown':
            hostname = socket.gethostname()
        filename = os.path.join(
            os.path.expanduser(settings['aw']['logs_folder']),
            f"{bucket_type}-{hostname}-{bucket['last_updated']}.csv"
        )
        df.to_csv(filename)
        logging.info('Saved %s with %s events', filename, len(df))
    save_last_updated(last_updated)

def main():
    parser = argparse.ArgumentParser(
        prog='sqrt_data_agent.aw'
    )
    parser.add_argument('-f', '--force', action='store_true')
    args = parser.parse_args()
    save_buckets(args.force)

if __name__ == '__main__':
    main()

Saving (Android)

I couldn’t launch the desktop scripts on Android because of numpy, so I opted for manual export + syncing with FolderSync for now.

Models

The data model in the program is pretty reasonable. The top-level entity is called “bucket” and has the following attributes:

  • id
  • created - creation date
  • name
  • type - type of events in bucket
  • client - ID of the client software
  • hostname

And a list of events.

One event has the following attributes:

  • timestamp
  • duration - duration in seconds
  • data - a dictionary with details about the event.
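
For instance, a single currentwindow event as returned by the API might look like this (values are made up):

# A hypothetical currentwindow event from the ActivityWatch API.
EXAMPLE_EVENT = {
    'id': 12345,
    'timestamp': '2024-01-01T08:59:59.000000+00:00',
    'duration': 12.5,
    'data': {'app': 'Firefox', 'title': 'ActivityWatch - Mozilla Firefox'},
}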

The set of buckets on each machine depends on the particular setup. I’m interested in the following types on the desktop:

  • afkstatus (aw-watcher-afk)
    • status - “afk” or “not-afk”
  • currentwindow (aw-watcher-window)
    • app
    • title
  • app.editor.activity (activity-watch-mode)
    • file
    • project
    • language
  • web.tab.current (aw-watcher-web)
    • url
    • title
    • audible
    • incognito
    • tabCount

And in the following types on Android:

  • os.lockscreen.unlocks - no data
  • currentwindow
    • app
    • classname
    • package

Also (thanks to Erik for the comment), AW stores timestamps in UTC, so I add a location field to convert the timestamps as necessary.
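
A minimal sketch of the idea behind that conversion (the real logic lives in sqrt_data_service.common.locations.LocationMatcher; the location-to-timezone mapping here is hypothetical):

from datetime import datetime, timezone
from zoneinfo import ZoneInfo

# Hypothetical mapping from location names to timezones.
LOCATION_TIMEZONES = {'home': 'Europe/Moscow'}

def to_local_time(timestamp_utc, location):
    # Convert a UTC timestamp to the local time of the given location.
    return timestamp_utc.astimezone(ZoneInfo(LOCATION_TIMEZONES[location]))

print(to_local_time(datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc), 'home'))
# 2024-01-01 15:00:00+03:00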

Source models

I don’t get to use model inheritance all that often, but this seems like one such case.

Here’s the general model:

import sqlalchemy as sa
from sqrt_data_service.models import Base

__all__ = ['Bucket']


class Bucket(Base):
    __table_args__ = {'schema': 'aw'}
    __abstract__ = True

    id = sa.Column(
        sa.String(256),
        primary_key=True,
    )
    bucket_id = sa.Column(sa.String(256), nullable=False)
    hostname = sa.Column(sa.String(256), nullable=False)
    location = sa.Column(sa.String(256), nullable=False)
    timestamp = sa.Column(sa.DateTime(), nullable=False)
    duration = sa.Column(sa.Float(), nullable=False)

And here are the models for specific bucket types:

import sqlalchemy as sa
from .bucket import Bucket

__all__ = ['AfkStatus']

class AfkStatus(Bucket):
    __tablename__ = 'afkstatus'
    __table_args__ = {'schema': 'aw'}

    status = sa.Column(sa.Boolean(), nullable=False)

import sqlalchemy as sa
from .bucket import Bucket

__all__ = ['CurrentWindow']

class CurrentWindow(Bucket):
    __tablename__ = 'currentwindow'
    __table_args__ = {'schema': 'aw'}

    app = sa.Column(sa.Text(), nullable=False)
    title = sa.Column(sa.Text(), nullable=False)

import sqlalchemy as sa
from .bucket import Bucket

__all__ = ['AppEditor']

class AppEditor(Bucket):
    __tablename__ = 'appeditor'
    __table_args__ = {'schema': 'aw'}

    file = sa.Column(sa.Text(), nullable=False)
    project = sa.Column(sa.Text(), nullable=False)
    language = sa.Column(sa.Text(), nullable=False)

import sqlalchemy as sa
from .bucket import Bucket

__all__ = ['WebTab']

class WebTab(Bucket):
    __tablename__ = 'webtab'
    __table_args__ = {'schema': 'aw'}

    url = sa.Column(sa.Text(), nullable=False)
    site = sa.Column(sa.Text(), nullable=False)
    url_no_params = sa.Column(sa.Text(), nullable=False)
    title = sa.Column(sa.Text(), nullable=False)
    audible = sa.Column(sa.Boolean(), nullable=False)
    incognito = sa.Column(sa.Boolean(), nullable=False)
    tab_count = sa.Column(sa.Integer(), nullable=True)

import sqlalchemy as sa
from .bucket import Bucket

__all__ = ['AndroidUnlock']

class AndroidUnlock(Bucket):
    __tablename__ = 'android_unlock'
    __table_args__ = {'schema': 'aw'}

import sqlalchemy as sa
from .bucket import Bucket

__all__ = ['AndroidCurrentWindow']

class AndroidCurrentWindow(Bucket):
    __tablename__ = 'android_currentwindow'
    __table_args__ = {'schema': 'aw'}

    app = sa.Column(sa.Text(), nullable=False)
    classname = sa.Column(sa.Text(), nullable=False)
    package = sa.Column(sa.Text(), nullable=False)

The corresponding __init__.py:

from .bucket import *
from .afkstatus import *
from .currentwindow import *
from .appeditor import *
from .webtab import *
from .android_unlock import *
from .android_currentwindow import *

Flows

Loading (Desktop)

The required imports:

import furl
import tldextract
import glob
import pandas as pd
import os
import re
import logging

from sqlalchemy.dialects.postgresql import insert as pg_insert
from tqdm import tqdm

from sqrt_data_service.api import settings, DBConn, FileHasher
from sqrt_data_service.models import Base
from sqrt_data_service.models.aw import AfkStatus, CurrentWindow, AppEditor, WebTab
from sqrt_data_service.common.locations import LocationMatcher

__all__ = ['aw_load_desktop']

Get all the dataframes to load:

def get_dataframes(db):
    files = glob.glob(
        f'{os.path.expanduser(settings["aw"]["logs_folder"])}/*.csv'
    )
    dfs_by_type = {}
    files_by_type = {}
    hasher = FileHasher()
    for f in tqdm(files):
        if not hasher.is_updated(f, db):
            continue
        try:
            df = pd.read_csv(f, lineterminator='\n', index_col=False)
        except pd.errors.ParserError:
            logging.error(f'Error parsing file: {f}')
            continue
        type_ = re.search(r'^\w+', os.path.basename(f)).group(0)
        try:
            dfs_by_type[type_].append(df)
            files_by_type[type_].append(f)
        except KeyError:
            dfs_by_type[type_] = [df]
            files_by_type[type_] = [f]
        hasher.save_hash(f, db)
    # for type, files in files_by_type.items():
    #     logging.info(f'AW {type}: {"; ".join(files)}')
    return dfs_by_type

Models by type:

MODELS = {
    'afkstatus': AfkStatus,
    'currentwindow': CurrentWindow,
    'app_editor_activity': AppEditor,
    'web_tab_current': WebTab
}

Preprocessing the records:

def safe_furl_no_params(url):
    try:
        return furl.furl(url).remove(args=True, fragment=True).url
    except ValueError:
        logging.warning('Bad URL: %s', url)
        return url

def get_records(type_, df):
    loc = LocationMatcher()
    if type_ == 'afkstatus':
        df['status'] = df['status'] == 'not-afk'
    if type_ == 'currentwindow':
        df['app'] = df['app'].apply(
            lambda app: settings['aw']['apps_convert'].get(app, app)
        )
    if type_ == 'web_tab_current':
        df = df.rename({'tabCount': 'tab_count'}, axis=1)
        df['site'] = [
            tldextract.extract(url).registered_domain
            for url in df['url']
        ]
        df['url_no_params'] = [
            safe_furl_no_params(url)
            for url in df['url']
        ]
    if type_ == 'app_editor_activity':
        if 'branch' in df.columns:
            df = df.drop('branch', axis=1)
    df['timestamp'] = pd.to_datetime(df['timestamp'], format='ISO8601')
    locations = df.apply(
        lambda row: loc.get_location(row.timestamp, row.hostname), axis=1
    )
    df['location'] = [l[0] for l in locations]
    df['timestamp'] = [l[1] for l in locations]
    return df.to_dict(orient='records')

Insert data:

def insert_data(type_, entries, db):
    db.execute(
        pg_insert(MODELS[type_]).values(entries).on_conflict_do_nothing()
    )

Perform the loading:

def aw_load_desktop():
    DBConn()
    DBConn.create_schema('aw', Base)
    with DBConn.get_session() as db:
        dfs_by_type = get_dataframes(db)

        for type_, dfs in tqdm(dfs_by_type.items()):
            for df in dfs:
                if len(df) > 10000:
                    logging.info(f'Inserting a large df ({len(df)}) of type "{type_}"')
                entries = get_records(type_, df)
                insert_data(type_, entries, db)
                logging.info(f'Inserted {len(entries)} records of type "{type_}"')
        db.commit()

Loading (Android)

As long as I have only one Android phone where I have ActivityWatch installed, I work with just one JSON file.
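
The export is a single JSON file with a "buckets" mapping; a trimmed, hypothetical example of the shape the loader below expects:

# Hypothetical shape of the Android export, trimmed to the fields used below.
EXAMPLE_EXPORT = {
    'buckets': {
        'aw-watcher-android-test': {
            'id': 'aw-watcher-android-test',
            'type': 'currentwindow',
            'hostname': 'my-phone',
            'events': [
                {
                    'id': 1,
                    'timestamp': '2024-01-01T09:00:00.000000+00:00',
                    'duration': 30.0,
                    'data': {
                        'app': 'Firefox',
                        'classname': 'org.mozilla.gecko.BrowserApp',
                        'package': 'org.mozilla.firefox'
                    },
                },
            ],
        },
    },
}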

The required imports:

import json
import pandas as pd
import logging

from tqdm import tqdm

from sqrt_data_service.api import settings, DBConn, FileHasher
from sqrt_data_service.models import Base
from sqrt_data_service.common.locations import LocationMatcher

__all__ = ['aw_load_android']

The function to get dataframes from the JSON file:

def get_dataframes(db):
    hasher = FileHasher()
    if not hasher.is_updated(settings["aw"]["android_file"], db):
        logging.info('Android already loaded')
        return
    dfs_by_type = {}
    with open(settings["aw"]["android_file"], 'r') as f:
        data = json.load(f)
        buckets = data['buckets']
        for bucket in buckets.values():
            df = pd.DataFrame(
                [
                    {
                        'id': f"{bucket['id']}-{event['id']}",
                        'bucket_id': bucket['id'],
                        'hostname': bucket['hostname'],
                        'duration': event['duration'],
                        'timestamp': pd.Timestamp(event['timestamp']),
                        **event['data'],
                    } for event in bucket['events']
                ]
            )
            df = df.set_index('id')
            dfs_by_type[bucket['type']] = df
    return dfs_by_type

Also, pre-processing the records:

def get_records(type_, df):
    loc = LocationMatcher()
    df['timestamp'] = pd.to_datetime(df['timestamp'], format='ISO8601')
    locations = df.apply(
        lambda row: loc.get_location(row.timestamp, row.hostname), axis=1
    )
    df['location'] = [l[0] for l in locations]
    df['timestamp'] = [l[1] for l in locations]
    return df

And the flow:

TABLE_NAMES = {
    'os.lockscreen.unlocks': 'android_unlock',
    'currentwindow': 'android_currentwindow'
}

def aw_load_android():
    DBConn()
    DBConn.create_schema('aw', Base)

    hasher = FileHasher()
    with DBConn.get_session() as db:
        dfs_by_type = get_dataframes(db)

        if dfs_by_type is None:
            return

        for type_, df in tqdm(dfs_by_type.items()):
            df = get_records(type_, df)
            df.to_sql(
                TABLE_NAMES[type_],
                schema=settings['aw']['schema'],
                con=DBConn.engine,
                if_exists='replace'
            )
            print(df)
        hasher.save_hash(settings["aw"]["android_file"])
        db.commit()

Post-processing

The post-processing logic turned out to be rather complex… But well.

One issue is that data from ActivityWatch is somewhat scattered, so the following operations are necessary:

  1. filter active windows by not-afk status
  2. filter active browser tabs by not-afk status & active browser window

Also, there’s a lot of data, so some pre-aggregation is necessary. I ended up implementing this in PL/pgSQL for some reason.

So first, initialize the tables for the first level of post-processed data:

drop procedure if exists aw.init_postprocessing();
create procedure aw.init_postprocessing()
    language plpgsql as
$$
begin
    drop table if exists aw.notafkwindow cascade;
    drop table if exists aw.notafktab cascade;
    drop table if exists aw._notafkwindow_meta cascade;
    create table aw.notafkwindow (like aw.currentwindow including all);
    create table aw.notafktab (like aw.webtab including all);
    create table aw._notafkwindow_meta (
        date date primary key,
        count int8
    );

    CREATE OR REPLACE VIEW aw._notafkwindow_meta_diff AS
    WITH current_meta AS (
        select date(timestamp) date, count(*) count
        FROM aw.currentwindow
        GROUP BY date(timestamp)
        ORDER BY date ASC
    )
    SELECT CM.date
    FROM current_meta CM
             LEFT JOIN aw._notafkwindow_meta OM ON CM.date = OM.date
    WHERE CM.count != OM.count OR OM.count IS NULL;
end;
$$;

Next, filter the list of active windows. If:

  • the interval of using a program overlaps with an interval of being not-afk, or
  • the interval of using a specified program (aw.skip_afk_apps, aw.skip_afk_titles) overlaps with an AFK interval shorter than aw.skip_afk_interval,

add that interval to the resulting table. The duration of the new interval is the duration of the overlap.

After some time, I decided to add the intervals of being AFK to this table as well, but with title & app set to “AFK”. So first, the function to check the AFK status:

drop function if exists aw.is_afk;
create function aw.is_afk(status bool, duration float, app text, title text) returns bool
    language plpgsql as
$$
begin
    return status = true
        OR (status = false AND duration < current_setting('aw.skip_afk_interval')::int AND
            (app ~ current_setting('aw.skip_afk_apps') OR title ~ current_setting('aw.skip_afk_titles')));
end;
$$;

I iterated through a few implementations of this part, and the most elegant way seems to be a join on the overlaps operator. The CTEs are meant to improve performance, because otherwise doing such a join on tables with around a million records is quite expensive.

drop function if exists aw.get_notafkwindow;
create function aw.get_notafkwindow(start_date timestamp, end_date timestamp)
    returns table
            (
                like aw.currentwindow
            )
    language plpgsql
AS
$$
begin
    RETURN QUERY
        WITH A AS (SELECT * FROM aw.afkstatus WHERE timestamp BETWEEN start_date AND end_date),
             C AS (SELECT * FROM aw.currentwindow WHERE timestamp BETWEEN start_date AND end_date)
        SELECT concat('afkw-', substring(C.id from '[0-9]+$'), '-', substring(A.id from '[0-9]+$'))::varchar(256) id,
               C.bucket_id,
               C.hostname,
               C.location,
               case
                   when A.timestamp > C.timestamp then A.timestamp
                   else C.timestamp end AS                                                                        timestamp,
               extract(epoch from
                       least(C.timestamp + C.duration * interval '1 second',
                             A.timestamp + A.duration * interval '1 second') -
                       greatest(A.timestamp, C.timestamp))::double precision                                      duration,
               case
                   when aw.is_afk(A.status, A.duration, app, title) then C.app
                   else 'AFK' end       as                                                                        app,
               case
                   when aw.is_afk(A.status, A.duration, app, title) then C.title
                   else 'AFK' end       as                                                                        title
        FROM A
                 INNER JOIN C ON
                ((A.timestamp, A.timestamp + A.duration * interval '1 second')
                    overlaps
                 (C.timestamp, C.timestamp + C.duration * interval '1 second')) AND A.hostname = C.hostname
        ORDER BY timestamp DESC;
end;
$$;

And store all of that in the final table. I used to have a materialized view here, but it didn’t scale well, so I’ve ended up doing day-by-day processing.

The view to get the list of unprocessed days resides in init_postprocessing.

The procedure to perform the processing:

drop procedure if exists aw.postprocess_notafkwindow;
create procedure aw.postprocess_notafkwindow()
    language plpgsql AS
$$
DECLARE
    date date;
begin
    FOR date IN SELECT * FROM aw._notafkwindow_meta_diff
        LOOP
            DELETE FROM aw.notafkwindow WHERE date(timestamp) = date;
            INSERT INTO aw.notafkwindow
            SELECT *
            FROM aw.get_notafkwindow(date, date + interval '1 day')
            ON CONFLICT (id) DO UPDATE SET timestamp = EXCLUDED.timestamp, duration = EXCLUDED.duration;
        end loop;
    DELETE FROM aw._notafkwindow_meta;
    INSERT INTO aw._notafkwindow_meta
    select date(timestamp) date, count(*) count
    FROM aw.currentwindow
    GROUP BY date(timestamp)
    ORDER BY date;
end;
$$;

And one materialized view to aggregate the window data and improve the dashboard performance a bit:

drop procedure if exists aw.create_afkwindow_views();
create procedure aw.create_afkwindow_views()
    language plpgsql as
$$
begin
    CREATE MATERIALIZED VIEW aw.notafkwindow_group AS
    SELECT hostname, location, date(timestamp) date, sum(duration) / (60) total_minutes, app, title
    FROM aw.notafkwindow
    GROUP BY hostname, location, date(timestamp), app, title;
end;
$$;

As for the browser data, one materialized view seems enough for the current quantities. I’ll probably optimize this in a year or so.

One problem here is that timestamps from the browser tab watcher do not quite align with the ones from the current window watcher, so calculating overlaps between them gives deflated results. So I truncate the intervals from the current window data to 1 minute.

drop procedure if exists aw.create_browser_views();
create procedure aw.create_browser_views()
    language plpgsql as
$$
begin
    CREATE MATERIALIZED VIEW aw.webtab_active AS
    WITH W AS (
        SELECT *
        FROM aw.notafkwindow
        WHERE app ~ current_setting('aw.webtab_apps')
        ORDER BY timestamp
    ),
         T AS (SELECT * FROM aw.webtab WHERE url !~ current_setting('aw.skip_urls')),
    res AS (
    SELECT T.bucket_id,
           T.location,
           greatest(W.timestamp, T.timestamp) AS       timestamp,
           extract(epoch from
                   least(T.timestamp + T.duration * interval '1 second',
                         W.timestamp + W.duration * interval '1 second') -
                   greatest(W.timestamp, T.timestamp)) duration,
           T.url,
           T.site,
           T.url_no_params,
           T.title,
           T.audible,
           T.tab_count
    FROM T
             INNER JOIN W ON
        ((W.timestamp, W.timestamp + W.duration * interval '1 second')
            overlaps
         (T.timestamp, T.timestamp + T.duration * interval '1 second'))
    ORDER BY T.timestamp, T.id)
    SELECT * FROM res;
    CREATE MATERIALIZED VIEW aw.webtab_group AS
    SELECT location, date(timestamp) date, sum(duration) / (60) total_minutes, site, url_no_params, title, audible, tab_count
    FROM aw.webtab_active
    GROUP BY location, date(timestamp), site, url_no_params, title, audible, tab_count;
end
$$;

The Python part sets the database settings from the configuration file and executes the stuff above. I wanted to make a separate .sql file for that, but that would make packaging more complicated, so here goes noweb.

from sqrt_data_service.api import settings, DBConn

__all__ = ['aw_postprocessing_init', 'aw_postprocessing_dispatch']

SQL = """
<<postprocess-sql>>
"""

def update_settings(db):
    db.execute(
        f"""
    SELECT set_config('aw.skip_afk_interval', '{settings['aw']['skip_afk_interval']}', false);
    SELECT set_config('aw.skip_afk_apps', '{settings['aw']['skip_afk_apps']}', false);
    SELECT set_config('aw.skip_afk_titles', '{settings['aw']['skip_afk_titles']}', false);
    SELECT set_config('aw.webtab_apps', '{settings['aw']['webtab_apps']}', false);
    SELECT set_config('aw.skip_urls', '{settings['aw']['skip_urls']}', false);
    """
    )

def init_postprocessing(db):
    db.execute("CALL aw.init_postprocessing();")

def create_afkwindow_views(db):
    db.execute("CALL aw.create_afkwindow_views();")


def create_browser_views(db):
    db.execute("CALL aw.create_browser_views();")


def postprocess_notafkwindow(db):
    db.execute("CALL aw.postprocess_notafkwindow();")

def refresh_notafkwindow(db):
    db.execute("REFRESH MATERIALIZED VIEW aw.notafkwindow_group;")

def refresh_webtab(db):
    db.execute("REFRESH MATERIALIZED VIEW aw.webtab_active;")
    db.execute("REFRESH MATERIALIZED VIEW aw.webtab_group;")

def aw_postprocessing_init():
    DBConn()
    with DBConn.get_session() as db:
        db.execute(SQL)
        update_settings(db)
        init_postprocessing(db)
        create_afkwindow_views(db)
        # create_browser_views
        db.commit()

def aw_postprocessing_dispatch():
    DBConn()
    with DBConn.get_session() as db:
        update_settings(db)
        postprocess_notafkwindow(db)
        refresh_notafkwindow(db)
        # refresh_webtab(db)
        db.commit()

App Interval

Convert stats for certain apps to be compatible with WakaTime.

from tqdm import tqdm
from sqrt_data_service.api import settings, DBConn

import sqlalchemy as sa
import pandas as pd

__all__ = ['process_app_intervals']

Get all timestamps for the selected apps:

def extract_data(db=None):
    apps = ', '.join([f"'{app}'" for app in settings.aw.app_interval.apps])
    sql = f"SELECT app, timestamp FROM aw.notafkwindow WHERE app in ({apps}) ORDER BY timestamp ASC"
    with DBConn.ensure_session(db) as db:
        data = db.execute(sa.text(sql)).all()

    app_timestamps = {}
    for app, timestamp in data:
        try:
            app_timestamps[app].append(timestamp)
        except KeyError:
            app_timestamps[app] = [timestamp]
    return app_timestamps

Get total length of intervals per day:

def process_data(app_timestamps):
    time_by_day = {}
    for app, timestamps in app_timestamps.items():
        intervals = []
        start = timestamps[0]
        end = timestamps[0]
        for timestamp in tqdm(timestamps[1:]):
            delta = (timestamp - end).total_seconds()
            if delta > settings.aw.app_interval.interval:
                if end > start:
                    intervals.append((start, end))
                start = timestamp
                end = timestamp
            else:
                end = timestamp
        if end > start:
            intervals.append((start, end))

        time_by_day[app] = {}
        for start, end in intervals:
            date = start.date()
            delta = (end - start).total_seconds()
            try:
                time_by_day[app][date] += delta
            except KeyError:
                time_by_day[app][date] = delta
    return time_by_day
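
A quick worked example of the grouping (hypothetical timestamps, assuming settings.aw.app_interval.interval is 900 seconds):

from datetime import datetime

# Three points for a hypothetical app: 10:00, 10:05 and 12:00. The 300-second
# gap between the first two stays within one interval; the jump to 12:00
# exceeds the threshold, closes the (10:00, 10:05) interval and starts a new
# zero-length one, which is dropped. process_data would therefore return
# {'emacs': {date(2024, 1, 1): 300.0}}.
example_timestamps = {
    'emacs': [
        datetime(2024, 1, 1, 10, 0),
        datetime(2024, 1, 1, 10, 5),
        datetime(2024, 1, 1, 12, 0),
    ]
}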

Save data:

def save_data(time_by_day):
    data = []
    for app, times_per_app in time_by_day.items():
        for date, seconds in times_per_app.items():
            data.append((app, date, seconds))
    df = pd.DataFrame(data, columns=["app", "date", "seconds"])
    df.to_sql(
        "intervals",
        schema=settings["aw"]["schema"],
        con=DBConn.engine,
        if_exists="replace",
    )

def process_app_intervals():
    DBConn()
    with DBConn.get_session() as db:
        raw_data = extract_data(db)
        time_by_day = process_data(raw_data)
        save_data(time_by_day)

Final flow

The flow that executes all other flows.

import argparse

from .app_intervals import process_app_intervals
from .load import aw_load_desktop
from .load_android import aw_load_android
from .postprocessing import aw_postprocessing_init, aw_postprocessing_dispatch


__all__ = ['aw_process']

def aw_process(init=False):
    aw_load_desktop()
    aw_load_android()
    if init:
        aw_postprocessing_init()
    aw_postprocessing_dispatch()
    process_app_intervals()

CLI & Init

Click module:

import click

from sqrt_data_service.api import settings

from .app_intervals import process_app_intervals
from .load import aw_load_desktop
from .load_android import aw_load_android
from .postprocessing import aw_postprocessing_init, aw_postprocessing_dispatch
from .main import aw_process


__all__ = ["aw"]

@click.group()
def aw():
    pass

@aw.command(help="Load desktop data", name="load-desktop")
def aw_load_desktop_cmd():
    aw_load_desktop()

@aw.command(help="Load android data", name="load-android")
def aw_load_android_cmd():
    aw_load_android()

@aw.command(help="Process app intervals", name="process-app-intervals")
def aw_process_app_intervals_cmd():
    process_app_intervals()

@aw.command(help="Postprocessing init", name="postprocessing-init")
def aw_postprocessing_init_cmd():
    aw_postprocessing_init()

@aw.command(help="Postprocessing dispatch", name="postprocessing-dispatch")
def aw_postprocessing_dispatch_cmd():
    aw_postprocessing_dispatch()

@aw.command(help="Process all", name="process-all")
def aw_process_all_cmd():
    aw_process()

And __init__.py:

from .app_intervals import *
from .cli import *
from .load_android import *
from .load import *
from .main import *
from .postprocessing import *