mirror of
https://github.com/jart/cosmopolitan.git
synced 2025-05-23 05:42:29 +00:00
python-3.6.zip added from Github
README.cosmo contains the necessary links.
This commit is contained in:
parent
75fc601ff5
commit
0c4c56ff39
4219 changed files with 1968626 additions and 0 deletions
145
third_party/python/Lib/test/test_devpoll.py
vendored
Normal file
145
third_party/python/Lib/test/test_devpoll.py
vendored
Normal file
|
@ -0,0 +1,145 @@
|
|||
# Test case for the select.devpoll() function
|
||||
|
||||
# Initial tests are copied as is from "test_poll.py"
|
||||
|
||||
import os
|
||||
import random
|
||||
import select
|
||||
import unittest
|
||||
from test.support import run_unittest, cpython_only
|
||||
|
||||
if not hasattr(select, 'devpoll') :
|
||||
raise unittest.SkipTest('test works only on Solaris OS family')
|
||||
|
||||
|
||||
def find_ready_matching(ready, flag):
|
||||
match = []
|
||||
for fd, mode in ready:
|
||||
if mode & flag:
|
||||
match.append(fd)
|
||||
return match
|
||||
|
||||
class DevPollTests(unittest.TestCase):
|
||||
|
||||
def test_devpoll1(self):
|
||||
# Basic functional test of poll object
|
||||
# Create a bunch of pipe and test that poll works with them.
|
||||
|
||||
p = select.devpoll()
|
||||
|
||||
NUM_PIPES = 12
|
||||
MSG = b" This is a test."
|
||||
MSG_LEN = len(MSG)
|
||||
readers = []
|
||||
writers = []
|
||||
r2w = {}
|
||||
w2r = {}
|
||||
|
||||
for i in range(NUM_PIPES):
|
||||
rd, wr = os.pipe()
|
||||
p.register(rd)
|
||||
p.modify(rd, select.POLLIN)
|
||||
p.register(wr, select.POLLOUT)
|
||||
readers.append(rd)
|
||||
writers.append(wr)
|
||||
r2w[rd] = wr
|
||||
w2r[wr] = rd
|
||||
|
||||
bufs = []
|
||||
|
||||
while writers:
|
||||
ready = p.poll()
|
||||
ready_writers = find_ready_matching(ready, select.POLLOUT)
|
||||
if not ready_writers:
|
||||
self.fail("no pipes ready for writing")
|
||||
wr = random.choice(ready_writers)
|
||||
os.write(wr, MSG)
|
||||
|
||||
ready = p.poll()
|
||||
ready_readers = find_ready_matching(ready, select.POLLIN)
|
||||
if not ready_readers:
|
||||
self.fail("no pipes ready for reading")
|
||||
self.assertEqual([w2r[wr]], ready_readers)
|
||||
rd = ready_readers[0]
|
||||
buf = os.read(rd, MSG_LEN)
|
||||
self.assertEqual(len(buf), MSG_LEN)
|
||||
bufs.append(buf)
|
||||
os.close(r2w[rd]) ; os.close(rd)
|
||||
p.unregister(r2w[rd])
|
||||
p.unregister(rd)
|
||||
writers.remove(r2w[rd])
|
||||
|
||||
self.assertEqual(bufs, [MSG] * NUM_PIPES)
|
||||
|
||||
def test_timeout_overflow(self):
|
||||
pollster = select.devpoll()
|
||||
w, r = os.pipe()
|
||||
pollster.register(w)
|
||||
|
||||
pollster.poll(-1)
|
||||
self.assertRaises(OverflowError, pollster.poll, -2)
|
||||
self.assertRaises(OverflowError, pollster.poll, -1 << 31)
|
||||
self.assertRaises(OverflowError, pollster.poll, -1 << 64)
|
||||
|
||||
pollster.poll(0)
|
||||
pollster.poll(1)
|
||||
pollster.poll(1 << 30)
|
||||
self.assertRaises(OverflowError, pollster.poll, 1 << 31)
|
||||
self.assertRaises(OverflowError, pollster.poll, 1 << 63)
|
||||
self.assertRaises(OverflowError, pollster.poll, 1 << 64)
|
||||
|
||||
def test_close(self):
|
||||
open_file = open(__file__, "rb")
|
||||
self.addCleanup(open_file.close)
|
||||
fd = open_file.fileno()
|
||||
devpoll = select.devpoll()
|
||||
|
||||
# test fileno() method and closed attribute
|
||||
self.assertIsInstance(devpoll.fileno(), int)
|
||||
self.assertFalse(devpoll.closed)
|
||||
|
||||
# test close()
|
||||
devpoll.close()
|
||||
self.assertTrue(devpoll.closed)
|
||||
self.assertRaises(ValueError, devpoll.fileno)
|
||||
|
||||
# close() can be called more than once
|
||||
devpoll.close()
|
||||
|
||||
# operations must fail with ValueError("I/O operation on closed ...")
|
||||
self.assertRaises(ValueError, devpoll.modify, fd, select.POLLIN)
|
||||
self.assertRaises(ValueError, devpoll.poll)
|
||||
self.assertRaises(ValueError, devpoll.register, fd, fd, select.POLLIN)
|
||||
self.assertRaises(ValueError, devpoll.unregister, fd)
|
||||
|
||||
def test_fd_non_inheritable(self):
|
||||
devpoll = select.devpoll()
|
||||
self.addCleanup(devpoll.close)
|
||||
self.assertEqual(os.get_inheritable(devpoll.fileno()), False)
|
||||
|
||||
def test_events_mask_overflow(self):
|
||||
pollster = select.devpoll()
|
||||
w, r = os.pipe()
|
||||
pollster.register(w)
|
||||
# Issue #17919
|
||||
self.assertRaises(OverflowError, pollster.register, 0, -1)
|
||||
self.assertRaises(OverflowError, pollster.register, 0, 1 << 64)
|
||||
self.assertRaises(OverflowError, pollster.modify, 1, -1)
|
||||
self.assertRaises(OverflowError, pollster.modify, 1, 1 << 64)
|
||||
|
||||
@cpython_only
|
||||
def test_events_mask_overflow_c_limits(self):
|
||||
from _testcapi import USHRT_MAX
|
||||
pollster = select.devpoll()
|
||||
w, r = os.pipe()
|
||||
pollster.register(w)
|
||||
# Issue #17919
|
||||
self.assertRaises(OverflowError, pollster.register, 0, USHRT_MAX + 1)
|
||||
self.assertRaises(OverflowError, pollster.modify, 1, USHRT_MAX + 1)
|
||||
|
||||
|
||||
def test_main():
|
||||
run_unittest(DevPollTests)
|
||||
|
||||
if __name__ == '__main__':
|
||||
test_main()
|
Loading…
Add table
Add a link
Reference in a new issue