mirror of
https://github.com/jart/cosmopolitan.git
synced 2025-01-31 11:37:35 +00:00
5ef64dbcdb
These are the commits from https://github.com/ahgamut/cpython/tree/cosmo_py36 squashed for simplicity. Also included is the pyconfig.h used for compilation. The pyconfig.h has to be changed manually in case Cosmopolitan gets new features.
205 lines
6.6 KiB
Python
205 lines
6.6 KiB
Python
import queue
|
|
import sched
|
|
import time
|
|
import unittest
|
|
try:
|
|
import _thread
|
|
import threading
|
|
except ImportError:
|
|
threading = None
|
|
|
|
# Upper bound, in seconds, passed as ``timeout`` to the queue reads and thread
# joins in the concurrent tests so that a stuck scheduler thread cannot block
# them indefinitely.
TIMEOUT = 10
|
|
|
|
|
|
class Timer:
    """Simulated clock whose progress is controlled by the test code.

    ``time`` and ``sleep`` have the shapes expected by ``sched.scheduler`` for
    its time and delay functions.  The clock never moves past the limit that
    has been granted through ``advance``; a sleeper that wants to go further
    waits on the condition variable until the limit is raised.
    """

    def __init__(self):
        # One condition guards both counters and wakes blocked sleepers.
        self._cond = threading.Condition()
        # Current simulated time.
        self._time = 0
        # Highest value the simulated time is currently allowed to reach.
        self._stop = 0

    def time(self):
        """Return the current simulated time."""
        with self._cond:
            return self._time

    # increase the time but not beyond the established limit
    def sleep(self, t):
        """Move the clock forward by ``t``, waiting while the limit is lower.

        While the granted limit is below the target, the clock is parked at
        the limit and the caller blocks until ``advance`` notifies it.
        """
        assert t >= 0
        with self._cond:
            wake_at = t + self._time
            while self._stop < wake_at:
                # Consume all the time granted so far, then wait for more.
                self._time = self._stop
                self._cond.wait()
            self._time = wake_at

    # advance time limit for user code
    def advance(self, t):
        """Raise the limit by ``t`` and wake every thread blocked in sleep."""
        assert t >= 0
        with self._cond:
            self._stop += t
            self._cond.notify_all()
|
|
|
|
|
|
class TestCase(unittest.TestCase):
    """Tests for ``sched.scheduler``.

    Most tests drive the scheduler with the real ``time.time``/``time.sleep``
    pair.  The ``*_concurrent`` tests run the scheduler in a second thread
    against the simulated ``Timer`` clock, so the test thread decides exactly
    when each event becomes due.
    """

    def test_enter(self):
        """Events added with enter() fire in order of increasing delay."""
        fired = []
        scheduler = sched.scheduler(time.time, time.sleep)
        # Deliberately entered in reverse order of their delays.
        for delay in [0.5, 0.4, 0.3, 0.2, 0.1]:
            scheduler.enter(delay, 1, fired.append, (delay,))
        scheduler.run()
        self.assertEqual(fired, [0.1, 0.2, 0.3, 0.4, 0.5])

    def test_enterabs(self):
        """Events added with enterabs() fire in order of absolute time.

        The timestamps are near-zero absolute values, far behind what
        time.time() reports, so every event is already due when run() starts.
        """
        fired = []
        scheduler = sched.scheduler(time.time, time.sleep)
        for when in [0.05, 0.04, 0.03, 0.02, 0.01]:
            scheduler.enterabs(when, 1, fired.append, (when,))
        scheduler.run()
        self.assertEqual(fired, [0.01, 0.02, 0.03, 0.04, 0.05])

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_enter_concurrent(self):
        """Events can be entered while run() is active in another thread."""
        q = queue.Queue()
        fun = q.put
        timer = Timer()
        scheduler = sched.scheduler(timer.time, timer.sleep)
        scheduler.enter(1, 1, fun, (1,))
        scheduler.enter(3, 1, fun, (3,))
        t = threading.Thread(target=scheduler.run)
        t.start()
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 1)
        self.assertTrue(q.empty())
        # Entered while the scheduler thread is running; the delay is x - 1
        # and the event is expected to deliver x.
        for x in [4, 5, 2]:
            scheduler.enter(x - 1, 1, fun, (x,))
        timer.advance(2)
        self.assertEqual(q.get(timeout=TIMEOUT), 2)
        self.assertEqual(q.get(timeout=TIMEOUT), 3)
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 4)
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 5)
        self.assertTrue(q.empty())
        # Grant far more time than any event needs so run() can return.
        timer.advance(1000)
        t.join(timeout=TIMEOUT)
        self.assertFalse(t.is_alive())
        self.assertTrue(q.empty())
        # The clock must stop at the last event, not at the granted limit.
        self.assertEqual(timer.time(), 5)

    def test_priority(self):
        """Events sharing one time fire in ascending priority number."""
        fired = []
        scheduler = sched.scheduler(time.time, time.sleep)
        for priority in [1, 2, 3, 4, 5]:
            scheduler.enterabs(0.01, priority, fired.append, (priority,))
        scheduler.run()
        self.assertEqual(fired, [1, 2, 3, 4, 5])

    def test_cancel(self):
        """Cancelled events are skipped; the remaining ones still fire."""
        fired = []
        scheduler = sched.scheduler(time.time, time.sleep)
        now = time.time()
        # Only the first and last events are cancelled, so only their handles
        # need to be kept.
        first = scheduler.enterabs(now + 0.01, 1, fired.append, (0.01,))
        scheduler.enterabs(now + 0.02, 1, fired.append, (0.02,))
        scheduler.enterabs(now + 0.03, 1, fired.append, (0.03,))
        scheduler.enterabs(now + 0.04, 1, fired.append, (0.04,))
        last = scheduler.enterabs(now + 0.05, 1, fired.append, (0.05,))
        scheduler.cancel(first)
        scheduler.cancel(last)
        scheduler.run()
        self.assertEqual(fired, [0.02, 0.03, 0.04])

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_cancel_concurrent(self):
        """Events can be cancelled while run() is active in another thread."""
        q = queue.Queue()
        fun = q.put
        timer = Timer()
        scheduler = sched.scheduler(timer.time, timer.sleep)
        now = timer.time()
        # Entered out of time order; only the handles that get cancelled
        # (events 2 and 5) are kept.
        scheduler.enterabs(now + 1, 1, fun, (1,))
        event2 = scheduler.enterabs(now + 2, 1, fun, (2,))
        scheduler.enterabs(now + 4, 1, fun, (4,))
        event5 = scheduler.enterabs(now + 5, 1, fun, (5,))
        scheduler.enterabs(now + 3, 1, fun, (3,))
        t = threading.Thread(target=scheduler.run)
        t.start()
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 1)
        self.assertTrue(q.empty())
        scheduler.cancel(event2)
        scheduler.cancel(event5)
        timer.advance(1)
        # Event 2 was cancelled, so nothing may arrive at time 2.
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 3)
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 4)
        self.assertTrue(q.empty())
        # Grant far more time than any event needs so run() can return.
        timer.advance(1000)
        t.join(timeout=TIMEOUT)
        self.assertFalse(t.is_alive())
        self.assertTrue(q.empty())
        # Event 5 was cancelled, so the clock must stop at event 4.
        self.assertEqual(timer.time(), 4)

    def test_empty(self):
        """empty() is true before any event is added and again after run()."""
        fired = []
        scheduler = sched.scheduler(time.time, time.sleep)
        self.assertTrue(scheduler.empty())
        for when in [0.05, 0.04, 0.03, 0.02, 0.01]:
            scheduler.enterabs(when, 1, fired.append, (when,))
        self.assertFalse(scheduler.empty())
        scheduler.run()
        self.assertTrue(scheduler.empty())

    def test_queue(self):
        """The queue property lists pending events in firing order."""
        # The action is never invoked here because run() is not called; it
        # only has to be a callable.
        fun = [].append
        scheduler = sched.scheduler(time.time, time.sleep)
        now = time.time()
        e5 = scheduler.enterabs(now + 0.05, 1, fun)
        e1 = scheduler.enterabs(now + 0.01, 1, fun)
        e2 = scheduler.enterabs(now + 0.02, 1, fun)
        e4 = scheduler.enterabs(now + 0.04, 1, fun)
        e3 = scheduler.enterabs(now + 0.03, 1, fun)
        # queue property is supposed to return an ordered list of
        # upcoming events
        self.assertEqual(scheduler.queue, [e1, e2, e3, e4, e5])

    def test_args_kwargs(self):
        """Positional and keyword arguments are forwarded to the action."""
        seq = []

        def fun(*a, **b):
            # Record exactly what the scheduler passed in.
            seq.append((a, b))

        now = time.time()
        scheduler = sched.scheduler(time.time, time.sleep)
        scheduler.enterabs(now, 1, fun)
        scheduler.enterabs(now, 1, fun, argument=(1, 2))
        scheduler.enterabs(now, 1, fun, argument=('a', 'b'))
        scheduler.enterabs(now, 1, fun, argument=(1, 2), kwargs={"foo": 3})
        scheduler.run()
        # All four events share one time and priority, so the recorded calls
        # are compared without regard to order.
        self.assertCountEqual(seq, [
            ((), {}),
            ((1, 2), {}),
            (('a', 'b'), {}),
            ((1, 2), {'foo': 3})
        ])

    def test_run_non_blocking(self):
        """run(blocking=False) returns without firing events not yet due."""
        fired = []
        scheduler = sched.scheduler(time.time, time.sleep)
        # Delays of several seconds, so none is due when run() is called.
        for delay in [10, 9, 8, 7, 6]:
            scheduler.enter(delay, 1, fired.append, (delay,))
        scheduler.run(blocking=False)
        self.assertEqual(fired, [])
|
|
|
|
|
if __name__ == "__main__":
    # Executed directly as a script: hand control to unittest's runner.
    unittest.main()
|