Service

Compression

Since the data is transferred between machines as CSV files, it is reasonable to compress the old ones.

The required imports:

import json
import logging
import os
import pathlib
import subprocess
import time

import pandas as pd
import sqlalchemy as sa

from sqrt_data_service.api import settings, DBConn
from sqrt_data_service.models import FileHash

__all__ = ['archive']

So first, we need to group the files by date. When a group is full, it gets compressed.

def get_date_group(timestamp):
    return timestamp // (60 * 60 * 24 * settings['archive']['days'])
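
For intuition, here is a minimal sketch of the grouping (the value 7 for settings['archive']['days'] is assumed purely for illustration): timestamps are bucketed into consecutive windows of that many days, so a file modified more than a full window ago is guaranteed to land in an earlier group.

import time

DAYS = 7  # assumed value of settings['archive']['days']

def date_group(timestamp, days=DAYS):
    return timestamp // (60 * 60 * 24 * days)

now = time.time()
# Anything older than a full window always falls into a strictly earlier group
assert date_group(now - (DAYS + 1) * 86400) < date_group(now)
print(date_group(now))  # the index of the current window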

A group is full if there is no chance that a file with today’s timestamp will land in it:

def get_files_to_compress():
    with DBConn.get_session() as db:
        file_entries = db.execute(sa.select(FileHash)).scalars()
        files = [
            f.file_name for f in file_entries if os.path.exists(f.file_name)
        ]

    df = pd.DataFrame(
        {
            "name": files,
            "date_group":
                [
                    get_date_group(pathlib.Path(f).stat().st_mtime)
                    for f in files
                ],
            "dir": [os.path.dirname(f) for f in files]
        }
    )

    current_date_group = get_date_group(time.time())
    current_date_group_delta = (
        time.time() // (60 * 60 * 24) -
        current_date_group * settings['archive']['days']
    )
    df = df[df.date_group != current_date_group]
    if current_date_group_delta <= settings['archive']['timeout']:
        df = df[df.date_group != current_date_group - 1]

    return [
        (date_group, dir, g.name.tolist())
        for (date_group, dir), g in df.groupby(['date_group', 'dir'])
        if dir not in settings['archive']['exclude_dirs']
    ]
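
To make the timeout logic concrete, here is a small sketch with assumed values days = 7 and timeout = 2: the previous group stays uncompressed for timeout more days after a new window starts, so files that arrive late can still join it.

import time

DAYS, TIMEOUT = 7, 2  # assumed settings['archive']['days'] and settings['archive']['timeout']

now = time.time()
current_group = now // (60 * 60 * 24 * DAYS)
# how many days of the current window have already passed
delta_days = now // (60 * 60 * 24) - current_group * DAYS

skip = {current_group}
if delta_days <= TIMEOUT:
    # the new window has only just started, so the previous group may still grow
    skip.add(current_group - 1)
print(skip)  # date groups that are left uncompressed for now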

And the function that archives the files according to this grouping:

def compress(groups):
    if len(groups) == 0:
        logging.info('Nothing to archive')
        return

    with DBConn.get_session() as db:
        file_entries = db.execute(sa.select(FileHash)).scalars()
        files = [
            f.file_name for f in file_entries if os.path.exists(f.file_name)
        ]

        for date_group, dir, group_files in groups:
            archive_name = f'{os.path.relpath(os.path.dirname(group_files[0]), os.path.expanduser(settings["general"]["root"])).replace("/", "_")}_{int(date_group)}.tar.gz'
            logging.info(
                'Creating archive %s with %d files', archive_name, len(group_files)
            )
            subprocess.run(
                [
                    'tar', '-czvf', archive_name, '--remove-files',
                    *[os.path.relpath(f, dir) for f in group_files]
                ],
                check=True,
                cwd=dir
            )

        # tar --remove-files deletes the originals, so drop the hashes
        # of files that no longer exist on disk
        for f in list(files):
            if not os.path.exists(f):
                db.execute(sa.delete(FileHash).where(FileHash.file_name == f))
                logging.info('Removed %s from HashDict', f)
        db.commit()
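
As an illustration of the naming scheme (all paths and values below are made up), a group of files under csv/aw inside the root directory with date group 2809 produces an archive called csv_aw_2809.tar.gz in that directory:

import os

root = '/home/user/logs'  # hypothetical settings['general']['root']
group_files = ['/home/user/logs/csv/aw/2023-10-01.csv']  # hypothetical group
date_group = 2809

archive_name = (
    os.path.relpath(os.path.dirname(group_files[0]), root).replace('/', '_')
    + f'_{int(date_group)}.tar.gz'
)
print(archive_name)  # csv_aw_2809.tar.gz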

The flow:

def archive():
    DBConn()
    groups = get_files_to_compress()
    compress(groups)

CLI

Create the CLI group and the archive command:

import click

from sqrt_data_service.api import settings

from .compress import archive

@click.group()
def service():
    pass

@service.command(help="Archive old files", name='archive')
def archive_cmd():
    archive()
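
A quick way to exercise the command without wiring up the full application is click's test runner; the module path below is an assumption about where this file lives in the package, and running it requires a reachable database, since archive() opens a session.

from click.testing import CliRunner

from sqrt_data_service.service.cli import service  # assumed module path

runner = CliRunner()
result = runner.invoke(service, ['archive'])
print(result.exit_code)  # 0 on success
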
Finally, re-export the module contents in the package __init__.py:

from .cli import *
from .compress import *