Skip to content

Commit

Permalink
Refactored ps() function in test_posix (#1341)
Browse files Browse the repository at this point in the history
* refactored ps() function that attepts to abstract away more platform-specific differences in the ps command

* move open_text an open_binary tests to test_misc
  • Loading branch information
E. M. Bray authored and giampaolo committed Sep 26, 2018
1 parent 39811bb commit 772c675
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 51 deletions.
8 changes: 0 additions & 8 deletions psutil/tests/test_linux.py
Original file line number Diff line number Diff line change
Expand Up @@ -1978,14 +1978,6 @@ def test_cpu_affinity_eligible_cpus(self):
@unittest.skipIf(not LINUX, "LINUX only")
class TestUtils(unittest.TestCase):

def test_open_text(self):
with psutil._psplatform.open_text(__file__) as f:
self.assertEqual(f.mode, 'rt')

def test_open_binary(self):
with psutil._psplatform.open_binary(__file__) as f:
self.assertEqual(f.mode, 'rb')

def test_readlink(self):
with mock.patch("os.readlink", return_value="foo (deleted)") as m:
self.assertEqual(psutil._psplatform.readlink("bar"), "foo")
Expand Down
10 changes: 10 additions & 0 deletions psutil/tests/test_misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
from psutil._common import memoize_when_activated
from psutil._common import supports_ipv6
from psutil._common import wrap_numbers
from psutil._common import open_text
from psutil._common import open_binary
from psutil._compat import PY3
from psutil.tests import APPVEYOR
from psutil.tests import bind_socket
Expand Down Expand Up @@ -896,6 +898,14 @@ def setUp(self):

tearDown = setUp

def test_open_text(self):
with open_text(__file__) as f:
self.assertEqual(f.mode, 'rt')

def test_open_binary(self):
with open_binary(__file__) as f:
self.assertEqual(f.mode, 'rb')

def test_safe_mkdir(self):
safe_mkdir(TESTFN)
assert os.path.isdir(TESTFN)
Expand Down
105 changes: 62 additions & 43 deletions psutil/tests/test_posix.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import os
import re
import subprocess
import sys
import time

import psutil
Expand All @@ -23,7 +22,6 @@
from psutil import OPENBSD
from psutil import POSIX
from psutil import SUNOS
from psutil._compat import PY3
from psutil.tests import APPVEYOR
from psutil.tests import get_kernel_version
from psutil.tests import get_test_subprocess
Expand All @@ -40,23 +38,54 @@
from psutil.tests import which


def ps(cmd):
"""Expects a ps command with a -o argument and parse the result
returning only the value of interest.
def ps(fmt, pid=None):
"""
if not LINUX:
cmd = cmd.replace(" --no-headers ", " ")
Wrapper for calling the ps command with a little bit of cross-platform
support for a narrow range of features.
"""

cmd = ['ps']

if LINUX:
cmd.append('--no-headers')

if pid is not None:
cmd.extend(['-p', str(pid)])
else:
if SUNOS:
cmd.append('-A')
else:
cmd.append('ax')

if SUNOS:
cmd = cmd.replace("-o start", "-o stime")
if AIX:
cmd = cmd.replace("-o rss", "-o rssize")
fmt_map = {'command', 'comm',
'start', 'stime'}
fmt = fmt_map.get(fmt, fmt)

cmd.extend(['-o', fmt])

output = sh(cmd)
if not LINUX:
output = output.split('\n')[1].strip()
try:
return int(output)
except ValueError:
return output

if LINUX:
output = output.splitlines()
else:
output = output.splitlines()[1:]

all_output = []
for line in output:
line = line.strip()

try:
line = int(line)
except ValueError:
pass

all_output.append(line)

if pid is None:
return all_output
else:
return all_output[0]

# ps "-o" field names differ wildly between platforms.
# "comm" means "only executable name" but is not available on BSD platforms.
Expand All @@ -74,14 +103,14 @@ def ps_name(pid):
field = "command"
if SUNOS:
field = "comm"
return ps("ps --no-headers -o %s -p %s" % (field, pid)).split(' ')[0]
return ps(field, pid).split()[0]


def ps_args(pid):
field = "command"
if AIX or SUNOS:
field = "args"
return ps("ps --no-headers -o %s -p %s" % (field, pid))
return ps(field, pid)


@unittest.skipIf(not POSIX, "POSIX only")
Expand All @@ -99,22 +128,22 @@ def tearDownClass(cls):
reap_children()

def test_ppid(self):
ppid_ps = ps("ps --no-headers -o ppid -p %s" % self.pid)
ppid_ps = ps('ppid', self.pid)
ppid_psutil = psutil.Process(self.pid).ppid()
self.assertEqual(ppid_ps, ppid_psutil)

def test_uid(self):
uid_ps = ps("ps --no-headers -o uid -p %s" % self.pid)
uid_ps = ps('uid', self.pid)
uid_psutil = psutil.Process(self.pid).uids().real
self.assertEqual(uid_ps, uid_psutil)

def test_gid(self):
gid_ps = ps("ps --no-headers -o rgid -p %s" % self.pid)
gid_ps = ps('rgid', self.pid)
gid_psutil = psutil.Process(self.pid).gids().real
self.assertEqual(gid_ps, gid_psutil)

def test_username(self):
username_ps = ps("ps --no-headers -o user -p %s" % self.pid)
username_ps = ps('user', self.pid)
username_psutil = psutil.Process(self.pid).username()
self.assertEqual(username_ps, username_psutil)

Expand All @@ -133,7 +162,7 @@ def test_rss_memory(self):
# give python interpreter some time to properly initialize
# so that the results are the same
time.sleep(0.1)
rss_ps = ps("ps --no-headers -o rss -p %s" % self.pid)
rss_ps = ps('rss', self.pid)
rss_psutil = psutil.Process(self.pid).memory_info()[0] / 1024
self.assertEqual(rss_ps, rss_psutil)

Expand All @@ -143,7 +172,7 @@ def test_vsz_memory(self):
# give python interpreter some time to properly initialize
# so that the results are the same
time.sleep(0.1)
vsz_ps = ps("ps --no-headers -o vsz -p %s" % self.pid)
vsz_ps = ps('vsz', self.pid)
vsz_psutil = psutil.Process(self.pid).memory_info()[1] / 1024
self.assertEqual(vsz_ps, vsz_psutil)

Expand Down Expand Up @@ -196,7 +225,7 @@ def test_name_long_cmdline_nsp_exc(self):

@unittest.skipIf(MACOS or BSD, 'ps -o start not available')
def test_create_time(self):
time_ps = ps("ps --no-headers -o start -p %s" % self.pid).split(' ')[0]
time_ps = ps('start', self.pid)
time_psutil = psutil.Process(self.pid).create_time()
time_psutil_tstamp = datetime.datetime.fromtimestamp(
time_psutil).strftime("%H:%M:%S")
Expand Down Expand Up @@ -235,7 +264,7 @@ def test_cmdline(self):
@unittest.skipIf(SUNOS, "not reliable on SUNOS")
@unittest.skipIf(AIX, "not reliable on AIX")
def test_nice(self):
ps_nice = ps("ps --no-headers -o nice -p %s" % self.pid)
ps_nice = ps('nice', self.pid)
psutil_nice = psutil.Process().nice()
self.assertEqual(ps_nice, psutil_nice)

Expand Down Expand Up @@ -289,30 +318,20 @@ class TestSystemAPIs(unittest.TestCase):
def test_pids(self):
# Note: this test might fail if the OS is starting/killing
# other processes in the meantime
if SUNOS or AIX:
cmd = ["ps", "-A", "-o", "pid"]
else:
cmd = ["ps", "ax", "-o", "pid"]
p = get_test_subprocess(cmd, stdout=subprocess.PIPE)
output = p.communicate()[0].strip()
assert p.poll() == 0
if PY3:
output = str(output, sys.stdout.encoding)
pids_ps = []
for line in output.split('\n')[1:]:
if line:
pid = int(line.split()[0].strip())
pids_ps.append(pid)
# remove ps subprocess pid which is supposed to be dead in meantime
pids_ps.remove(p.pid)
pids_ps = ps("pid")
pids_psutil = psutil.pids()
pids_ps.sort()
pids_psutil.sort()

# on MACOS and OPENBSD ps doesn't show pid 0
if MACOS or OPENBSD and 0 not in pids_ps:
pids_ps.insert(0, 0)
self.assertEqual(pids_ps, pids_psutil)

# There will often be one more process in pids_ps for ps itself
if len(pids_ps) - len(pids_psutil) > 1:
difference = [x for x in pids_psutil if x not in pids_ps] + \
[x for x in pids_ps if x not in pids_psutil]
self.fail("difference: " + str(difference))

# for some reason ifconfig -a does not report all interfaces
# returned by psutil
Expand Down

0 comments on commit 772c675

Please sign in to comment.