import logging
from collections import abc, defaultdict
from collections.abc import Iterable
from typing import Any, Optional, overload, Union

import torch
import torch.distributed as dist
from torch.amp.grad_scaler import _MultiDeviceReplicator, GradScaler, OptState
from torch.distributed.distributed_c10d import ProcessGroup


logger = logging.getLogger(__name__)


def _refresh_per_optimizer_state() -> dict[str, Any]:
    return {"stage": OptState.READY, "found_inf_per_device": {}}


def _is_supported_device(tensor: torch.Tensor) -> bool:
    return tensor.is_cuda or tensor.device.type in (
        "xla",
        "cpu",
        "hpu",
        "mtia",
        "xpu",
        torch._C._get_privateuse1_backend_name(),
    )


class _GeneralMultiDeviceReplicator(_MultiDeviceReplicator):
    """
    Lazily serves tensor to request device. This class extends
    _MultiDeviceReplicator to allow support for "cpu" as a device.
    """

    def __init__(self, master_tensor: torch.Tensor) -> None:
        assert _is_supported_device(master_tensor)
        self.master = master_tensor
        self._per_device_tensors: dict[torch.device, torch.Tensor] = {}


class ShardedGradScaler(GradScaler):
    """
    ShardedGradScaler helps perform gradient scaling in a shard-aware manner. It extends
    functionality from GradScaler:

    * Supports PyTorch DDP and FSDP implementations
    * Supports CPU-offloaded tensors (as used in fully sharded data parallel [FSDP])
    * Supports the custom mixed-precision loss dtypes (fp16, bf16) that FSDP returns
    * Syncs inf/NaN for scaled gradient tensors on any torch.device (where tensors are placed)
      across nodes

    Example::

        # Creates a ShardedGradScaler once at the beginning of training.
        scaler = ShardedGradScaler()

        for epoch in epochs:
            for input, target in data:
                optimizer.zero_grad()
                output = model(input)
                loss = loss_fn(output, target)

                # Scales loss.  Calls backward() on scaled loss to create scaled gradients.
                scaler.scale(loss).backward()

                # scaler.step() first unscales gradients of the optimizer's params.
                # If gradients don't contain infs/NaNs, optimizer.step() is then called,
                # otherwise, optimizer.step() is skipped.
                scaler.step(optimizer)

                # Updates the scale for next iteration.
                scaler.update()

    See :class:`GradScaler` for explanation of scaling/unscaling and more use cases.

    Args:
        init_scale (float, optional, default=2.**16):  Initial scale factor.
        growth_factor (float, optional, default=2.0):  Factor by which the scale is multiplied during
            :meth:`update` if no inf/NaN gradients occur for ``growth_interval`` consecutive iterations.
        backoff_factor (float, optional, default=0.5):  Factor by which the scale is multiplied during
            :meth:`update` if inf/NaN gradients occur in an iteration.
        growth_interval (int, optional, default=2000):  Number of consecutive iterations without inf/NaN gradients
            that must occur for the scale to be multiplied by ``growth_factor``.
        enabled (bool, optional):  If ``False``, disables gradient scaling. :meth:`step` simply
            invokes the underlying ``optimizer.step()``, and other methods become no-ops.
            Default: ``True``
        process_group (ProcessGroup, optional, default=torch.distributed.group.WORLD):
            process group for sharding
    """

    def __init__(
        self,
        device: str = "cuda",
        init_scale: float = 2.0**16,
        backoff_factor: float = 0.5,
        growth_factor: float = 2.0,
        growth_interval: int = 2000,
        enabled: bool = True,
        process_group: Optional[ProcessGroup] = dist.group.WORLD,
    ) -> None:
        super().__init__(
            device,
            init_scale=init_scale,
            backoff_factor=backoff_factor,
            growth_factor=growth_factor,
            growth_interval=growth_interval,
            enabled=enabled,
        )
        if self._enabled:
            self.process_group = process_group
            self._per_optimizer_states = defaultdict(_refresh_per_optimizer_state)

    @overload
    def scale(self, outputs: torch.Tensor) -> torch.Tensor:
        ...

    @overload
    def scale(self, outputs: list[torch.Tensor]) -> list[torch.Tensor]:
        ...

    @overload
    def scale(self, outputs: tuple[torch.Tensor, ...]) -> tuple[torch.Tensor, ...]:
        ...

    @overload
    def scale(self, outputs: Iterable[torch.Tensor]) -> Iterable[torch.Tensor]:
        ...

    def scale(
        self, outputs: Union[torch.Tensor, Iterable[torch.Tensor]]
    ) -> Union[torch.Tensor, Iterable[torch.Tensor]]:
        if not self._enabled:
            return outputs

        if isinstance(outputs, torch.Tensor):
            assert _is_supported_device(outputs)
            if self._scale is None:
                self._lazy_init_scale_growth_tracker(outputs.device)
            assert self._scale is not None
            scaled_output = outputs * self._scale.to(
                device=outputs.device, non_blocking=True
            )
            # Keep the return dtype equal to the outputs dtype: for the FSDP +
            # mixed-precision case the loss is in the low-precision dtype
            # (fp16, bf16), so the scaled loss should stay in that dtype.
            return scaled_output.type(outputs.dtype)

        stash: list[_GeneralMultiDeviceReplicator] = []

        def apply_scale(
            val: Union[torch.Tensor, Iterable[torch.Tensor]],
        ) -> Union[torch.Tensor, Iterable[torch.Tensor]]:
            if isinstance(val, torch.Tensor):
                assert _is_supported_device(val)
                if len(stash) == 0:
                    if self._scale is None:
                        self._lazy_init_scale_growth_tracker(val.device)
                    assert self._scale is not None
                    stash.append(_GeneralMultiDeviceReplicator(self._scale))
                scaled_val = val * stash[0].get(val.device)
                # Keep the scaled value in the same dtype as the input (see above).
                return scaled_val.type(val.dtype)
            if isinstance(val, abc.Iterable):
                iterator = map(apply_scale, val)
                if isinstance(val, (list, tuple)):
                    return type(val)(iterator)
                return iterator
            raise ValueError("outputs must be a Tensor or an iterable of Tensors")

        return apply_scale(outputs)

    def _unscale_grads_(
        self,
        optimizer: torch.optim.Optimizer,
        inv_scale: torch.Tensor,
        found_inf: torch.Tensor,
        allow_fp16: bool = True,
    ) -> dict[torch.device, torch.Tensor]:
        per_device_inv_scale = _GeneralMultiDeviceReplicator(inv_scale)
        per_device_found_inf = _GeneralMultiDeviceReplicator(found_inf)

        # Group gradients by device and dtype, since the foreach kernel below
        # requires tensors of a single dtype on a single device.
        per_device_and_dtype_grads = defaultdict(lambda: defaultdict(list))
        with torch.no_grad():
            for group in optimizer.param_groups:
                for param in group["params"]:
                    if param.grad is None:
                        continue
                    if (not allow_fp16) and param.grad.dtype == torch.float16:
                        raise ValueError("Attempting to unscale FP16 gradients.")
                    if param.grad.is_sparse:
                        # coalesce() deduplicates indices and sums values with the
                        # same index; coalesce is not supported for float16, so
                        # round-trip through float32 before checking _values().
                        if param.grad.dtype is torch.float16:
                            param_grad_fp32 = param.grad.type(torch.float32).coalesce()
                            param.grad = param_grad_fp32.type(torch.float16)
                        to_unscale = param.grad._values()
                    else:
                        to_unscale = param.grad

                    per_device_and_dtype_grads[to_unscale.device][
                        to_unscale.dtype
                    ].append(to_unscale)

            for device, per_dtype_grads in per_device_and_dtype_grads.items():
                for grads in per_dtype_grads.values():
                    torch._amp_foreach_non_finite_check_and_unscale_(
                        grads,
                        per_device_found_inf.get(device),
                        per_device_inv_scale.get(device),
                    )

        # Some ranks may hold no (non-zero sized) parameter shards, so make sure
        # a found_inf tensor exists on the scale's device before returning.
        if not per_device_found_inf._per_device_tensors:
            assert self._scale is not None
            per_device_found_inf.get(self._scale.device)
        return per_device_found_inf._per_device_tensors

    def unscale_(self, optimizer: torch.optim.Optimizer) -> None:
        if not self._enabled:
            return

        self._check_scale_growth_tracker("unscale_")

        optimizer_state = self._per_optimizer_states[id(optimizer)]

        if optimizer_state["stage"] is OptState.UNSCALED:
            raise RuntimeError(
                "unscale_() has already been called on this optimizer since the last update()."
            )
        elif optimizer_state["stage"] is OptState.STEPPED:
            raise RuntimeError("unscale_() is being called after step().")

        # FP32 division can be imprecise for certain compile options, so carry
        # out the reciprocal in FP64.
        assert self._scale is not None
        inv_scale = self._scale.double().reciprocal().float()
        found_inf = torch.full(
            (1,), 0.0, dtype=torch.float32, device=self._scale.device
        )

        optimizer_state["found_inf_per_device"] = self._unscale_grads_(
            optimizer, inv_scale, found_inf, True
        )
        optimizer_state["stage"] = OptState.UNSCALED

        # Synchronize the detected inf/NaN across ranks.
        optimizer_state = self._per_optimizer_states[id(optimizer)]
        works = []
        found_inf_on_cpus = []
        found_inf_on_devices = []

        for found_inf in optimizer_state["found_inf_per_device"].values():
            if self._device != "cpu" and found_inf.device.type == "cpu":
                found_inf_on_cpus.append(found_inf)
                found_inf_on_device = found_inf.to(self._device)
                found_inf_on_devices.append(found_inf_on_device)
                works.append(
                    dist.all_reduce(
                        found_inf_on_device, async_op=True, group=self.process_group
                    )
                )
            else:
                works.append(
                    dist.all_reduce(found_inf, async_op=True, group=self.process_group)
                )

        for work in works:
            work.wait()

        if found_inf_on_cpus:
            torch._foreach_copy_(found_inf_on_cpus, found_inf_on_devices)

    def _amp_update_scale_cpu_(self, found_inf: torch.Tensor) -> None:
        """
        If found_inf is 1.0 (True), then scale is multiplied by backoff_factor and growth_tracker is set to zero.
        Otherwise, scale is multiplied by the growth factor when the growth interval is reached.
        """
        assert self._scale is not None and self._growth_tracker is not None

        if found_inf.item() >= 1.0:
            self._scale *= self._backoff_factor
            self._growth_tracker.fill_(0)
        else:
            successful = self._growth_tracker + 1
            if successful == self._growth_interval:
                self._scale *= self._growth_factor
                self._growth_tracker.fill_(0)
            else:
                self._growth_tracker = successful

    def update(self, new_scale: Optional[Union[float, torch.Tensor]] = None) -> None:
        """
        Updates the scale factor.

        If any optimizer steps were skipped, the scale is multiplied by ``backoff_factor``
        to reduce it. If ``growth_interval`` unskipped iterations occurred consecutively,
        the scale is multiplied by ``growth_factor`` to increase it.

        Passing ``new_scale`` sets the new scale value manually. (``new_scale`` is not
        used directly; it's used to fill GradScaler's internal scale tensor. So if
        ``new_scale`` was a tensor, later in-place changes to that tensor will not further
        affect the scale GradScaler uses internally.)

        Args:
            new_scale (float or :class:`torch.Tensor`, optional, default=None):  New scale factor.

        .. warning::
            :meth:`update` should only be called at the end of the iteration, after ``scaler.step(optimizer)`` has
            been invoked for all optimizers used this iteration.
        """
        if not self._enabled:
            return

        _scale, _growth_tracker = self._check_scale_growth_tracker("update")

        if new_scale is not None:
            # Accept a new user-defined scale.
            if isinstance(new_scale, float):
                self._scale.fill_(new_scale)  # type: ignore[union-attr]
            else:
                reason = (
                    "new_scale should be a float or a 1-element torch.cuda.FloatTensor or "
                    "torch.FloatTensor with requires_grad=False."
                )
                assert new_scale.device.type == self._device, reason
                assert new_scale.numel() == 1, reason
                assert new_scale.requires_grad is False, reason
                self._scale.copy_(new_scale)  # type: ignore[union-attr]
        else:
            # Consume shared inf/NaN data collected from optimizers to update the
            # scale. If all found_inf tensors are on the same device as self._scale,
            # this operation is asynchronous.
            found_infs = [
                found_inf.to(device=_scale.device, non_blocking=True)
                for state in self._per_optimizer_states.values()
                for found_inf in state["found_inf_per_device"].values()
            ]

            assert len(found_infs) > 0, "No inf checks were recorded prior to update."

            found_inf_combined = found_infs[0]
            if len(found_infs) > 1:
                for i in range(1, len(found_infs)):
                    found_inf_combined += found_infs[i]

            if _scale.device.type == "cpu":
                self._amp_update_scale_cpu_(found_inf_combined)
            else:
                torch._amp_update_scale_(
                    self._scale,  # type: ignore[arg-type]
                    self._growth_tracker,  # type: ignore[arg-type]
                    found_inf_combined,
                    self._growth_factor,
                    self._backoff_factor,
                    self._growth_interval,
                )

        # To prepare for the next iteration, clear the data collected from
        # optimizers this iteration.
        self._per_optimizer_states = defaultdict(_refresh_per_optimizer_state)
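# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the upstream module): how ShardedGradScaler
# is typically combined with an FSDP-wrapped model, mirroring the Example in
# the class docstring above. `model`, `loss_fn`, and `data` are placeholder
# names, and it is assumed that torch.distributed.init_process_group has
# already been called on each rank.
#
#   from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
#   from torch.distributed.fsdp.sharded_grad_scaler import ShardedGradScaler
#
#   model = FSDP(model)                        # shard parameters across ranks
#   optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
#   scaler = ShardedGradScaler()               # one scaler for the whole run
#
#   for input, target in data:
#       optimizer.zero_grad()
#       with torch.autocast("cuda", dtype=torch.float16):
#           loss = loss_fn(model(input), target)
#       scaler.scale(loss).backward()          # backward on the scaled loss
#       scaler.step(optimizer)                 # skips the step on inf/NaN grads
#       scaler.update()                        # adjust the scale for next step
# ---------------------------------------------------------------------------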