Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add job start and end time #30

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ curl http://localhost:6800/listjobs.json
{
"finished":[],
"pending":[],
"running":[{"id":"e9b81fccbec211eeb3b109f30f136c01","project":"example","spider":"quotes","state":"running"}],
"running":[{"id":"e9b81fccbec211eeb3b109f30f136c01","project":"example","spider":"quotes","state":"running", "start_time": "2012-09-12 10:14:03.594664"}],
"status":"ok"
}
```
Expand Down
55 changes: 34 additions & 21 deletions scrapyd_k8s/launcher/docker.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
import re
import os
import socket
from datetime import datetime
from typing import Optional

import docker
from docker.models.containers import Container

from scrapyd_k8s.settings import TIME_FORMAT

from ..utils import native_stringify_dict

class Docker:
Expand Down Expand Up @@ -55,36 +60,44 @@ def schedule(self, project, version, spider, job_id, settings, args):
)

def cancel(self, project_id, job_id, signal):
    """Cancel a job's container and report the state it was in beforehand.

    The container is looked up with ``self._get_container``. A container
    that has not started yet (status ``created`` or ``scheduled``) is
    removed; a ``running`` container is sent ``'SIG' + signal``; a container
    in any other status is left untouched.

    Returns ``None`` when no container was found, otherwise the container's
    status before cancelling, translated by ``_docker_to_scrapyd_status``.
    """
    container = self._get_container(project_id, job_id)
    if not container:
        return None

    # Translate the status before acting on the container, so the caller
    # receives the state the job was in prior to this call.
    prevstate = self._docker_to_scrapyd_status(container.status)
    if container.status in ('created', 'scheduled'):
        container.remove()
    elif container.status == 'running':
        container.kill(signal='SIG' + signal)
    return prevstate

def _parse_job(self, container: Container):
    """Describe a container as a job dict for the job listing.

    The dict always has ``id``, ``state``, ``project`` and ``spider``. The
    state is the container status translated by ``_docker_to_scrapyd_status``;
    the other three are read from the container labels. ``start_time`` is
    added for running and finished jobs, ``end_time`` for finished jobs only,
    both rendered with ``TIME_FORMAT``.

    Raises ``ValueError`` when a timestamp in the container state cannot be
    parsed.
    """
    def format_time(raw):
        # Docker timestamps typically look like
        # '2024-01-29T10:14:03.594664123Z': a trailing 'Z' and more than six
        # fractional digits. datetime.fromisoformat() only accepts that form
        # from Python 3.11 on, so parse the parts explicitly and truncate the
        # fraction to microseconds. Any timezone designator is ignored, as
        # TIME_FORMAT does not render one.
        stamp = re.match(r'(\d{4}-\d{2}-\d{2})T(\d{2}:\d{2}:\d{2})(?:\.(\d+))?', raw)
        if stamp is None:
            raise ValueError('Invalid container timestamp: %r' % (raw,))
        day, clock, fraction = stamp.groups()
        micros = (fraction or '')[:6].ljust(6, '0')
        parsed = datetime.strptime(day + ' ' + clock + '.' + micros, '%Y-%m-%d %H:%M:%S.%f')
        return parsed.strftime(TIME_FORMAT)

    state = self._docker_to_scrapyd_status(container.status)
    job = {
        'id': container.labels.get(self.LABEL_JOB_ID),
        'state': state,
        'project': container.labels.get(self.LABEL_PROJECT),
        'spider': container.labels.get(self.LABEL_SPIDER),
    }

    container_state = container.attrs['State']
    if state in ('running', 'finished'):
        job['start_time'] = format_time(container_state['StartedAt'])
    if state == 'finished':
        job['end_time'] = format_time(container_state['FinishedAt'])

    return job

def _get_container(self, project_id: str, job_id: str) -> Optional[Container]:
    """Find the container that runs ``job_id`` for ``project_id``.

    Containers (including stopped ones, ``all=True``) are listed with a
    filter on the job-id label. The first one whose project label equals
    ``project_id`` is returned.

    Returns ``None`` when no container carries both labels.
    """
    filters = {'label': self.LABEL_JOB_ID + '=' + job_id}
    containers = self._docker.containers.list(all=True, filters=filters)
    # Check every container with this job id, not only the first: the list
    # is filtered on job id alone, so the right project may come later.
    return next(
        (
            container
            for container in containers or []
            if container.labels.get(self.LABEL_PROJECT) == project_id
        ),
        None,
    )

def _docker_to_scrapyd_status(self, status):
return self.STATUS_MAP.get(status, status)
Expand Down
35 changes: 24 additions & 11 deletions scrapyd_k8s/launcher/k8s.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import os
from signal import Signals
from typing import Optional

import kubernetes
import kubernetes.stream
from signal import Signals
from subprocess import check_output, CalledProcessError
from kubernetes.client.models import V1Job

from scrapyd_k8s.settings import TIME_FORMAT

from ..utils import native_stringify_dict

Expand Down Expand Up @@ -121,15 +124,26 @@ def cancel(self, project, job_id, signal):
)
return prevstate

def _parse_job(self, job: V1Job):
    """Describe a Kubernetes job as a job dict for the job listing.

    The dict always has ``id``, ``state``, ``project`` and ``spider``. The
    state comes from ``_k8s_job_to_scrapyd_status``; the other three are read
    from the job's metadata labels. ``start_time`` is added for running and
    finished jobs, ``end_time`` for finished jobs only. Both are rendered
    with ``TIME_FORMAT``, or are ``None`` when the job status does not carry
    the timestamp.
    """
    state = self._k8s_job_to_scrapyd_status(job)
    _job = {
        'id': job.metadata.labels.get(self.LABEL_JOB_ID),
        'state': state,
        'project': job.metadata.labels.get(self.LABEL_PROJECT),
        'spider': job.metadata.labels.get(self.LABEL_SPIDER),
    }

    if state in ('running', 'finished'):
        # Guarded like the completion time below, so a status without a
        # start time yields None instead of an AttributeError.
        start_time = job.status.start_time
        _job['start_time'] = start_time.strftime(TIME_FORMAT) if start_time else None
    if state == 'finished':
        # Only a job that has succeeded has a completion time; other jobs
        # reported as finished get None.
        # NOTE(review): which non-succeeded states map to 'finished' is
        # decided by _k8s_job_to_scrapyd_status (presumably failed jobs) -
        # confirm there.
        completion_time = job.status.completion_time
        _job['end_time'] = completion_time.strftime(TIME_FORMAT) if completion_time else None

    return _job

def _get_job(self, project, job_id) -> Optional[V1Job]:
label = self.LABEL_JOB_ID + '=' + job_id
r = self._k8s_batch.list_namespaced_job(namespace=self._namespace, label_selector=label)
if not r or not r.items:
Expand Down Expand Up @@ -170,13 +184,12 @@ def _k8s_job_name(self, project, job_id):

def _k8s_kill(self, pod_name, signal):
    """Signal a pod by running ``/usr/sbin/killall5 -<signal>`` inside it.

    The command is executed through the pod exec endpoint in
    ``self._namespace``. Its result is not inspected.
    """
    # exec needs stream, which modifies client, so use separate instance
    # rather than the shared self._k8s client
    k8s = kubernetes.client.CoreV1Api()
    kubernetes.stream.stream(
        k8s.connect_get_namespaced_pod_exec,
        pod_name,
        namespace=self._namespace,
        # this is a bit blunt, but it works and is usually available
        command=['/usr/sbin/killall5', '-' + str(signal)],
        stderr=True
    )
    # TODO figure out how to get return value
1 change: 1 addition & 0 deletions scrapyd_k8s/settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# strftime/strptime pattern used for job start and end times,
# e.g. "2012-09-12 10:14:03.594664".
TIME_FORMAT = '%Y-%m-%d %H:%M:%S.%f'
leewesleyv marked this conversation as resolved.
Show resolved Hide resolved
26 changes: 21 additions & 5 deletions test_api.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#!/usr/bin/env python3
import os
import pytest
import requests
import time
from datetime import datetime

import requests

from scrapyd_k8s.settings import TIME_FORMAT

BASE_URL = os.getenv('TEST_BASE_URL', 'http://localhost:6800')
AVAIL_PROJECTS = os.getenv('TEST_AVAILABLE_PROJECTS', 'example').split(',')
Expand Down Expand Up @@ -159,7 +161,11 @@ def test_scenario_cancel_running_finished_ok():
# wait until the job has stopped
listjobs_wait(jobid, 'finished')
jobinfo = assert_listjobs(finished=jobid)
assert jobinfo == { 'id': jobid, 'project': RUN_PROJECT, 'spider': RUN_SPIDER, 'state': 'finished' }
assert jobinfo['id'] == jobid
assert jobinfo['project'] == RUN_PROJECT
assert jobinfo['spider'] == RUN_SPIDER
assert jobinfo['state'] == 'finished'
assert datetime.strptime(jobinfo['start_time'], TIME_FORMAT)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather keep the previous form, so that if anything is added to the dict, you are aware that a test needs changing.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will update this. Would need to look a bit more into mocking the start_time and end_time of the jobs during tests, since a dict comparison will require the exact start_time and end_time of the container/pods. Or maybe a different solution.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, that makes sense.
What about removing the time fields from the dict under test, and testing these separately? That would both allow us to test the full response, and allow proper time comparison.
Or use a time-freezing testing library (there must exist one).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried using freezegun to set a specific time during testing, but since the timestamps come from the Kubernetes/Docker APIs rather than from the datetime module, that did not work. I will do a bit more digging. The first solution should also work, but it may be worth checking whether there are alternatives.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, that totally makes sense! I'm not for mocking the result in the component under test, so let's fix this in the testing part.

# then cancel it again, though nothing would happen
response = requests.post(BASE_URL + '/cancel.json', data={ 'project': RUN_PROJECT, 'job': jobid })
assert_response_ok(response)
Expand All @@ -178,12 +184,22 @@ def scenario_regular(schedule_args):
# wait until the job is running
listjobs_wait(jobid, 'running')
jobinfo = assert_listjobs(running=jobid)
assert jobinfo == { 'id': jobid, 'project': RUN_PROJECT, 'spider': RUN_SPIDER, 'state': 'running' }
assert jobinfo['id'] == jobid
assert jobinfo['project'] == RUN_PROJECT
assert jobinfo['spider'] == RUN_SPIDER
assert jobinfo['state'] == 'running'
assert datetime.strptime(jobinfo['start_time'], TIME_FORMAT)
assert not jobinfo.get('end_time')
# wait until the job has finished
listjobs_wait(jobid, 'finished')
# check listjobs output
jobinfo = assert_listjobs(finished=jobid)
assert jobinfo == { 'id': jobid, 'project': RUN_PROJECT, 'spider': RUN_SPIDER, 'state': 'finished' }
assert jobinfo['id'] == jobid
assert jobinfo['project'] == RUN_PROJECT
assert jobinfo['spider'] == RUN_SPIDER
assert jobinfo['state'] == 'finished'
assert datetime.strptime(jobinfo['start_time'], TIME_FORMAT)
assert datetime.strptime(jobinfo['end_time'], TIME_FORMAT)

def assert_response_ok(response):
assert response.status_code == 200
Expand Down
Loading