mirror of
https://github.com/jart/cosmopolitan.git
synced 2025-02-12 09:17:53 +00:00
These are the commits from https://github.com/ahgamut/cpython/tree/cosmo_py36 squashed for simplicity. Also included is the pyconfig.h used for compilation. The pyconfig.h has to be changed manually in case Cosmopolitan gets new features.
995 lines
35 KiB
Python
995 lines
35 KiB
Python
import contextlib
|
|
import os
|
|
import sys
|
|
import tracemalloc
|
|
import unittest
|
|
from unittest.mock import patch
|
|
from test.support.script_helper import (assert_python_ok, assert_python_failure,
|
|
interpreter_requires_environment)
|
|
from test import support
|
|
try:
|
|
import _thread
|
|
import threading
|
|
except ImportError:
|
|
threading = None
|
|
try:
|
|
import _testcapi
|
|
except ImportError:
|
|
_testcapi = None
|
|
|
|
|
|
EMPTY_STRING_SIZE = sys.getsizeof(b'')
|
|
|
|
|
|
def get_frames(nframe, lineno_delta):
|
|
frames = []
|
|
frame = sys._getframe(1)
|
|
for index in range(nframe):
|
|
code = frame.f_code
|
|
lineno = frame.f_lineno + lineno_delta
|
|
frames.append((code.co_filename, lineno))
|
|
lineno_delta = 0
|
|
frame = frame.f_back
|
|
if frame is None:
|
|
break
|
|
return tuple(frames)
|
|
|
|
def allocate_bytes(size):
|
|
nframe = tracemalloc.get_traceback_limit()
|
|
bytes_len = (size - EMPTY_STRING_SIZE)
|
|
frames = get_frames(nframe, 1)
|
|
data = b'x' * bytes_len
|
|
return data, tracemalloc.Traceback(frames)
|
|
|
|
def create_snapshots():
|
|
traceback_limit = 2
|
|
|
|
# _tracemalloc._get_traces() returns a list of (domain, size,
|
|
# traceback_frames) tuples. traceback_frames is a tuple of (filename,
|
|
# line_number) tuples.
|
|
raw_traces = [
|
|
(0, 10, (('a.py', 2), ('b.py', 4))),
|
|
(0, 10, (('a.py', 2), ('b.py', 4))),
|
|
(0, 10, (('a.py', 2), ('b.py', 4))),
|
|
|
|
(1, 2, (('a.py', 5), ('b.py', 4))),
|
|
|
|
(2, 66, (('b.py', 1),)),
|
|
|
|
(3, 7, (('<unknown>', 0),)),
|
|
]
|
|
snapshot = tracemalloc.Snapshot(raw_traces, traceback_limit)
|
|
|
|
raw_traces2 = [
|
|
(0, 10, (('a.py', 2), ('b.py', 4))),
|
|
(0, 10, (('a.py', 2), ('b.py', 4))),
|
|
(0, 10, (('a.py', 2), ('b.py', 4))),
|
|
|
|
(2, 2, (('a.py', 5), ('b.py', 4))),
|
|
(2, 5000, (('a.py', 5), ('b.py', 4))),
|
|
|
|
(4, 400, (('c.py', 578),)),
|
|
]
|
|
snapshot2 = tracemalloc.Snapshot(raw_traces2, traceback_limit)
|
|
|
|
return (snapshot, snapshot2)
|
|
|
|
def frame(filename, lineno):
|
|
return tracemalloc._Frame((filename, lineno))
|
|
|
|
def traceback(*frames):
|
|
return tracemalloc.Traceback(frames)
|
|
|
|
def traceback_lineno(filename, lineno):
|
|
return traceback((filename, lineno))
|
|
|
|
def traceback_filename(filename):
|
|
return traceback_lineno(filename, 0)
|
|
|
|
|
|
class TestTracemallocEnabled(unittest.TestCase):
|
|
def setUp(self):
|
|
if tracemalloc.is_tracing():
|
|
self.skipTest("tracemalloc must be stopped before the test")
|
|
|
|
tracemalloc.start(1)
|
|
|
|
def tearDown(self):
|
|
tracemalloc.stop()
|
|
|
|
def test_get_tracemalloc_memory(self):
|
|
data = [allocate_bytes(123) for count in range(1000)]
|
|
size = tracemalloc.get_tracemalloc_memory()
|
|
self.assertGreaterEqual(size, 0)
|
|
|
|
tracemalloc.clear_traces()
|
|
size2 = tracemalloc.get_tracemalloc_memory()
|
|
self.assertGreaterEqual(size2, 0)
|
|
self.assertLessEqual(size2, size)
|
|
|
|
def test_get_object_traceback(self):
|
|
tracemalloc.clear_traces()
|
|
obj_size = 12345
|
|
obj, obj_traceback = allocate_bytes(obj_size)
|
|
traceback = tracemalloc.get_object_traceback(obj)
|
|
self.assertEqual(traceback, obj_traceback)
|
|
|
|
def test_set_traceback_limit(self):
|
|
obj_size = 10
|
|
|
|
tracemalloc.stop()
|
|
self.assertRaises(ValueError, tracemalloc.start, -1)
|
|
|
|
tracemalloc.stop()
|
|
tracemalloc.start(10)
|
|
obj2, obj2_traceback = allocate_bytes(obj_size)
|
|
traceback = tracemalloc.get_object_traceback(obj2)
|
|
self.assertEqual(len(traceback), 10)
|
|
self.assertEqual(traceback, obj2_traceback)
|
|
|
|
tracemalloc.stop()
|
|
tracemalloc.start(1)
|
|
obj, obj_traceback = allocate_bytes(obj_size)
|
|
traceback = tracemalloc.get_object_traceback(obj)
|
|
self.assertEqual(len(traceback), 1)
|
|
self.assertEqual(traceback, obj_traceback)
|
|
|
|
def find_trace(self, traces, traceback):
|
|
for trace in traces:
|
|
if trace[2] == traceback._frames:
|
|
return trace
|
|
|
|
self.fail("trace not found")
|
|
|
|
def test_get_traces(self):
|
|
tracemalloc.clear_traces()
|
|
obj_size = 12345
|
|
obj, obj_traceback = allocate_bytes(obj_size)
|
|
|
|
traces = tracemalloc._get_traces()
|
|
trace = self.find_trace(traces, obj_traceback)
|
|
|
|
self.assertIsInstance(trace, tuple)
|
|
domain, size, traceback = trace
|
|
self.assertEqual(size, obj_size)
|
|
self.assertEqual(traceback, obj_traceback._frames)
|
|
|
|
tracemalloc.stop()
|
|
self.assertEqual(tracemalloc._get_traces(), [])
|
|
|
|
def test_get_traces_intern_traceback(self):
|
|
# dummy wrappers to get more useful and identical frames in the traceback
|
|
def allocate_bytes2(size):
|
|
return allocate_bytes(size)
|
|
def allocate_bytes3(size):
|
|
return allocate_bytes2(size)
|
|
def allocate_bytes4(size):
|
|
return allocate_bytes3(size)
|
|
|
|
# Ensure that two identical tracebacks are not duplicated
|
|
tracemalloc.stop()
|
|
tracemalloc.start(4)
|
|
obj_size = 123
|
|
obj1, obj1_traceback = allocate_bytes4(obj_size)
|
|
obj2, obj2_traceback = allocate_bytes4(obj_size)
|
|
|
|
traces = tracemalloc._get_traces()
|
|
|
|
trace1 = self.find_trace(traces, obj1_traceback)
|
|
trace2 = self.find_trace(traces, obj2_traceback)
|
|
domain1, size1, traceback1 = trace1
|
|
domain2, size2, traceback2 = trace2
|
|
self.assertIs(traceback2, traceback1)
|
|
|
|
def test_get_traced_memory(self):
|
|
# Python allocates some internals objects, so the test must tolerate
|
|
# a small difference between the expected size and the real usage
|
|
max_error = 2048
|
|
|
|
# allocate one object
|
|
obj_size = 1024 * 1024
|
|
tracemalloc.clear_traces()
|
|
obj, obj_traceback = allocate_bytes(obj_size)
|
|
size, peak_size = tracemalloc.get_traced_memory()
|
|
self.assertGreaterEqual(size, obj_size)
|
|
self.assertGreaterEqual(peak_size, size)
|
|
|
|
self.assertLessEqual(size - obj_size, max_error)
|
|
self.assertLessEqual(peak_size - size, max_error)
|
|
|
|
# destroy the object
|
|
obj = None
|
|
size2, peak_size2 = tracemalloc.get_traced_memory()
|
|
self.assertLess(size2, size)
|
|
self.assertGreaterEqual(size - size2, obj_size - max_error)
|
|
self.assertGreaterEqual(peak_size2, peak_size)
|
|
|
|
# clear_traces() must reset traced memory counters
|
|
tracemalloc.clear_traces()
|
|
self.assertEqual(tracemalloc.get_traced_memory(), (0, 0))
|
|
|
|
# allocate another object
|
|
obj, obj_traceback = allocate_bytes(obj_size)
|
|
size, peak_size = tracemalloc.get_traced_memory()
|
|
self.assertGreaterEqual(size, obj_size)
|
|
|
|
# stop() also resets traced memory counters
|
|
tracemalloc.stop()
|
|
self.assertEqual(tracemalloc.get_traced_memory(), (0, 0))
|
|
|
|
def test_clear_traces(self):
|
|
obj, obj_traceback = allocate_bytes(123)
|
|
traceback = tracemalloc.get_object_traceback(obj)
|
|
self.assertIsNotNone(traceback)
|
|
|
|
tracemalloc.clear_traces()
|
|
traceback2 = tracemalloc.get_object_traceback(obj)
|
|
self.assertIsNone(traceback2)
|
|
|
|
def test_is_tracing(self):
|
|
tracemalloc.stop()
|
|
self.assertFalse(tracemalloc.is_tracing())
|
|
|
|
tracemalloc.start()
|
|
self.assertTrue(tracemalloc.is_tracing())
|
|
|
|
def test_snapshot(self):
|
|
obj, source = allocate_bytes(123)
|
|
|
|
# take a snapshot
|
|
snapshot = tracemalloc.take_snapshot()
|
|
|
|
# write on disk
|
|
snapshot.dump(support.TESTFN)
|
|
self.addCleanup(support.unlink, support.TESTFN)
|
|
|
|
# load from disk
|
|
snapshot2 = tracemalloc.Snapshot.load(support.TESTFN)
|
|
self.assertEqual(snapshot2.traces, snapshot.traces)
|
|
|
|
# tracemalloc must be tracing memory allocations to take a snapshot
|
|
tracemalloc.stop()
|
|
with self.assertRaises(RuntimeError) as cm:
|
|
tracemalloc.take_snapshot()
|
|
self.assertEqual(str(cm.exception),
|
|
"the tracemalloc module must be tracing memory "
|
|
"allocations to take a snapshot")
|
|
|
|
def test_snapshot_save_attr(self):
|
|
# take a snapshot with a new attribute
|
|
snapshot = tracemalloc.take_snapshot()
|
|
snapshot.test_attr = "new"
|
|
snapshot.dump(support.TESTFN)
|
|
self.addCleanup(support.unlink, support.TESTFN)
|
|
|
|
# load() should recreate the attribute
|
|
snapshot2 = tracemalloc.Snapshot.load(support.TESTFN)
|
|
self.assertEqual(snapshot2.test_attr, "new")
|
|
|
|
def fork_child(self):
|
|
if not tracemalloc.is_tracing():
|
|
return 2
|
|
|
|
obj_size = 12345
|
|
obj, obj_traceback = allocate_bytes(obj_size)
|
|
traceback = tracemalloc.get_object_traceback(obj)
|
|
if traceback is None:
|
|
return 3
|
|
|
|
# everything is fine
|
|
return 0
|
|
|
|
@unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()')
|
|
def test_fork(self):
|
|
# check that tracemalloc is still working after fork
|
|
pid = os.fork()
|
|
if not pid:
|
|
# child
|
|
exitcode = 1
|
|
try:
|
|
exitcode = self.fork_child()
|
|
finally:
|
|
os._exit(exitcode)
|
|
else:
|
|
pid2, status = os.waitpid(pid, 0)
|
|
self.assertTrue(os.WIFEXITED(status))
|
|
exitcode = os.WEXITSTATUS(status)
|
|
self.assertEqual(exitcode, 0)
|
|
|
|
|
|
class TestSnapshot(unittest.TestCase):
|
|
maxDiff = 4000
|
|
|
|
def test_create_snapshot(self):
|
|
raw_traces = [(0, 5, (('a.py', 2),))]
|
|
|
|
with contextlib.ExitStack() as stack:
|
|
stack.enter_context(patch.object(tracemalloc, 'is_tracing',
|
|
return_value=True))
|
|
stack.enter_context(patch.object(tracemalloc, 'get_traceback_limit',
|
|
return_value=5))
|
|
stack.enter_context(patch.object(tracemalloc, '_get_traces',
|
|
return_value=raw_traces))
|
|
|
|
snapshot = tracemalloc.take_snapshot()
|
|
self.assertEqual(snapshot.traceback_limit, 5)
|
|
self.assertEqual(len(snapshot.traces), 1)
|
|
trace = snapshot.traces[0]
|
|
self.assertEqual(trace.size, 5)
|
|
self.assertEqual(len(trace.traceback), 1)
|
|
self.assertEqual(trace.traceback[0].filename, 'a.py')
|
|
self.assertEqual(trace.traceback[0].lineno, 2)
|
|
|
|
def test_filter_traces(self):
|
|
snapshot, snapshot2 = create_snapshots()
|
|
filter1 = tracemalloc.Filter(False, "b.py")
|
|
filter2 = tracemalloc.Filter(True, "a.py", 2)
|
|
filter3 = tracemalloc.Filter(True, "a.py", 5)
|
|
|
|
original_traces = list(snapshot.traces._traces)
|
|
|
|
# exclude b.py
|
|
snapshot3 = snapshot.filter_traces((filter1,))
|
|
self.assertEqual(snapshot3.traces._traces, [
|
|
(0, 10, (('a.py', 2), ('b.py', 4))),
|
|
(0, 10, (('a.py', 2), ('b.py', 4))),
|
|
(0, 10, (('a.py', 2), ('b.py', 4))),
|
|
(1, 2, (('a.py', 5), ('b.py', 4))),
|
|
(3, 7, (('<unknown>', 0),)),
|
|
])
|
|
|
|
# filter_traces() must not touch the original snapshot
|
|
self.assertEqual(snapshot.traces._traces, original_traces)
|
|
|
|
# only include two lines of a.py
|
|
snapshot4 = snapshot3.filter_traces((filter2, filter3))
|
|
self.assertEqual(snapshot4.traces._traces, [
|
|
(0, 10, (('a.py', 2), ('b.py', 4))),
|
|
(0, 10, (('a.py', 2), ('b.py', 4))),
|
|
(0, 10, (('a.py', 2), ('b.py', 4))),
|
|
(1, 2, (('a.py', 5), ('b.py', 4))),
|
|
])
|
|
|
|
# No filter: just duplicate the snapshot
|
|
snapshot5 = snapshot.filter_traces(())
|
|
self.assertIsNot(snapshot5, snapshot)
|
|
self.assertIsNot(snapshot5.traces, snapshot.traces)
|
|
self.assertEqual(snapshot5.traces, snapshot.traces)
|
|
|
|
self.assertRaises(TypeError, snapshot.filter_traces, filter1)
|
|
|
|
def test_filter_traces_domain(self):
|
|
snapshot, snapshot2 = create_snapshots()
|
|
filter1 = tracemalloc.Filter(False, "a.py", domain=1)
|
|
filter2 = tracemalloc.Filter(True, "a.py", domain=1)
|
|
|
|
original_traces = list(snapshot.traces._traces)
|
|
|
|
# exclude a.py of domain 1
|
|
snapshot3 = snapshot.filter_traces((filter1,))
|
|
self.assertEqual(snapshot3.traces._traces, [
|
|
(0, 10, (('a.py', 2), ('b.py', 4))),
|
|
(0, 10, (('a.py', 2), ('b.py', 4))),
|
|
(0, 10, (('a.py', 2), ('b.py', 4))),
|
|
(2, 66, (('b.py', 1),)),
|
|
(3, 7, (('<unknown>', 0),)),
|
|
])
|
|
|
|
# include domain 1
|
|
snapshot3 = snapshot.filter_traces((filter1,))
|
|
self.assertEqual(snapshot3.traces._traces, [
|
|
(0, 10, (('a.py', 2), ('b.py', 4))),
|
|
(0, 10, (('a.py', 2), ('b.py', 4))),
|
|
(0, 10, (('a.py', 2), ('b.py', 4))),
|
|
(2, 66, (('b.py', 1),)),
|
|
(3, 7, (('<unknown>', 0),)),
|
|
])
|
|
|
|
def test_filter_traces_domain_filter(self):
|
|
snapshot, snapshot2 = create_snapshots()
|
|
filter1 = tracemalloc.DomainFilter(False, domain=3)
|
|
filter2 = tracemalloc.DomainFilter(True, domain=3)
|
|
|
|
# exclude domain 2
|
|
snapshot3 = snapshot.filter_traces((filter1,))
|
|
self.assertEqual(snapshot3.traces._traces, [
|
|
(0, 10, (('a.py', 2), ('b.py', 4))),
|
|
(0, 10, (('a.py', 2), ('b.py', 4))),
|
|
(0, 10, (('a.py', 2), ('b.py', 4))),
|
|
(1, 2, (('a.py', 5), ('b.py', 4))),
|
|
(2, 66, (('b.py', 1),)),
|
|
])
|
|
|
|
# include domain 2
|
|
snapshot3 = snapshot.filter_traces((filter2,))
|
|
self.assertEqual(snapshot3.traces._traces, [
|
|
(3, 7, (('<unknown>', 0),)),
|
|
])
|
|
|
|
def test_snapshot_group_by_line(self):
|
|
snapshot, snapshot2 = create_snapshots()
|
|
tb_0 = traceback_lineno('<unknown>', 0)
|
|
tb_a_2 = traceback_lineno('a.py', 2)
|
|
tb_a_5 = traceback_lineno('a.py', 5)
|
|
tb_b_1 = traceback_lineno('b.py', 1)
|
|
tb_c_578 = traceback_lineno('c.py', 578)
|
|
|
|
# stats per file and line
|
|
stats1 = snapshot.statistics('lineno')
|
|
self.assertEqual(stats1, [
|
|
tracemalloc.Statistic(tb_b_1, 66, 1),
|
|
tracemalloc.Statistic(tb_a_2, 30, 3),
|
|
tracemalloc.Statistic(tb_0, 7, 1),
|
|
tracemalloc.Statistic(tb_a_5, 2, 1),
|
|
])
|
|
|
|
# stats per file and line (2)
|
|
stats2 = snapshot2.statistics('lineno')
|
|
self.assertEqual(stats2, [
|
|
tracemalloc.Statistic(tb_a_5, 5002, 2),
|
|
tracemalloc.Statistic(tb_c_578, 400, 1),
|
|
tracemalloc.Statistic(tb_a_2, 30, 3),
|
|
])
|
|
|
|
# stats diff per file and line
|
|
statistics = snapshot2.compare_to(snapshot, 'lineno')
|
|
self.assertEqual(statistics, [
|
|
tracemalloc.StatisticDiff(tb_a_5, 5002, 5000, 2, 1),
|
|
tracemalloc.StatisticDiff(tb_c_578, 400, 400, 1, 1),
|
|
tracemalloc.StatisticDiff(tb_b_1, 0, -66, 0, -1),
|
|
tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1),
|
|
tracemalloc.StatisticDiff(tb_a_2, 30, 0, 3, 0),
|
|
])
|
|
|
|
def test_snapshot_group_by_file(self):
|
|
snapshot, snapshot2 = create_snapshots()
|
|
tb_0 = traceback_filename('<unknown>')
|
|
tb_a = traceback_filename('a.py')
|
|
tb_b = traceback_filename('b.py')
|
|
tb_c = traceback_filename('c.py')
|
|
|
|
# stats per file
|
|
stats1 = snapshot.statistics('filename')
|
|
self.assertEqual(stats1, [
|
|
tracemalloc.Statistic(tb_b, 66, 1),
|
|
tracemalloc.Statistic(tb_a, 32, 4),
|
|
tracemalloc.Statistic(tb_0, 7, 1),
|
|
])
|
|
|
|
# stats per file (2)
|
|
stats2 = snapshot2.statistics('filename')
|
|
self.assertEqual(stats2, [
|
|
tracemalloc.Statistic(tb_a, 5032, 5),
|
|
tracemalloc.Statistic(tb_c, 400, 1),
|
|
])
|
|
|
|
# stats diff per file
|
|
diff = snapshot2.compare_to(snapshot, 'filename')
|
|
self.assertEqual(diff, [
|
|
tracemalloc.StatisticDiff(tb_a, 5032, 5000, 5, 1),
|
|
tracemalloc.StatisticDiff(tb_c, 400, 400, 1, 1),
|
|
tracemalloc.StatisticDiff(tb_b, 0, -66, 0, -1),
|
|
tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1),
|
|
])
|
|
|
|
def test_snapshot_group_by_traceback(self):
|
|
snapshot, snapshot2 = create_snapshots()
|
|
|
|
# stats per file
|
|
tb1 = traceback(('a.py', 2), ('b.py', 4))
|
|
tb2 = traceback(('a.py', 5), ('b.py', 4))
|
|
tb3 = traceback(('b.py', 1))
|
|
tb4 = traceback(('<unknown>', 0))
|
|
stats1 = snapshot.statistics('traceback')
|
|
self.assertEqual(stats1, [
|
|
tracemalloc.Statistic(tb3, 66, 1),
|
|
tracemalloc.Statistic(tb1, 30, 3),
|
|
tracemalloc.Statistic(tb4, 7, 1),
|
|
tracemalloc.Statistic(tb2, 2, 1),
|
|
])
|
|
|
|
# stats per file (2)
|
|
tb5 = traceback(('c.py', 578))
|
|
stats2 = snapshot2.statistics('traceback')
|
|
self.assertEqual(stats2, [
|
|
tracemalloc.Statistic(tb2, 5002, 2),
|
|
tracemalloc.Statistic(tb5, 400, 1),
|
|
tracemalloc.Statistic(tb1, 30, 3),
|
|
])
|
|
|
|
# stats diff per file
|
|
diff = snapshot2.compare_to(snapshot, 'traceback')
|
|
self.assertEqual(diff, [
|
|
tracemalloc.StatisticDiff(tb2, 5002, 5000, 2, 1),
|
|
tracemalloc.StatisticDiff(tb5, 400, 400, 1, 1),
|
|
tracemalloc.StatisticDiff(tb3, 0, -66, 0, -1),
|
|
tracemalloc.StatisticDiff(tb4, 0, -7, 0, -1),
|
|
tracemalloc.StatisticDiff(tb1, 30, 0, 3, 0),
|
|
])
|
|
|
|
self.assertRaises(ValueError,
|
|
snapshot.statistics, 'traceback', cumulative=True)
|
|
|
|
def test_snapshot_group_by_cumulative(self):
|
|
snapshot, snapshot2 = create_snapshots()
|
|
tb_0 = traceback_filename('<unknown>')
|
|
tb_a = traceback_filename('a.py')
|
|
tb_b = traceback_filename('b.py')
|
|
tb_a_2 = traceback_lineno('a.py', 2)
|
|
tb_a_5 = traceback_lineno('a.py', 5)
|
|
tb_b_1 = traceback_lineno('b.py', 1)
|
|
tb_b_4 = traceback_lineno('b.py', 4)
|
|
|
|
# per file
|
|
stats = snapshot.statistics('filename', True)
|
|
self.assertEqual(stats, [
|
|
tracemalloc.Statistic(tb_b, 98, 5),
|
|
tracemalloc.Statistic(tb_a, 32, 4),
|
|
tracemalloc.Statistic(tb_0, 7, 1),
|
|
])
|
|
|
|
# per line
|
|
stats = snapshot.statistics('lineno', True)
|
|
self.assertEqual(stats, [
|
|
tracemalloc.Statistic(tb_b_1, 66, 1),
|
|
tracemalloc.Statistic(tb_b_4, 32, 4),
|
|
tracemalloc.Statistic(tb_a_2, 30, 3),
|
|
tracemalloc.Statistic(tb_0, 7, 1),
|
|
tracemalloc.Statistic(tb_a_5, 2, 1),
|
|
])
|
|
|
|
def test_trace_format(self):
|
|
snapshot, snapshot2 = create_snapshots()
|
|
trace = snapshot.traces[0]
|
|
self.assertEqual(str(trace), 'a.py:2: 10 B')
|
|
traceback = trace.traceback
|
|
self.assertEqual(str(traceback), 'a.py:2')
|
|
frame = traceback[0]
|
|
self.assertEqual(str(frame), 'a.py:2')
|
|
|
|
def test_statistic_format(self):
|
|
snapshot, snapshot2 = create_snapshots()
|
|
stats = snapshot.statistics('lineno')
|
|
stat = stats[0]
|
|
self.assertEqual(str(stat),
|
|
'b.py:1: size=66 B, count=1, average=66 B')
|
|
|
|
def test_statistic_diff_format(self):
|
|
snapshot, snapshot2 = create_snapshots()
|
|
stats = snapshot2.compare_to(snapshot, 'lineno')
|
|
stat = stats[0]
|
|
self.assertEqual(str(stat),
|
|
'a.py:5: size=5002 B (+5000 B), count=2 (+1), average=2501 B')
|
|
|
|
def test_slices(self):
|
|
snapshot, snapshot2 = create_snapshots()
|
|
self.assertEqual(snapshot.traces[:2],
|
|
(snapshot.traces[0], snapshot.traces[1]))
|
|
|
|
traceback = snapshot.traces[0].traceback
|
|
self.assertEqual(traceback[:2],
|
|
(traceback[0], traceback[1]))
|
|
|
|
def test_format_traceback(self):
|
|
snapshot, snapshot2 = create_snapshots()
|
|
def getline(filename, lineno):
|
|
return ' <%s, %s>' % (filename, lineno)
|
|
with unittest.mock.patch('tracemalloc.linecache.getline',
|
|
side_effect=getline):
|
|
tb = snapshot.traces[0].traceback
|
|
self.assertEqual(tb.format(),
|
|
[' File "a.py", line 2',
|
|
' <a.py, 2>',
|
|
' File "b.py", line 4',
|
|
' <b.py, 4>'])
|
|
|
|
self.assertEqual(tb.format(limit=1),
|
|
[' File "a.py", line 2',
|
|
' <a.py, 2>'])
|
|
|
|
self.assertEqual(tb.format(limit=-1),
|
|
[])
|
|
|
|
|
|
class TestFilters(unittest.TestCase):
|
|
maxDiff = 2048
|
|
|
|
def test_filter_attributes(self):
|
|
# test default values
|
|
f = tracemalloc.Filter(True, "abc")
|
|
self.assertEqual(f.inclusive, True)
|
|
self.assertEqual(f.filename_pattern, "abc")
|
|
self.assertIsNone(f.lineno)
|
|
self.assertEqual(f.all_frames, False)
|
|
|
|
# test custom values
|
|
f = tracemalloc.Filter(False, "test.py", 123, True)
|
|
self.assertEqual(f.inclusive, False)
|
|
self.assertEqual(f.filename_pattern, "test.py")
|
|
self.assertEqual(f.lineno, 123)
|
|
self.assertEqual(f.all_frames, True)
|
|
|
|
# parameters passed by keyword
|
|
f = tracemalloc.Filter(inclusive=False, filename_pattern="test.py", lineno=123, all_frames=True)
|
|
self.assertEqual(f.inclusive, False)
|
|
self.assertEqual(f.filename_pattern, "test.py")
|
|
self.assertEqual(f.lineno, 123)
|
|
self.assertEqual(f.all_frames, True)
|
|
|
|
# read-only attribute
|
|
self.assertRaises(AttributeError, setattr, f, "filename_pattern", "abc")
|
|
|
|
def test_filter_match(self):
|
|
# filter without line number
|
|
f = tracemalloc.Filter(True, "abc")
|
|
self.assertTrue(f._match_frame("abc", 0))
|
|
self.assertTrue(f._match_frame("abc", 5))
|
|
self.assertTrue(f._match_frame("abc", 10))
|
|
self.assertFalse(f._match_frame("12356", 0))
|
|
self.assertFalse(f._match_frame("12356", 5))
|
|
self.assertFalse(f._match_frame("12356", 10))
|
|
|
|
f = tracemalloc.Filter(False, "abc")
|
|
self.assertFalse(f._match_frame("abc", 0))
|
|
self.assertFalse(f._match_frame("abc", 5))
|
|
self.assertFalse(f._match_frame("abc", 10))
|
|
self.assertTrue(f._match_frame("12356", 0))
|
|
self.assertTrue(f._match_frame("12356", 5))
|
|
self.assertTrue(f._match_frame("12356", 10))
|
|
|
|
# filter with line number > 0
|
|
f = tracemalloc.Filter(True, "abc", 5)
|
|
self.assertFalse(f._match_frame("abc", 0))
|
|
self.assertTrue(f._match_frame("abc", 5))
|
|
self.assertFalse(f._match_frame("abc", 10))
|
|
self.assertFalse(f._match_frame("12356", 0))
|
|
self.assertFalse(f._match_frame("12356", 5))
|
|
self.assertFalse(f._match_frame("12356", 10))
|
|
|
|
f = tracemalloc.Filter(False, "abc", 5)
|
|
self.assertTrue(f._match_frame("abc", 0))
|
|
self.assertFalse(f._match_frame("abc", 5))
|
|
self.assertTrue(f._match_frame("abc", 10))
|
|
self.assertTrue(f._match_frame("12356", 0))
|
|
self.assertTrue(f._match_frame("12356", 5))
|
|
self.assertTrue(f._match_frame("12356", 10))
|
|
|
|
# filter with line number 0
|
|
f = tracemalloc.Filter(True, "abc", 0)
|
|
self.assertTrue(f._match_frame("abc", 0))
|
|
self.assertFalse(f._match_frame("abc", 5))
|
|
self.assertFalse(f._match_frame("abc", 10))
|
|
self.assertFalse(f._match_frame("12356", 0))
|
|
self.assertFalse(f._match_frame("12356", 5))
|
|
self.assertFalse(f._match_frame("12356", 10))
|
|
|
|
f = tracemalloc.Filter(False, "abc", 0)
|
|
self.assertFalse(f._match_frame("abc", 0))
|
|
self.assertTrue(f._match_frame("abc", 5))
|
|
self.assertTrue(f._match_frame("abc", 10))
|
|
self.assertTrue(f._match_frame("12356", 0))
|
|
self.assertTrue(f._match_frame("12356", 5))
|
|
self.assertTrue(f._match_frame("12356", 10))
|
|
|
|
def test_filter_match_filename(self):
|
|
def fnmatch(inclusive, filename, pattern):
|
|
f = tracemalloc.Filter(inclusive, pattern)
|
|
return f._match_frame(filename, 0)
|
|
|
|
self.assertTrue(fnmatch(True, "abc", "abc"))
|
|
self.assertFalse(fnmatch(True, "12356", "abc"))
|
|
self.assertFalse(fnmatch(True, "<unknown>", "abc"))
|
|
|
|
self.assertFalse(fnmatch(False, "abc", "abc"))
|
|
self.assertTrue(fnmatch(False, "12356", "abc"))
|
|
self.assertTrue(fnmatch(False, "<unknown>", "abc"))
|
|
|
|
def test_filter_match_filename_joker(self):
|
|
def fnmatch(filename, pattern):
|
|
filter = tracemalloc.Filter(True, pattern)
|
|
return filter._match_frame(filename, 0)
|
|
|
|
# empty string
|
|
self.assertFalse(fnmatch('abc', ''))
|
|
self.assertFalse(fnmatch('', 'abc'))
|
|
self.assertTrue(fnmatch('', ''))
|
|
self.assertTrue(fnmatch('', '*'))
|
|
|
|
# no *
|
|
self.assertTrue(fnmatch('abc', 'abc'))
|
|
self.assertFalse(fnmatch('abc', 'abcd'))
|
|
self.assertFalse(fnmatch('abc', 'def'))
|
|
|
|
# a*
|
|
self.assertTrue(fnmatch('abc', 'a*'))
|
|
self.assertTrue(fnmatch('abc', 'abc*'))
|
|
self.assertFalse(fnmatch('abc', 'b*'))
|
|
self.assertFalse(fnmatch('abc', 'abcd*'))
|
|
|
|
# a*b
|
|
self.assertTrue(fnmatch('abc', 'a*c'))
|
|
self.assertTrue(fnmatch('abcdcx', 'a*cx'))
|
|
self.assertFalse(fnmatch('abb', 'a*c'))
|
|
self.assertFalse(fnmatch('abcdce', 'a*cx'))
|
|
|
|
# a*b*c
|
|
self.assertTrue(fnmatch('abcde', 'a*c*e'))
|
|
self.assertTrue(fnmatch('abcbdefeg', 'a*bd*eg'))
|
|
self.assertFalse(fnmatch('abcdd', 'a*c*e'))
|
|
self.assertFalse(fnmatch('abcbdefef', 'a*bd*eg'))
|
|
|
|
# replace .pyc suffix with .py
|
|
self.assertTrue(fnmatch('a.pyc', 'a.py'))
|
|
self.assertTrue(fnmatch('a.py', 'a.pyc'))
|
|
|
|
if os.name == 'nt':
|
|
# case insensitive
|
|
self.assertTrue(fnmatch('aBC', 'ABc'))
|
|
self.assertTrue(fnmatch('aBcDe', 'Ab*dE'))
|
|
|
|
self.assertTrue(fnmatch('a.pyc', 'a.PY'))
|
|
self.assertTrue(fnmatch('a.py', 'a.PYC'))
|
|
else:
|
|
# case sensitive
|
|
self.assertFalse(fnmatch('aBC', 'ABc'))
|
|
self.assertFalse(fnmatch('aBcDe', 'Ab*dE'))
|
|
|
|
self.assertFalse(fnmatch('a.pyc', 'a.PY'))
|
|
self.assertFalse(fnmatch('a.py', 'a.PYC'))
|
|
|
|
if os.name == 'nt':
|
|
# normalize alternate separator "/" to the standard separator "\"
|
|
self.assertTrue(fnmatch(r'a/b', r'a\b'))
|
|
self.assertTrue(fnmatch(r'a\b', r'a/b'))
|
|
self.assertTrue(fnmatch(r'a/b\c', r'a\b/c'))
|
|
self.assertTrue(fnmatch(r'a/b/c', r'a\b\c'))
|
|
else:
|
|
# there is no alternate separator
|
|
self.assertFalse(fnmatch(r'a/b', r'a\b'))
|
|
self.assertFalse(fnmatch(r'a\b', r'a/b'))
|
|
self.assertFalse(fnmatch(r'a/b\c', r'a\b/c'))
|
|
self.assertFalse(fnmatch(r'a/b/c', r'a\b\c'))
|
|
|
|
# as of 3.5, .pyo is no longer munged to .py
|
|
self.assertFalse(fnmatch('a.pyo', 'a.py'))
|
|
|
|
def test_filter_match_trace(self):
|
|
t1 = (("a.py", 2), ("b.py", 3))
|
|
t2 = (("b.py", 4), ("b.py", 5))
|
|
t3 = (("c.py", 5), ('<unknown>', 0))
|
|
unknown = (('<unknown>', 0),)
|
|
|
|
f = tracemalloc.Filter(True, "b.py", all_frames=True)
|
|
self.assertTrue(f._match_traceback(t1))
|
|
self.assertTrue(f._match_traceback(t2))
|
|
self.assertFalse(f._match_traceback(t3))
|
|
self.assertFalse(f._match_traceback(unknown))
|
|
|
|
f = tracemalloc.Filter(True, "b.py", all_frames=False)
|
|
self.assertFalse(f._match_traceback(t1))
|
|
self.assertTrue(f._match_traceback(t2))
|
|
self.assertFalse(f._match_traceback(t3))
|
|
self.assertFalse(f._match_traceback(unknown))
|
|
|
|
f = tracemalloc.Filter(False, "b.py", all_frames=True)
|
|
self.assertFalse(f._match_traceback(t1))
|
|
self.assertFalse(f._match_traceback(t2))
|
|
self.assertTrue(f._match_traceback(t3))
|
|
self.assertTrue(f._match_traceback(unknown))
|
|
|
|
f = tracemalloc.Filter(False, "b.py", all_frames=False)
|
|
self.assertTrue(f._match_traceback(t1))
|
|
self.assertFalse(f._match_traceback(t2))
|
|
self.assertTrue(f._match_traceback(t3))
|
|
self.assertTrue(f._match_traceback(unknown))
|
|
|
|
f = tracemalloc.Filter(False, "<unknown>", all_frames=False)
|
|
self.assertTrue(f._match_traceback(t1))
|
|
self.assertTrue(f._match_traceback(t2))
|
|
self.assertTrue(f._match_traceback(t3))
|
|
self.assertFalse(f._match_traceback(unknown))
|
|
|
|
f = tracemalloc.Filter(True, "<unknown>", all_frames=True)
|
|
self.assertFalse(f._match_traceback(t1))
|
|
self.assertFalse(f._match_traceback(t2))
|
|
self.assertTrue(f._match_traceback(t3))
|
|
self.assertTrue(f._match_traceback(unknown))
|
|
|
|
f = tracemalloc.Filter(False, "<unknown>", all_frames=True)
|
|
self.assertTrue(f._match_traceback(t1))
|
|
self.assertTrue(f._match_traceback(t2))
|
|
self.assertFalse(f._match_traceback(t3))
|
|
self.assertFalse(f._match_traceback(unknown))
|
|
|
|
|
|
class TestCommandLine(unittest.TestCase):
|
|
def test_env_var_disabled_by_default(self):
|
|
# not tracing by default
|
|
code = 'import tracemalloc; print(tracemalloc.is_tracing())'
|
|
ok, stdout, stderr = assert_python_ok('-c', code)
|
|
stdout = stdout.rstrip()
|
|
self.assertEqual(stdout, b'False')
|
|
|
|
@unittest.skipIf(interpreter_requires_environment(),
|
|
'Cannot run -E tests when PYTHON env vars are required.')
|
|
def test_env_var_ignored_with_E(self):
|
|
"""PYTHON* environment variables must be ignored when -E is present."""
|
|
code = 'import tracemalloc; print(tracemalloc.is_tracing())'
|
|
ok, stdout, stderr = assert_python_ok('-E', '-c', code, PYTHONTRACEMALLOC='1')
|
|
stdout = stdout.rstrip()
|
|
self.assertEqual(stdout, b'False')
|
|
|
|
def test_env_var_enabled_at_startup(self):
|
|
# tracing at startup
|
|
code = 'import tracemalloc; print(tracemalloc.is_tracing())'
|
|
ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='1')
|
|
stdout = stdout.rstrip()
|
|
self.assertEqual(stdout, b'True')
|
|
|
|
def test_env_limit(self):
|
|
# start and set the number of frames
|
|
code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())'
|
|
ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='10')
|
|
stdout = stdout.rstrip()
|
|
self.assertEqual(stdout, b'10')
|
|
|
|
def test_env_var_invalid(self):
|
|
for nframe in (-1, 0, 2**30):
|
|
with self.subTest(nframe=nframe):
|
|
with support.SuppressCrashReport():
|
|
ok, stdout, stderr = assert_python_failure(
|
|
'-c', 'pass',
|
|
PYTHONTRACEMALLOC=str(nframe))
|
|
self.assertIn(b'PYTHONTRACEMALLOC: invalid '
|
|
b'number of frames',
|
|
stderr)
|
|
|
|
def test_sys_xoptions(self):
|
|
for xoptions, nframe in (
|
|
('tracemalloc', 1),
|
|
('tracemalloc=1', 1),
|
|
('tracemalloc=15', 15),
|
|
):
|
|
with self.subTest(xoptions=xoptions, nframe=nframe):
|
|
code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())'
|
|
ok, stdout, stderr = assert_python_ok('-X', xoptions, '-c', code)
|
|
stdout = stdout.rstrip()
|
|
self.assertEqual(stdout, str(nframe).encode('ascii'))
|
|
|
|
def test_sys_xoptions_invalid(self):
|
|
for nframe in (-1, 0, 2**30):
|
|
with self.subTest(nframe=nframe):
|
|
with support.SuppressCrashReport():
|
|
args = ('-X', 'tracemalloc=%s' % nframe, '-c', 'pass')
|
|
ok, stdout, stderr = assert_python_failure(*args)
|
|
self.assertIn(b'-X tracemalloc=NFRAME: invalid '
|
|
b'number of frames',
|
|
stderr)
|
|
|
|
@unittest.skipIf(_testcapi is None, 'need _testcapi')
|
|
def test_pymem_alloc0(self):
|
|
# Issue #21639: Check that PyMem_Malloc(0) with tracemalloc enabled
|
|
# does not crash.
|
|
code = 'import _testcapi; _testcapi.test_pymem_alloc0(); 1'
|
|
assert_python_ok('-X', 'tracemalloc', '-c', code)
|
|
|
|
|
|
@unittest.skipIf(_testcapi is None, 'need _testcapi')
|
|
class TestCAPI(unittest.TestCase):
|
|
maxDiff = 80 * 20
|
|
|
|
def setUp(self):
|
|
if tracemalloc.is_tracing():
|
|
self.skipTest("tracemalloc must be stopped before the test")
|
|
|
|
self.domain = 5
|
|
self.size = 123
|
|
self.obj = allocate_bytes(self.size)[0]
|
|
|
|
# for the type "object", id(obj) is the address of its memory block.
|
|
# This type is not tracked by the garbage collector
|
|
self.ptr = id(self.obj)
|
|
|
|
def tearDown(self):
|
|
tracemalloc.stop()
|
|
|
|
def get_traceback(self):
|
|
frames = _testcapi.tracemalloc_get_traceback(self.domain, self.ptr)
|
|
if frames is not None:
|
|
return tracemalloc.Traceback(frames)
|
|
else:
|
|
return None
|
|
|
|
def track(self, release_gil=False, nframe=1):
|
|
frames = get_frames(nframe, 2)
|
|
_testcapi.tracemalloc_track(self.domain, self.ptr, self.size,
|
|
release_gil)
|
|
return frames
|
|
|
|
def untrack(self):
|
|
_testcapi.tracemalloc_untrack(self.domain, self.ptr)
|
|
|
|
def get_traced_memory(self):
|
|
# Get the traced size in the domain
|
|
snapshot = tracemalloc.take_snapshot()
|
|
domain_filter = tracemalloc.DomainFilter(True, self.domain)
|
|
snapshot = snapshot.filter_traces([domain_filter])
|
|
return sum(trace.size for trace in snapshot.traces)
|
|
|
|
def check_track(self, release_gil):
|
|
nframe = 5
|
|
tracemalloc.start(nframe)
|
|
|
|
size = tracemalloc.get_traced_memory()[0]
|
|
|
|
frames = self.track(release_gil, nframe)
|
|
self.assertEqual(self.get_traceback(),
|
|
tracemalloc.Traceback(frames))
|
|
|
|
self.assertEqual(self.get_traced_memory(), self.size)
|
|
|
|
def test_track(self):
|
|
self.check_track(False)
|
|
|
|
def test_track_without_gil(self):
|
|
# check that calling _PyTraceMalloc_Track() without holding the GIL
|
|
# works too
|
|
self.check_track(True)
|
|
|
|
def test_track_already_tracked(self):
|
|
nframe = 5
|
|
tracemalloc.start(nframe)
|
|
|
|
# track a first time
|
|
self.track()
|
|
|
|
# calling _PyTraceMalloc_Track() must remove the old trace and add
|
|
# a new trace with the new traceback
|
|
frames = self.track(nframe=nframe)
|
|
self.assertEqual(self.get_traceback(),
|
|
tracemalloc.Traceback(frames))
|
|
|
|
def test_untrack(self):
|
|
tracemalloc.start()
|
|
|
|
self.track()
|
|
self.assertIsNotNone(self.get_traceback())
|
|
self.assertEqual(self.get_traced_memory(), self.size)
|
|
|
|
# untrack must remove the trace
|
|
self.untrack()
|
|
self.assertIsNone(self.get_traceback())
|
|
self.assertEqual(self.get_traced_memory(), 0)
|
|
|
|
# calling _PyTraceMalloc_Untrack() multiple times must not crash
|
|
self.untrack()
|
|
self.untrack()
|
|
|
|
def test_stop_track(self):
|
|
tracemalloc.start()
|
|
tracemalloc.stop()
|
|
|
|
with self.assertRaises(RuntimeError):
|
|
self.track()
|
|
self.assertIsNone(self.get_traceback())
|
|
|
|
def test_stop_untrack(self):
|
|
tracemalloc.start()
|
|
self.track()
|
|
|
|
tracemalloc.stop()
|
|
with self.assertRaises(RuntimeError):
|
|
self.untrack()
|
|
|
|
|
|
def test_main():
|
|
support.run_unittest(
|
|
TestTracemallocEnabled,
|
|
TestSnapshot,
|
|
TestFilters,
|
|
TestCommandLine,
|
|
TestCAPI,
|
|
)
|
|
|
|
if __name__ == "__main__":
|
|
test_main()
|