o
    h0                     @   s$  d Z ddlZddlZddlZddlmZmZ ddlmZm	Z	 ddl
mZ ddlmZmZmZmZmZ ddlZejrCddlmZmZmZ edZg d	ZG d
d deZG dd deZdededeejf ddfddZG dd dee ZG dd dee ZG dd deZ G dd deZ!dS )a  Asynchronous queues for coroutines. These classes are very similar
to those provided in the standard library's `asyncio package
<https://docs.python.org/3/library/asyncio-queue.html>`_.

.. warning::

   Unlike the standard library's `queue` module, the classes defined here
   are *not* thread-safe. To use these queues from another thread,
   use `.IOLoop.add_callback` to transfer control to the `.IOLoop` thread
   before calling any queue methods.

    N)genioloop)Future"future_set_result_unless_cancelled)Event)UnionTypeVarGeneric	AwaitableOptional)DequeTupleAny_T)QueuePriorityQueue	LifoQueue	QueueFull
QueueEmptyc                   @      e Zd ZdZdS )r   z:Raised by `.Queue.get_nowait` when the queue has no items.N__name__
__module____qualname____doc__ r   r   B/var/www/vscode/kcb/lib/python3.10/site-packages/tornado/queues.pyr   /       r   c                   @   r   )r   zBRaised by `.Queue.put_nowait` when a queue is at its maximum size.Nr   r   r   r   r   r   5   r   r   futuretimeoutreturnc                    sD   |r d fdd}t j || fdd d S d S )Nr    c                      s      s t  d S d S N)doneset_exceptionr   TimeoutErrorr   )r   r   r   
on_timeout@   s   z _set_timeout.<locals>.on_timeoutc                    s
     S r!   )remove_timeout)_)io_looptimeout_handler   r   <lambda>F   s   
 z_set_timeout.<locals>.<lambda>r    N)r   IOLoopcurrentadd_timeoutadd_done_callback)r   r   r%   r   )r   r(   r)   r   _set_timeout;   s   
r0   c                   @   s(   e Zd Zd	ddZdee fddZdS )
_QueueIteratorq	Queue[_T]r    Nc                 C   s
   || _ d S r!   )r2   )selfr2   r   r   r   __init__J      
z_QueueIterator.__init__c                 C   
   | j  S r!   )r2   getr4   r   r   r   	__anext__M   r6   z_QueueIterator.__anext__)r2   r3   r    N)r   r   r   r5   r
   r   r:   r   r   r   r   r1   I   s    
r1   c                   @   s  e Zd ZdZdZd1deddfddZedefdd	Zdefd
dZ	de
fddZde
fddZ	d2dedeeeejf  ddfddZdeddfddZ	d2deeeejf  dee fddZdefddZd3ddZ	d2deeeejf  ded fddZdee fdd Zd3d!d"Zdefd#d$Zdeddfd%d&Zdeddfd'd(Zd3d)d*Z de!fd+d,Z"de!fd-d.Z#de!fd/d0Z$dS )4r   a  Coordinate producer and consumer coroutines.

    If maxsize is 0 (the default) the queue size is unbounded.

    .. testcode::

        import asyncio
        from tornado.ioloop import IOLoop
        from tornado.queues import Queue

        q = Queue(maxsize=2)

        async def consumer():
            async for item in q:
                try:
                    print('Doing work on %s' % item)
                    await asyncio.sleep(0.01)
                finally:
                    q.task_done()

        async def producer():
            for item in range(5):
                await q.put(item)
                print('Put %s' % item)

        async def main():
            # Start consumer without waiting (since it never finishes).
            IOLoop.current().spawn_callback(consumer)
            await producer()     # Wait for producer to put all tasks.
            await q.join()       # Wait for consumer to finish all tasks.
            print('Done')

        asyncio.run(main())

    .. testoutput::

        Put 0
        Put 1
        Doing work on 0
        Put 2
        Doing work on 1
        Put 3
        Doing work on 2
        Put 4
        Doing work on 3
        Doing work on 4
        Done


    In versions of Python without native coroutines (before 3.5),
    ``consumer()`` could be written as::

        @gen.coroutine
        def consumer():
            while True:
                item = yield q.get()
                try:
                    print('Doing work on %s' % item)
                    yield gen.sleep(0.01)
                finally:
                    q.task_done()

    .. versionchanged:: 4.3
       Added ``async for`` support in Python 3.5.

    Nr   maxsizer    c                 C   sb   |d u rt d|dk rtd|| _|   tg | _tg | _d| _t	 | _
| j
  d S )Nzmaxsize can't be Noner   zmaxsize can't be negative)	TypeError
ValueError_maxsize_initcollectionsdeque_getters_putters_unfinished_tasksr   	_finishedset)r4   r;   r   r   r   r5      s   zQueue.__init__c                 C   s   | j S )z%Number of items allowed in the queue.)r>   r9   r   r   r   r;      s   zQueue.maxsizec                 C   s
   t | jS )zNumber of items in the queue.)len_queuer9   r   r   r   qsize   s   
zQueue.qsizec                 C   s   | j  S r!   rH   r9   r   r   r   empty      zQueue.emptyc                 C   s   | j dkrdS |  | j kS )Nr   F)r;   rI   r9   r   r   r   full   s   
z
Queue.fullitemr   zFuture[None]c                 C   sR   t  }z| | W n ty!   | j||f t|| Y |S w |d |S )a  Put an item into the queue, perhaps waiting until there is room.

        Returns a Future, which raises `tornado.util.TimeoutError` after a
        timeout.

        ``timeout`` may be a number denoting a time (on the same
        scale as `tornado.ioloop.IOLoop.time`, normally `time.time`), or a
        `datetime.timedelta` object for a deadline relative to the
        current time.
        N)r   
put_nowaitr   rC   appendr0   
set_result)r4   rN   r   r   r   r   r   put   s   
z	Queue.putc                 C   s^   |    | jr"|  sJ d| j }| | t||   dS |  r(t| | dS )z{Put an item into the queue without blocking.

        If no free slot is immediately available, raise `QueueFull`.
        z)queue non-empty, why are getters waiting?N)	_consume_expiredrB   rK   popleft_Queue__put_internalr   _getrM   r   )r4   rN   getterr   r   r   rO      s   

zQueue.put_nowaitc                 C   sF   t  }z
||   W |S  ty"   | j| t|| Y |S w )a.  Remove and return an item from the queue.

        Returns an awaitable which resolves once an item is available, or raises
        `tornado.util.TimeoutError` after a timeout.

        ``timeout`` may be a number denoting a time (on the same
        scale as `tornado.ioloop.IOLoop.time`, normally `time.time`), or a
        `datetime.timedelta` object for a deadline relative to the
        current time.

        .. note::

           The ``timeout`` argument of this method differs from that
           of the standard library's `queue.Queue.get`. That method
           interprets numeric values as relative timeouts; this one
           interprets them as absolute deadlines and requires
           ``timedelta`` objects for relative timeouts (consistent
           with other timeouts in Tornado).

        )r   rQ   
get_nowaitr   rB   rP   r0   )r4   r   r   r   r   r   r8      s   z	Queue.getc                 C   s\   |    | jr$|  sJ d| j \}}| | t|d |  S |  r,|  S t)zRemove and return an item from the queue without blocking.

        Return an item if one is immediately available, else raise
        `QueueEmpty`.
        z(queue not full, why are putters waiting?N)	rS   rC   rM   rT   rU   r   rV   rI   r   )r4   rN   putterr   r   r   rX      s   

zQueue.get_nowaitc                 C   s<   | j dkr	td|  j d8  _ | j dkr| j  dS dS )a  Indicate that a formerly enqueued task is complete.

        Used by queue consumers. For each `.get` used to fetch a task, a
        subsequent call to `.task_done` tells the queue that the processing
        on the task is complete.

        If a `.join` is blocking, it resumes when all items have been
        processed; that is, when every `.put` is matched by a `.task_done`.

        Raises `ValueError` if called more times than `.put`.
        r   z!task_done() called too many times   N)rD   r=   rE   rF   r9   r   r   r   	task_done  s   

zQueue.task_donec                 C   s   | j |S )zBlock until all items in the queue are processed.

        Returns an awaitable, which raises `tornado.util.TimeoutError` after a
        timeout.
        )rE   wait)r4   r   r   r   r   join$  s   z
Queue.joinc                 C   s   t | S r!   )r1   r9   r   r   r   	__aiter__.  rL   zQueue.__aiter__c                 C   s   t  | _d S r!   )r@   rA   rH   r9   r   r   r   r?   2  s   zQueue._initc                 C   r7   r!   )rH   rT   r9   r   r   r   rV   5  r6   z
Queue._getc                 C      | j | d S r!   rH   rP   r4   rN   r   r   r   _put8     z
Queue._putc                 C   s&   |  j d7  _ | j  | | d S )NrZ   )rD   rE   clearrb   ra   r   r   r   __put_internal=  s   
zQueue.__put_internalc                 C   s|   | j r| j d d  r| j   | j r| j d d  s| jr8| jd  r<| j  | jr:| jd  s'd S d S d S d S )Nr   rZ   )rC   r"   rT   rB   r9   r   r   r   rS   B  s   

$zQueue._consume_expiredc                 C   s    dt | jtt| |  f S )Nz<%s at %s %s>)typer   hexid_formatr9   r   r   r   __repr__J  s    zQueue.__repr__c                 C   s   dt | j|  f S )Nz<%s %s>)rf   r   ri   r9   r   r   r   __str__M  s   zQueue.__str__c                 C   sn   d| j f }t| dd r|d| j 7 }| jr|dt| j 7 }| jr+|dt| j 7 }| jr5|d| j 7 }|S )Nz
maxsize=%rrH   z	 queue=%rz getters[%s]z putters[%s]z	 tasks=%s)r;   getattrrH   rB   rG   rC   rD   )r4   resultr   r   r   ri   P  s   zQueue._format)r   r!   r+   )%r   r   r   r   rH   intr5   propertyr;   rI   boolrK   rM   r   r   r   floatdatetime	timedeltarR   rO   r
   r8   rX   r[   r]   r1   r^   r?   rV   rb   rU   rS   strrj   rk   ri   r   r   r   r   r   Q   sR    E






r   c                   @   :   e Zd ZdZdddZdeddfddZdefd	d
ZdS )r   a  A `.Queue` that retrieves entries in priority order, lowest first.

    Entries are typically tuples like ``(priority number, data)``.

    .. testcode::

        import asyncio
        from tornado.queues import PriorityQueue

        async def main():
            q = PriorityQueue()
            q.put((1, 'medium-priority item'))
            q.put((0, 'high-priority item'))
            q.put((10, 'low-priority item'))

            print(await q.get())
            print(await q.get())
            print(await q.get())

        asyncio.run(main())

    .. testoutput::

        (0, 'high-priority item')
        (1, 'medium-priority item')
        (10, 'low-priority item')
    r    Nc                 C   
   g | _ d S r!   rJ   r9   r   r   r   r?   z  r6   zPriorityQueue._initrN   c                 C   s   t | j| d S r!   )heapqheappushrH   ra   r   r   r   rb   }  s   zPriorityQueue._putc                 C   s   t | jS r!   )rw   heappoprH   r9   r   r   r   rV     s   zPriorityQueue._getr+   r   r   r   r   r?   r   rb   rV   r   r   r   r   r   ]  s
    
r   c                   @   ru   )r   a  A `.Queue` that retrieves the most recently put items first.

    .. testcode::

        import asyncio
        from tornado.queues import LifoQueue

        async def main():
            q = LifoQueue()
            q.put(3)
            q.put(2)
            q.put(1)

            print(await q.get())
            print(await q.get())
            print(await q.get())

        asyncio.run(main())

    .. testoutput::

        1
        2
        3
    r    Nc                 C   rv   r!   rJ   r9   r   r   r   r?     r6   zLifoQueue._initrN   c                 C   r_   r!   r`   ra   r   r   r   rb     rc   zLifoQueue._putc                 C   r7   r!   )rH   popr9   r   r   r   rV     r6   zLifoQueue._getr+   rz   r   r   r   r   r     s
    
r   )"r   r@   rr   rw   tornador   r   tornado.concurrentr   r   tornado.locksr   typingr   r   r	   r
   r   TYPE_CHECKINGr   r   r   r   __all__	Exceptionr   r   rq   rs   r0   r1   r   r   r   r   r   r   r   <module>   s8   
  '