o
    0hB0                     @   s   d dl Z d dlmZ d dlmZmZ d dlmZmZm	Z	m
Z
 d dlZd dlmZ d dlmZmZ d dlmZ g dZe
d	d
dZdefddZedG dd dee ZedG dd deZedG dd dee ZdS )    N)defaultdict)IteratorSized)AnyCallableOptionalTypeVar)functional_datapipe)	DataChunkIterDataPipe)_check_unpickable_fn)BatcherIterDataPipeGrouperIterDataPipeUnBatcherIterDataPipe_T_coT)	covariantnamec                 C   sN   | dv rt jd|  d|  dtdd ttjjjjj	| S t
dt d|  )	N)SHARDING_PRIORITIESShardingFilterIterDataPipe`zc` from `torch.utils.data.datapipes.iter.grouping` is going to be removed in PyTorch 2.1Please use `z5` from the `torch.utils.data.datapipes.iter.sharding`   )category
stacklevelzmodule z has no attribute )warningswarnFutureWarninggetattrtorchutilsdata	datapipesitershardingAttributeError__name__)r    r%   \/var/www/vscode/kcb/lib/python3.10/site-packages/torch/utils/data/datapipes/iter/grouping.py__getattr__   s   r'   batchc                       sz   e Zd ZU dZeed< eed< eed< defdededede	e ddf
 fd	d
Z
dee fddZdefddZ  ZS )r   a2  
    Creates mini-batches of data (functional name: ``batch``).

    An outer dimension will be added as ``batch_size`` if ``drop_last`` is set to ``True``, or ``length % batch_size`` for the
    last batch if ``drop_last`` is set to ``False``.

    Args:
        datapipe: Iterable DataPipe being batched
        batch_size: The size of each batch
        drop_last: Option to drop the last batch if it's not full
        wrapper_class: wrapper to apply onto each batch (type ``List``) before yielding,
            defaults to ``DataChunk``

    Example:
        >>> # xdoctest: +SKIP
        >>> from torchdata.datapipes.iter import IterableWrapper
        >>> dp = IterableWrapper(range(10))
        >>> dp = dp.batch(batch_size=3, drop_last=True)
        >>> list(dp)
        [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
    datapipe
batch_size	drop_lastFwrapper_classreturnNc                    s6   |dksJ dt    || _|| _|| _|| _d S )Nr   z+Batch size is required to be larger than 0!)super__init__r)   r*   r+   r,   )selfr)   r*   r+   r,   	__class__r%   r&   r/   A   s   

zBatcherIterDataPipe.__init__c                 c   sd    g }| j D ]}|| t|| jkr| |V  g }qt|dkr.| js0| |V  d S d S d S Nr   )r)   appendlenr*   r,   r+   )r0   r(   xr%   r%   r&   __iter__O   s   

zBatcherIterDataPipe.__iter__c                 C   sP   t | jtr| jrt| j| j S t| j| j d | j S tt| j d)N   z# instance doesn't have valid length)	
isinstancer)   r   r+   r5   r*   	TypeErrortyper$   r0   r%   r%   r&   __len__Z   s
   zBatcherIterDataPipe.__len__)r$   
__module____qualname____doc__r   __annotations__intboolr
   r;   r/   r   r7   r=   __classcell__r%   r%   r1   r&   r   %   s(   
 r   unbatchc                   @   s4   e Zd ZdZddedefddZdd Zd	d
 ZdS )r   a   
    Undos batching of data (functional name: ``unbatch``).

    In other words, it flattens the data up to the specified level within a batched DataPipe.

    Args:
        datapipe: Iterable DataPipe being un-batched
        unbatch_level: Defaults to ``1`` (only flattening the top level). If set to ``2``,
            it will flatten the top two levels, and ``-1`` will flatten the entire DataPipe.

    Example:
        >>> # xdoctest: +SKIP
        >>> from torchdata.datapipes.iter import IterableWrapper
        >>> source_dp = IterableWrapper([[[0, 1], [2]], [[3, 4], [5]], [[6]]])
        >>> dp1 = source_dp.unbatch()
        >>> list(dp1)
        [[0, 1], [2], [3, 4], [5], [6]]
        >>> dp2 = source_dp.unbatch(unbatch_level=2)
        >>> list(dp2)
        [0, 1, 2, 3, 4, 5, 6]
    r8   r)   unbatch_levelc                 C   s   || _ || _d S N)r)   rF   )r0   r)   rF   r%   r%   r&   r/   |   s   
zUnBatcherIterDataPipe.__init__c                 c   s(    | j D ]}| j|| jdE d H  qd S )NrF   )r)   _diverF   )r0   elementr%   r%   r&   r7      s   
zUnBatcherIterDataPipe.__iter__c                 c   s    |dk r	t d|dkr*t|ttfr%|D ]}| j|ddE d H  qd S |V  d S |dkr3|V  d S t|ttfrM|D ]}| j||d dE d H  q<d S td| j d)Nz unbatch_level must be -1 or >= 0rH   r   r8   zunbatch_level z" exceeds the depth of the DataPipe)
ValueErrorr9   listr
   rI   
IndexErrorrF   )r0   rJ   rF   itemr%   r%   r&   rI      s$   

zUnBatcherIterDataPipe._diveN)r8   )	r$   r>   r?   r@   r   rB   r/   r7   rI   r%   r%   r%   r&   r   d   s
    r   groupbyc                   @   s   e Zd ZdZdddddddee deegef ded	e	d
e
e	 de
e	 defddZdd Zdd ZdddZdd Zdd Zdd ZdS )r   a!
  
    Groups data from IterDataPipe by keys from ``group_key_fn``, yielding a ``DataChunk`` with batch size up to ``group_size``.

    (functional name: ``groupby``).

    The samples are read sequentially from the source ``datapipe``, and a batch of samples belonging to the same group
    will be yielded as soon as the size of the batch reaches ``group_size``. When the buffer is full,
    the DataPipe will yield the largest batch with the same key, provided that its size is larger
    than ``guaranteed_group_size``. If its size is smaller, it will be dropped if ``drop_remaining=True``.

    After iterating through the entirety of source ``datapipe``, everything not dropped due to the buffer capacity
    will be yielded from the buffer, even if the group sizes are smaller than ``guaranteed_group_size``.

    Args:
        datapipe: Iterable datapipe to be grouped
        group_key_fn: Function used to generate group key from the data of the source datapipe
        keep_key: Option to yield the matching key along with the items in a tuple,
            resulting in `(key, [items])` otherwise returning [items]
        buffer_size: The size of buffer for ungrouped data
        group_size: The max size of each group, a batch is yielded as soon as it reaches this size
        guaranteed_group_size: The guaranteed minimum group size to be yielded in case the buffer is full
        drop_remaining: Specifies if the group smaller than ``guaranteed_group_size`` will be dropped from buffer
            when the buffer is full

    Example:
        >>> import os
        >>> # xdoctest: +SKIP
        >>> from torchdata.datapipes.iter import IterableWrapper
        >>> def group_fn(file):
        ...     return os.path.basename(file).split(".")[0]
        >>> source_dp = IterableWrapper(["a.png", "b.png", "a.json", "b.json", "a.jpg", "c.json"])
        >>> dp0 = source_dp.groupby(group_key_fn=group_fn)
        >>> list(dp0)
        [['a.png', 'a.json', 'a.jpg'], ['b.png', 'b.json'], ['c.json']]
        >>> # A group is yielded as soon as its size equals to `group_size`
        >>> dp1 = source_dp.groupby(group_key_fn=group_fn, group_size=2)
        >>> list(dp1)
        [['a.png', 'a.json'], ['b.png', 'b.json'], ['a.jpg'], ['c.json']]
        >>> # Scenario where `buffer` is full, and group 'a' needs to be yielded since its size > `guaranteed_group_size`
        >>> dp2 = source_dp.groupby(group_key_fn=group_fn, buffer_size=3, group_size=3, guaranteed_group_size=2)
        >>> list(dp2)
        [['a.png', 'a.json'], ['b.png', 'b.json'], ['a.jpg'], ['c.json']]
    Fi'  N)keep_keybuffer_size
group_sizeguaranteed_group_sizedrop_remainingr)   group_key_fnrQ   rR   rS   rT   rU   c                C   s   t | || _|| _|| _|| _tt| _d| _|| _	d | _
|d ur7|d ur7d|  k r1|ks4J  J || _
|d urP|d urKd|  k rJ|ksMJ  J || _
|| _t| _d S r3   )r   r)   rV   rQ   max_buffer_sizer   rM   buffer_elementscurr_buffer_sizerS   rT   rU   r
   r,   )r0   r)   rV   rQ   rR   rS   rT   rU   r%   r%   r&   r/      s"   
$
zGrouperIterDataPipe.__init__c                 C   s   d }d}d }| j  D ]}t| j | |krt| j | }|}q| jd ur7|| jk r7| js7tdt| j | | jd u sA|| jkrF| j | }|  j|8  _| j |= |S )Nr   zFailed to group items)rX   keysr5   rT   rU   RuntimeErrorstrrY   )r0   biggest_keybiggest_sizeresult_to_yieldfindkeyr%   r%   r&   _remove_biggest_key   s*   




z'GrouperIterDataPipe._remove_biggest_keyc                 c   s"   | j D ]d}| |}| j| | |  jd7  _| jd urK| jt| j| krK| | j| }| jr8||fn|V  |  jt| j| 8  _| j|= | j| j	krh| 
 }|d urh| |}| jre||fn|V  qt| j D ]}| | j|}|  jt|8  _| jr||fn|V  qpd S )Nr8   )r)   rV   rX   r4   rY   rS   r5   r,   rQ   rW   ra   tuplerZ   pop)r0   r6   keyresultr_   r%   r%   r&   r7     s.   


zGrouperIterDataPipe.__iter__r-   c                 C   s   d| _ tt| _d S r3   )rY   r   rM   rX   r<   r%   r%   r&   reset  s   zGrouperIterDataPipe.resetc              
   C   sD   | j | j| j| j| j| j| j| j| j| j	f
}t
jd ur t
|S |S rG   )r)   rV   rQ   rW   rS   rT   rU   r,   _valid_iterator_id_number_of_samples_yieldedr   getstate_hookr0   stater%   r%   r&   __getstate__   s   

z GrouperIterDataPipe.__getstate__c                 C   s@   |\
| _ | _| _| _| _| _| _| _| _| _	d| _
tt| _d S r3   )r)   rV   rQ   rW   rS   rT   rU   r,   rg   rh   rY   r   rM   rX   rj   r%   r%   r&   __setstate__1  s   z GrouperIterDataPipe.__setstate__c                 C   s   | j   d S rG   )rX   clearr<   r%   r%   r&   __del__A  s   zGrouperIterDataPipe.__del__)r-   N)r$   r>   r?   r@   r   r   r   r   rC   rB   r   r/   ra   r7   rf   rl   rm   ro   r%   r%   r%   r&   r      s8    1	

r   )r   collectionsr   collections.abcr   r   typingr   r   r   r   (torch.utils.data.datapipes.iter.shardingr   %torch.utils.data.datapipes._decoratorr	   #torch.utils.data.datapipes.datapiper
   r   'torch.utils.data.datapipes.utils.commonr   __all__r   r\   r'   r   r   r   r%   r%   r%   r&   <module>   s"   >4