Improve Python tree-shaking

This commit is contained in:
Justine Tunney 2021-09-06 19:24:10 -07:00
parent 5bb2275788
commit 4f41f2184d
169 changed files with 4182 additions and 2411 deletions

View file

@ -637,7 +637,7 @@ class UtimeTests(unittest.TestCase):
def get_file_system(self, path):
if sys.platform == 'win32':
root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
import ctypes
# import ctypes
kernel32 = ctypes.windll.kernel32
buf = ctypes.create_unicode_buffer("", 100)
ok = kernel32.GetVolumeInformationW(root, None, 0,
@ -1915,361 +1915,6 @@ class Pep383Tests(unittest.TestCase):
for fn in self.unicodefn:
os.stat(os.path.join(self.dir, fn))
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
def _kill(self, sig):
# Start sys.executable as a subprocess and communicate from the
# subprocess to the parent that the interpreter is ready. When it
# becomes ready, send *sig* via os.kill to the subprocess and check
# that the return code is equal to *sig*.
import ctypes
from ctypes import wintypes
import msvcrt
# Since we can't access the contents of the process' stdout until the
# process has exited, use PeekNamedPipe to see what's inside stdout
# without waiting. This is done so we can tell that the interpreter
# is started and running at a point where it could handle a signal.
PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
PeekNamedPipe.restype = wintypes.BOOL
PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
ctypes.POINTER(ctypes.c_char), # stdout buf
wintypes.DWORD, # Buffer size
ctypes.POINTER(wintypes.DWORD), # bytes read
ctypes.POINTER(wintypes.DWORD), # bytes avail
ctypes.POINTER(wintypes.DWORD)) # bytes left
msg = "running"
proc = subprocess.Popen([sys.executable, "-c",
"import sys;"
"sys.stdout.write('{}');"
"sys.stdout.flush();"
"input()".format(msg)],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE)
self.addCleanup(proc.stdout.close)
self.addCleanup(proc.stderr.close)
self.addCleanup(proc.stdin.close)
count, max = 0, 100
while count < max and proc.poll() is None:
# Create a string buffer to store the result of stdout from the pipe
buf = ctypes.create_string_buffer(len(msg))
# Obtain the text currently in proc.stdout
# Bytes read/avail/left are left as NULL and unused
rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
buf, ctypes.sizeof(buf), None, None, None)
self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
if buf.value:
self.assertEqual(msg, buf.value.decode())
break
time.sleep(0.1)
count += 1
else:
self.fail("Did not receive communication from the subprocess")
os.kill(proc.pid, sig)
self.assertEqual(proc.wait(), sig)
def test_kill_sigterm(self):
# SIGTERM doesn't mean anything special, but make sure it works
self._kill(signal.SIGTERM)
def test_kill_int(self):
# os.kill on Windows can take an int which gets set as the exit code
self._kill(100)
def _kill_with_event(self, event, name):
tagname = "test_os_%s" % uuid.uuid1()
m = mmap.mmap(-1, 1, tagname)
m[0] = 0
# Run a script which has console control handling enabled.
proc = subprocess.Popen([sys.executable,
os.path.join(os.path.dirname(__file__),
"win_console_handler.py"), tagname],
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
# Let the interpreter startup before we send signals. See #3137.
count, max = 0, 100
while count < max and proc.poll() is None:
if m[0] == 1:
break
time.sleep(0.1)
count += 1
else:
# Forcefully kill the process if we weren't able to signal it.
os.kill(proc.pid, signal.SIGINT)
self.fail("Subprocess didn't finish initialization")
os.kill(proc.pid, event)
# proc.send_signal(event) could also be done here.
# Allow time for the signal to be passed and the process to exit.
time.sleep(0.5)
if not proc.poll():
# Forcefully kill the process if we weren't able to signal it.
os.kill(proc.pid, signal.SIGINT)
self.fail("subprocess did not stop on {}".format(name))
@unittest.skip("subprocesses aren't inheriting Ctrl+C property")
def test_CTRL_C_EVENT(self):
from ctypes import wintypes
import ctypes
# Make a NULL value by creating a pointer with no argument.
NULL = ctypes.POINTER(ctypes.c_int)()
SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
wintypes.BOOL)
SetConsoleCtrlHandler.restype = wintypes.BOOL
# Calling this with NULL and FALSE causes the calling process to
# handle Ctrl+C, rather than ignore it. This property is inherited
# by subprocesses.
SetConsoleCtrlHandler(NULL, 0)
self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
def test_CTRL_BREAK_EVENT(self):
self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
"""Test listdir on Windows."""
def setUp(self):
self.created_paths = []
for i in range(2):
dir_name = 'SUB%d' % i
dir_path = os.path.join(support.TESTFN, dir_name)
file_name = 'FILE%d' % i
file_path = os.path.join(support.TESTFN, file_name)
os.makedirs(dir_path)
with open(file_path, 'w') as f:
f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
self.created_paths.extend([dir_name, file_name])
self.created_paths.sort()
def tearDown(self):
shutil.rmtree(support.TESTFN)
def test_listdir_no_extended_path(self):
"""Test when the path is not an "extended" path."""
# unicode
self.assertEqual(
sorted(os.listdir(support.TESTFN)),
self.created_paths)
# bytes
self.assertEqual(
sorted(os.listdir(os.fsencode(support.TESTFN))),
[os.fsencode(path) for path in self.created_paths])
def test_listdir_extended_path(self):
"""Test when the path starts with '\\\\?\\'."""
# See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
# unicode
path = '\\\\?\\' + os.path.abspath(support.TESTFN)
self.assertEqual(
sorted(os.listdir(path)),
self.created_paths)
# bytes
path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
self.assertEqual(
sorted(os.listdir(path)),
[os.fsencode(path) for path in self.created_paths])
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
filelink = 'filelinktest'
filelink_target = os.path.abspath(__file__)
dirlink = 'dirlinktest'
dirlink_target = os.path.dirname(filelink_target)
missing_link = 'missing link'
def setUp(self):
assert os.path.exists(self.dirlink_target)
assert os.path.exists(self.filelink_target)
assert not os.path.exists(self.dirlink)
assert not os.path.exists(self.filelink)
assert not os.path.exists(self.missing_link)
def tearDown(self):
if os.path.exists(self.filelink):
os.remove(self.filelink)
if os.path.exists(self.dirlink):
os.rmdir(self.dirlink)
if os.path.lexists(self.missing_link):
os.remove(self.missing_link)
def test_directory_link(self):
os.symlink(self.dirlink_target, self.dirlink)
self.assertTrue(os.path.exists(self.dirlink))
self.assertTrue(os.path.isdir(self.dirlink))
self.assertTrue(os.path.islink(self.dirlink))
self.check_stat(self.dirlink, self.dirlink_target)
def test_file_link(self):
os.symlink(self.filelink_target, self.filelink)
self.assertTrue(os.path.exists(self.filelink))
self.assertTrue(os.path.isfile(self.filelink))
self.assertTrue(os.path.islink(self.filelink))
self.check_stat(self.filelink, self.filelink_target)
def _create_missing_dir_link(self):
'Create a "directory" link to a non-existent target'
linkname = self.missing_link
if os.path.lexists(linkname):
os.remove(linkname)
target = r'c:\\target does not exist.29r3c740'
assert not os.path.exists(target)
target_is_dir = True
os.symlink(target, linkname, target_is_dir)
def test_remove_directory_link_to_missing_target(self):
self._create_missing_dir_link()
# For compatibility with Unix, os.remove will check the
# directory status and call RemoveDirectory if the symlink
# was created with target_is_dir==True.
os.remove(self.missing_link)
@unittest.skip("currently fails; consider for improvement")
def test_isdir_on_directory_link_to_missing_target(self):
self._create_missing_dir_link()
# consider having isdir return true for directory links
self.assertTrue(os.path.isdir(self.missing_link))
@unittest.skip("currently fails; consider for improvement")
def test_rmdir_on_directory_link_to_missing_target(self):
self._create_missing_dir_link()
# consider allowing rmdir to remove directory links
os.rmdir(self.missing_link)
def check_stat(self, link, target):
self.assertEqual(os.stat(link), os.stat(target))
self.assertNotEqual(os.lstat(link), os.stat(link))
bytes_link = os.fsencode(link)
self.assertEqual(os.stat(bytes_link), os.stat(target))
self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
def test_12084(self):
level1 = os.path.abspath(support.TESTFN)
level2 = os.path.join(level1, "level2")
level3 = os.path.join(level2, "level3")
self.addCleanup(support.rmtree, level1)
os.mkdir(level1)
os.mkdir(level2)
os.mkdir(level3)
file1 = os.path.abspath(os.path.join(level1, "file1"))
create_file(file1)
orig_dir = os.getcwd()
try:
os.chdir(level2)
link = os.path.join(level2, "link")
os.symlink(os.path.relpath(file1), "link")
self.assertIn("link", os.listdir(os.getcwd()))
# Check os.stat calls from the same dir as the link
self.assertEqual(os.stat(file1), os.stat("link"))
# Check os.stat calls from a dir below the link
os.chdir(level1)
self.assertEqual(os.stat(file1),
os.stat(os.path.relpath(link)))
# Check os.stat calls from a dir above the link
os.chdir(level3)
self.assertEqual(os.stat(file1),
os.stat(os.path.relpath(link)))
finally:
os.chdir(orig_dir)
@unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
and os.path.exists(r'C:\ProgramData'),
'Test directories not found')
def test_29248(self):
# os.symlink() calls CreateSymbolicLink, which creates
# the reparse data buffer with the print name stored
# first, so the offset is always 0. CreateSymbolicLink
# stores the "PrintName" DOS path (e.g. "C:\") first,
# with an offset of 0, followed by the "SubstituteName"
# NT path (e.g. "\??\C:\"). The "All Users" link, on
# the other hand, seems to have been created manually
# with an inverted order.
target = os.readlink(r'C:\Users\All Users')
self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))
def test_buffer_overflow(self):
# Older versions would have a buffer overflow when detecting
# whether a link source was a directory. This test ensures we
# no longer crash, but does not otherwise validate the behavior
segment = 'X' * 27
path = os.path.join(*[segment] * 10)
test_cases = [
# overflow with absolute src
('\\' + path, segment),
# overflow dest with relative src
(segment, path),
# overflow when joining src
(path[:180], path[:180]),
]
for src, dest in test_cases:
try:
os.symlink(src, dest)
except FileNotFoundError:
pass
else:
try:
os.remove(dest)
except OSError:
pass
# Also test with bytes, since that is a separate code path.
try:
os.symlink(os.fsencode(src), os.fsencode(dest))
except FileNotFoundError:
pass
else:
try:
os.remove(dest)
except OSError:
pass
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
junction = 'junctiontest'
junction_target = os.path.dirname(os.path.abspath(__file__))
def setUp(self):
assert os.path.exists(self.junction_target)
assert not os.path.exists(self.junction)
def tearDown(self):
if os.path.exists(self.junction):
# os.rmdir delegates to Windows' RemoveDirectoryW,
# which removes junction points safely.
os.rmdir(self.junction)
def test_create_junction(self):
_winapi.CreateJunction(self.junction_target, self.junction)
self.assertTrue(os.path.exists(self.junction))
self.assertTrue(os.path.isdir(self.junction))
# Junctions are not recognized as links.
self.assertFalse(os.path.islink(self.junction))
def test_unlink_removes_junction(self):
_winapi.CreateJunction(self.junction_target, self.junction)
self.assertTrue(os.path.exists(self.junction))
os.unlink(self.junction)
self.assertFalse(os.path.exists(self.junction))
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):