o
    0h                  	   @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZmZ d dl m	Z	 d dl
mZ d dlmZmZ d dlmZ d dlmZmZmZmZmZmZ d dlmZ d dlZd dlmZ d dlmZ d dl m  m!Z" d d	l#m$Z$ d d
l%m&Z& d dl'm(Z(m)Z)m*Z+ d dl,m-Z- d dl.m/Z/m0Z0 d dl1m2Z2 d dl3m4Z4m5Z5m6Z6 d dl7m8Z8 d dl9m:Z:m;Z;m<Z< d dl=m>Z>m?Z?m@Z@ d dlAmBZBmCZCmDZDmEZE d dlmFZFmGZG d dlHmIZJ d dlKmLZLmMZMmNZNmOZO d dlPmQZQmRZRmSZSmTZTmUZU d dlVmWZW dZXeSrdZYdZZej[\ ZXneTrdZYdZZneUrdZYdZZej]\ ZXnd ZYd!ZZd"ZXG d#d$ d$eZ^G d%d& d&eZ_G d'd( d(ej`eZad)ej`d*ejbd+efd,d-Zcd.d/ Zd	0	1dd)ej`d2eefd3d4Zfdd5d6Zgd7d8 Zhd9d: Zidd)ej`d;eefd<d=Zjd)ej`d>eefd?d@Zkd)ej`dAeefdBdCZlG dDdE dEZmG dFdG dGeaZnG dHdI dIeaZoG dJdK dKeoZpG dLdM dMeoZqG dNdO dOeaZrG dPdQ dQerZsG dRdS dSej`ZtG dTdU dUeoZuG dVdW dWej`ZvG dXdY dYejwZxG dZd[ d[ej`Zye jzd\efd]d^Z{e jzd_efd`daZ|e jzdbefdcddZ}ee jzdeefdfdgZ~ee jzdhefdidjZee jzdkefdldmZee jzdnefdodpZdqed+edredsefdtduZ	vddwej`dxej`dyeedzf fd{d|ZG d}d~ d~eMZG dd deLZddee fddZG dd dej`ZG dd dej`ZG dd dej`ZdS )    N)ABCabstractmethod)nullcontext)deepcopy)autoEnumwraps)AnyCallablecastno_type_checkOptionalUnion)mock)
checkpoint)
DeviceMesh)
CPUOffloadfully_shardFullyShardedDataParallel)TrainingState)FSDPParamGroupRegisterPostBackwardFunction)#NO_RESHARD_AFTER_FORWARD_STRATEGIES)BackwardPrefetchMixedPrecisionShardingStrategy)ShardedGradScaler)always_wrap_policyModuleWrapPolicywrap)distribute_tensorDTensorShard)ColwiseParallelparallelize_moduleRowwiseParallelSequenceParallel)TransformerDecoderLayerTransformerEncoderLayer)DistributedDataParallel)MultiProcessTestCaseMultiThreadedTestCaserun_subtests
TEST_SKIPS)FILE_SCHEMAget_cycles_per_ms	TEST_CUDATEST_HPUTEST_XPU)
has_triton   cudancclzhpu:0hcclxpuxcclcpugloo   c                   @      e Zd Ze Ze ZdS )FSDPInitModeN)__name__
__module____qualname__r   NO_FSDP	RECURSIVE rE   rE   W/var/www/vscode/kcb/lib/python3.10/site-packages/torch/testing/_internal/common_fsdp.pyr?   V   s    
r?   c                   @   s   e Zd Ze Ze Ze ZdS )DEVICEInitModeN)r@   rA   rB   r   DEVICE_BEFOREDEVICE_AFTERDEVICE_NEVERrE   rE   rE   rF   rG   _   s    
rG   c                   @   sn   e Zd ZdZedeejdf fddZedejfddZ	edd	d
Z
eedededejfddZdS )FSDPTestModelzZThis defines the interface expected from all models used commonly for
    FSDP unit tests.return.c                 C      dS )z+Returns an input for the model as as tuple.NrE   selfdevicerE   rE   rF   	get_inputl      zFSDPTestModel.get_inputc                 C   rM   )z,Returns the loss given the input and output.NrE   )rO   inputoutputrE   rE   rF   get_lossq   rR   zFSDPTestModel.get_lossNc                 C   rM   )z<Runs the backward pass (e.g. including ``loss.backward()``).NrE   rO   lossrE   rE   rF   run_backwardv   rR   zFSDPTestModel.run_backwardargskwargsc                  O   rM   )z&Initializes an instance of this model.NrE   rY   rZ   rE   rE   rF   init{   s   zFSDPTestModel.initrL   N)r@   rA   rB   __doc__r   tupletorchTensorrQ   rU   rX   staticmethodr
   nnModuler\   rE   rE   rE   rF   rK   h   s     rK   modelprocess_group	assert_fnc                 C   s   dd |   D }|dd |  D 7 }t|}dd t|D }tj|||d |d }|dus4J |dd D ]}|dusBJ t||D ]\\}}	\}}
||	|
 qGq:dS )	a  
    All-gathers module states across ranks and calls ``assert_fn`` on each pair
    of corresponding states from rank 0 and a nonzero rank. For example, if
    ``assert_fn`` is ``self.assertEqual()``, then this checks that all module
    states are equal across ranks.
    c                 S       g | ]\}}||   fqS rE   detachr;   ).0
param_nameparamrE   rE   rF   
<listcomp>       z)_assert_module_states.<locals>.<listcomp>c                 S   rh   rE   ri   )rk   buffer_namebufferrE   rE   rF   rn      ro   c                 S   s   g | ]}d qS NrE   )rk   _rE   rE   rF   rn      s    groupr   Nr=   )named_parametersnamed_buffersdistget_world_sizerangeall_gather_objectzip)re   rf   rg   named_module_states
world_sizeolistrank0_statesstaters   p1p2rE   rE   rF   _assert_module_states   s"   
r   c                   C   s
   t tS rr   )r`   rP   DEVICE_TYPErE   rE   rE   rF   get_devtype      
r   FTzero_buffersc              
   C   s   |rt | nt }|Q |  D ]}t  |  W d   n1 s&w   Y  q|rM|  D ]"}t  |  W d   n1 sGw   Y  q2W d   dS W d   dS 1 s`w   Y  dS )zBZeros the parameters and optionally buffers of ``model`` in place.N)FSDPsummon_full_paramsr   
parametersr`   no_gradzero_buffers)re   r   summon_fullctxrm   rq   rE   rE   rF   _zero_model   s"   



"r   c                 C   s"   |s|  t} |r|   |  S rr   )tor   half
state_dict)re   cpu_offloadr   rE   rE   rF   _get_state_dict   s
   
r   c                    s   d  fdd|D S )Nrs   c                    s$   g | ]}|d ur t | ndqS )Nnone)str)rk   stest_name_mappingrE   rF   rn      s   $ z subtest_name.<locals>.<listcomp>)join)r   rY   rE   r   rF   subtest_name   s   r   c                 C   s   |  D ]\}}|jtdkr| ||< q| dkr|nd g}t| ttttj	f |d }|
 D ]}|| t||< q5|S )Nr;   r   )itemsrP   r`   r;   rx   broadcast_object_listr   dictr   ra   keysr   r   )rankr   rl   rm   r   rE   rE   rF   _broadcast_state_dict   s   
r   recursec                 C   sB   t j| |d tt|  W  d   S 1 sw   Y  dS )a[  
    Returns the full unsharded parameters of ``model``. Any FSDP-managed
    parameters offloaded to CPU are moved to GPU in the returned list.

    Args:
        recurse (bool): If ``False``, only unshards the parameters immediate to
            ``model``; if ``True``, recurses through the module hierarchy
            rooted at ``model``.
    )r   N)r   r   r   listr   )re   r   rE   rE   rF   get_full_params   s   
$r   move_to_devicec                 C   s   |r|  tS | S rr   )r   r   )re   r   rE   rE   rF   _move_to_device      r   	wrap_fsdpc                 O   s   |s| S t | g|R i |S rr   r   )re   r   rY   rZ   rE   rE   rF   _maybe_wrap_fsdp   s   r   c                   @   sB   e Zd ZdedefddZdefddZdefdd	Zd
d ZdS )DummyProcessGroupr   sizec                 C   s   || _ || _d S rr   )_rank_size)rO   r   r   rE   rE   rF   __init__   s   
zDummyProcessGroup.__init__rL   c                 C      | j S rr   )r   rO   rE   rE   rF   r         zDummyProcessGroup.rankc                 C   r   rr   )r   r   rE   rE   rF   r      r   zDummyProcessGroup.sizec                 O   s   t  }dd }||_|S )Nc                  S   s   t j } | d | S )Nr=   )r`   futuresFuture
set_result)futurerE   rE   rF   
get_future   s   

z/DummyProcessGroup.allreduce.<locals>.get_future)r   Mockr   )rO   rY   rZ   	dist_waitr   rE   rE   rF   	allreduce   s   zDummyProcessGroup.allreduceN)r@   rA   rB   intr   r   r   r   rE   rE   rE   rF   r      s
    r   c                       s   e Zd Zdejdededef fddZdd Zd	d
 Z	dd Z
dd Ze			ddejdededeeeef  dededeejef fddZdd Z  ZS )TransformerWithSharedParamsru   device_init_modeadd_bndeterministicc                    s   t    | | _| | _|rtd d}d}t||| _	tj
|ddddd| _t||| _| j	j| j_| d| j	j|f | d	tj| jtjd
 d| _|r^tj| jntj | _|tjkrn| t} |rv|   d S d S )Nr               g?)d_modelnum_encoder_layersnum_decoder_layersdim_feedforwarddropout
vocab_biaslong_buffer)dtype)superr   r   r   r~   r`   manual_seedrc   	Embeddingembed_tokensTransformertransformerLinearoutput_projweightregister_buffernew_ones
zeros_liker   longbsBatchNorm1dIdentitybnrG   rH   r   r   eval)rO   ru   r   r   r   d_vocabr   	__class__rE   rF   r     s>   





z$TransformerWithSharedParams.__init__c                 C   sN   t d| j  t jd|dd| j}t j| jd |dd| j}||fS )Nr=      rP      r5   )r`   r   r   arangeviewr   )rO   rP   srctgtrE   rE   rF   rQ   .  s   z%TransformerWithSharedParams.get_inputc                 C   sJ   |  |}|| j | j| }|  |}| |}| ||}| |S rr   )r   r   r   type_asr   r   r   )rO   src_idstgt_idsr   r   xrE   rE   rF   forward4  s   



z#TransformerWithSharedParams.forwardc                 C   s.   |\}}t jj|d|d|dddS )Nsum)	reduction)rc   
functionalcross_entropyr   r   )rO   rS   rT   rs   r   rE   rE   rF   rU   <  s   z$TransformerWithSharedParams.get_lossc                 C      |   d S rr   backwardrV   rE   rE   rF   rX   B     z(TransformerWithSharedParams.run_backwardNFTfsdp_init_modefsdp_kwargsrL   c                 C   s   |du ri }|t jkrt| tr| d }n| }t||||S |t jkrud|vr.ttth}n|	d}d|v rI|d t
jt
jhv rIt| tsId}n| }t| trU| d }	n| }	t|	|||}
t|
|fd|i|}|tjkrs|t}|S td| )au  
        Initializes a :class:`TransformerWithSharedParams` instance.

        Args:
            fsdp_init_mode (FSDPInitMode): If ``NO_FSDP``, then does not wrap
                any modules with FSDP. If ``RECURSIVE``, then wraps with
                top-level FSDP. By default, the top-level FSDP uses the
                ``ModuleWrapPolicy`` for encoder and decoder layers, but a
                different auto wrap policy may be specified via
                ``fsdp_kwargs``.
            device_init_mode (DEVICEInitMode): Determines model movement to DEVICE.
            fsdp_kwargs (Optional[Dict[str, Any]]): Optional keyword arguments
                forwarded to the FSDP constructor.
            deterministic (bool): Whether to make the model deterministic
                across constructions.
            add_bn (bool): Whether to include batch norm in the model.
        Nr   auto_wrap_policysharding_strategyUnsupported FSDP init mode: )r?   rC   
isinstancer_   r   rD   r   r)   r(   popr   HYBRID_SHARD_HYBRID_SHARD_ZERO2r   rG   rI   r   r   
ValueError)ru   r   r   r   r   r   pgr   fsdp_pg
tformer_pgm
fsdp_modelrE   rE   rF   r\   E  sV   









z TransformerWithSharedParams.initc                 C   s   | j gS rr   )r   r   rE   rE   rF   get_ignored_modules  s   z/TransformerWithSharedParams.get_ignored_modules)NFT)r@   rA   rB   rx   ProcessGrouprG   boolr   rQ   r   rU   rX   rb   r?   r   r   r   r
   r   rc   rd   r   r\   r  __classcell__rE   rE   r   rF   r     sD    *Mr   c                       s   e Zd Zdejdededef fddZdd Zd	d
 Z	dd Z
dd Ze		ddejdededeeeef  dedejfddZ  ZS )NestedWrappedModuleru   r   r   r   c                    s   t     | _ | _|tjk} fdd}|r#td t	
tt	dd||t	
|tt	dd|tt	dd||tt	dd|tt	dd|| _d S )Nc                       rt | fi  S | S rr   r   layerr   ru   r   rE   rF   _maybe_wrap     z1NestedWrappedModule.__init__.<locals>._maybe_wrapr   r   r5   r   )r   r   r   r   r~   rG   rH   r`   r   rc   
Sequentialr   r   modulerO   ru   r   r   r   r   r   r  r   r  rF   r     s$   





zNestedWrappedModule.__init__c                 C   s"   t d| j  t jdd|dfS )Nr=   r5   r   r   )r`   r   r   randrN   rE   rE   rF   rQ     s   zNestedWrappedModule.get_inputc                 C   
   |  |S rr   r  rO   r   rE   rE   rF   r     r   zNestedWrappedModule.forwardc                 C   s   |  }|S rr   )r   rO   rS   rT   rW   rE   rE   rF   rU     s   zNestedWrappedModule.get_lossc                 C   r   rr   r   rV   rE   rE   rF   rX     r   z NestedWrappedModule.run_backwardNFr   r   rL   c                 C   sn   |du ri }|t jkrt| d||dS |t jkr0t| fd||d|}|tjkr.|t}|S td| )a  
        Initializes a :class:`NestedWrappedModule` instance.

        Args:
            fsdp_init_mode (FSDPInitMode): If ``NO_FSDP``, then does not wrap
                any modules with FSDP. If ``RECURSIVE``, then wraps some nested
                modules with FSDP but not the top-level module. The model may
                later be wrapped with a top-level FSDP external to this method
                if desired.
            device_init_mode (DEVICEInitMode): Determines model movement to DEVICE.
            fsdp_kwargs (Optional[Dict[str, Any]]): Optional keyword arguments
                forwarded to the FSDP constructor.
            deterministic (bool): Whether to make the model deterministic
                across constructions.
        NFr   r   r   Tr   )	r?   rC   r  rD   rG   rI   r   r   r  )ru   r   r   r   r   r  rE   rE   rF   r\     s.   



zNestedWrappedModule.initNF)r@   rA   rB   rx   r  r	  rG   r   rQ   r   rU   rX   rb   r?   r   r   r   r
   rc   rd   r\   r
  rE   rE   r   rF   r    s<     r  c                       sJ   e Zd Ze		d
dejdededee	e
ef  def
 fdd	Z  ZS )AlwaysWrapNestedWrappedModuleNFru   r   r   r   r   c                    sl   t ttj| tj|||d}|tjkr|S |tjkr4|pi }t|fdti|}|tj	kr2|
t}|S dS )z
        Initializes a :class:`NestedWrappedModule` instance, but unlike
        :meth:`NestedWrappedModule.init`, for the ``RECURSIVE`` init mode, this
        wraps with top-level FSDP and the ``always_wrap_policy()`` auto wrap
        policy.
        )ru   r   r   r   r   r   N)r   r  r\   r?   rC   rD   r   r   rG   rI   r   r   )ru   r   r   r   r   re   r  r   rE   rF   r\     s&   
	


z"AlwaysWrapNestedWrappedModule.initr  )r@   rA   rB   rb   rx   r  r?   rG   r   r   r   r
   r	  r\   r
  rE   rE   r   rF   r    s    r  c                       st   e Zd Zdejdededef fddZedd	d
Z	e		ddejde
dedeeeef  def
ddZ  ZS )NonUniformReqGradNWMru   r   r   r   c                    s   t t|    | _ | _|tjk} fdd}|r%t	d t
tt
dd||t
|tt
dd|tt
dd||t
tt
dd|tt
dd|| _d S )Nc                    r  rr   r   r  r  rE   rF   r  +  r  z2NonUniformReqGradNWM.__init__.<locals>._maybe_wrapr   r   r5   r   )r   r  r   r   r   r~   rG   rH   r`   r   rc   r  r   r   r  r  r   r  rF   r     s,   




zNonUniformReqGradNWM.__init__rL   Nc                 C   s,   |   D ]\}}t||s|d qd S r  )rv   rematchrequires_grad_)re   req_grad_masknprE   rE   rF   _set_nonuniform_req_gradB  s
   
z-NonUniformReqGradNWM._set_nonuniform_req_gradFr   r   c                 C   s   t d}|tjkrt| d||d}t|| |S |tjkrC|du r%i }t| fd||d|}|tjkr;|	t
}t|| |S td| )a  
        Initializes a :class:`NestedWrappedModule` instance, but unlike
        :meth:`NestedWrappedModule.init`, it wraps a second :class:`torch.nn.Sequential`
        container to enable the desired non-uniform ``requires_grad``
        ``use_orig_params=True`` tests. For both ``RECURSIVE`` and ``NO_FSDP``
        init modes, freezes all parameters except the last two to validate
        ``ShardedGradScaler`` support for ranks with no (non-zero sized) local shards in
        FSDP ``use_orig_params=True`` mode.
        zmodule\.2.*\.1.*Fr  NTr   )r  compiler?   rC   r  r$  rD   rG   rI   r   r   r  )ru   r   r   r   r   req_grad_pattern	ddp_modelr  rE   rE   rF   r\   H  s6   




zNonUniformReqGradNWM.initr]   r  )r@   rA   rB   rx   r  r	  rG   r   rb   r$  r?   r   r   r   r
   r\   r
  rE   rE   r   rF   r    s4    *r  c                       sv   e Zd ZdZdejdedef fddZdd Zd	d
 Z	dd Z
dd Zedee dedededef
ddZ  ZS )ModuleWithDelayzThis class wraps a :class:`FSDPTestModel` to optionally add a delay
    after computing the loss and/or before the gradient reduction.r  delay_after_loss_msdelay_before_reduction_msc                    s    t    || _|| _|| _d S rr   )r   r   r)  r*  r  )rO   r  r)  r*  r   rE   rF   r   {  s   

zModuleWithDelay.__init__c                 C   s   | j |S rr   )r  rQ   rN   rE   rE   rF   rQ     r   zModuleWithDelay.get_inputc                 C   r  rr   r  r  rE   rE   rF   r     r   zModuleWithDelay.forwardc                 C   sT   | j ||}| jdkr(tstrt| jd  |S tr(tj	
t| jt   |S Nr     )r  rU   r)  r2   r3   timesleepr1   r`   r6   _sleepr   r0   r  rE   rE   rF   rU     s   
zModuleWithDelay.get_lossc                    sT   t jj  fdd}td| j| W d    d S 1 s#w   Y  d S )Nc                     sN   j dkr trtjtj t   ntstr t	
j d   | i |S r+  )r*  r1   r`   r6   r/  r   r0   r2   r3   r-  r.  r[   orig_reduce_scatterrO   rE   rF   _delayed_reduce_scatter  s   
z=ModuleWithDelay.run_backward.<locals>._delayed_reduce_scatterz'torch.distributed.reduce_scatter_tensor)r`   distributedreduce_scatter_tensorr   patchr  rX   )rO   rW   r2  rE   r0  rF   rX     s   
"zModuleWithDelay.run_backwardmodule_class
model_argsmodel_kwargsc                O   s   t | j|i |||S )aA  
        Args:
            module_class (Type[FSDPTestModel]): Wrapped module class to which
                to add delays.
            model_args: Positional arguments forwarded to the ``module_class``
                ``init()``.
            delay_after_loss_ms (int): Delay after computing the loss/before
                the optimizer step (in ms).
            delay_before_reduction_ms (int): Delay before reduce-scattering
                gradients (in ms).
            model_kwargs: Keyword arguments forwarded to the ``module_class``
                ``init()``.
        )r(  r\   )r6  r)  r*  r7  r8  rE   rE   rF   r\     s
   zModuleWithDelay.init)r@   rA   rB   r^   rc   rd   r   r   rQ   r   rU   rX   rb   typerK   r
   r\   r
  rE   rE   r   rF   r(  w  s2    
r(  c                   @   sR   e Zd Zeejddddfdejdedede	e
eef  ded	ed
efddZdS )NestedWrappedModuleWithDelayNFr   ru   r   r   r   r   r)  r*  c              
   C   s   t jt| ||||||dS )Nru   r   r   r   r   r)  r*  )r(  r\   r  r;  rE   rE   rF   r\     s   
z!NestedWrappedModuleWithDelay.init)r@   rA   rB   rb   rG   rI   rx   r  r?   r   r   r   r
   r	  r   r\   rE   rE   rE   rF   r:    s,    r:  c                       $   e Zd Z fddZdd Z  ZS )DummyDDPc                    s   t    || _d S rr   )r   r   r  )rO   r  r   rE   rF   r     s   

zDummyDDP.__init__c                 O   s   | j |i |S rr   r  rO   rY   rZ   rE   rE   rF   r     s   zDummyDDP.forwardr@   rA   rB   r   r   r
  rE   rE   r   rF   r=    s    r=  c                       s   e Zd Zdejdedededef
 fddZdd	 Z	d
d Z
e			ddejdededeeeef  dedefddZ  ZS )MixtureOfExpertsru   r   r   delay_before_free_msr   c              
      s$  t  j||||d || _|| _|| _|tjk| _|r#t	d| j
  d}d}d}	tt||| j}
tdd |
 D | _|
 D ]}d|_qC|rPt	d	 tt||| j}|rwtj|
 g}t|
|fi |}
t||fi |}ttt|	|| j||
tt||	| j| _d S )
N)ru   r   r   r   *   r   r   r   c                 s       | ]}|  V  qd S rr   )numel)rk   r#  rE   rE   rF   	<genexpr>       z,MixtureOfExperts.__init__.<locals>.<genexpr>Tr   )r   r   ru   rA  r   rG   rH   r   r`   r   r   r   rc   r   r   r   num_expert_paramsexpertr3  	new_groupr   r  r  )rO   ru   r   r   rA  r   r   d_expertd_sharedd_inputrH  r#  sharedexpert_groupr   rE   rF   r     sD   	

zMixtureOfExperts.__init__c                    sx   j dkr7jd }t|tr7tjjjj  fdd}t	
d| |W  d    S 1 s2w   Y  |S )Nr   r   c                     sD   t rtjtjt   ntstrt	
jd   | i |S )Nr,  )r1   r`   r6   r/  r   rA  r0   r2   r3   r-  r.  r[   orig_reshardrO   rE   rF   _delayed_reshard  s   z2MixtureOfExperts.forward.<locals>._delayed_reshardz.torch.distributed.fsdp._runtime_utils._reshard)rA  r  r   r   r`   r3  fsdp_runtime_utils_reshardr   r5  )rO   r   rH  rQ  rE   rO  rF   r     s   


 
zMixtureOfExperts.forwardc                 C   s   |   | jsAt , |  D ]}t|drq|jd ur.|j| j tj	j
|j| jd qW d    d S 1 s:w   Y  d S d S )NrH  rt   )r   r   r`   r   r   hasattrgraddiv_r~   r3  
all_reduceru   )rO   rW   r#  rE   rE   rF   rX   1  s   


"zMixtureOfExperts.run_backwardNFr   r   r   c                 C   sr   |du ri }|t jkrt| d|||dS |t jkr2t| fd|||d|}|tjkr0|t}|S td| )a  
        Initializes a :class:`MixtureOfExperts` instance.

        Args:
            fsdp_init_mode (FSDPInitMode): If ``NO_FSDP``, then does not wrap
                any modules with FSDP. If ``RECURSIVE``, then wraps some nested
                modules with FSDP, including the expert and shared layers, but
                not the top-level module. The model may later be wrapped with a
                top-level FSDP external to this method if desired.
            device_init_mode (DEVICEInitMode): Determines model movement to DEVICE.
            fsdp_kwargs (Optional[Dict[str, Any]]): Optional keyword arguments
                forwarded to the FSDP constructor.
            deterministic (bool): Whether to make the model deterministic
                across constructions.
            delay_before_free_ms (int): Delay before resharding expert
                parameters in the forward pass (in ms).
        NF)r   r   rA  r   Tr   )	r?   rC   r@  rD   rG   rI   r   r   r  )ru   r   r   r   r   rA  r  rE   rE   rF   r\   =  s2   



zMixtureOfExperts.init)NFr   )r@   rA   rB   rx   r  r	  rG   r   r   r   rX   rb   r?   r   r   r   r
   r\   r
  rE   rE   r   rF   r@    s>    4r@  c                       sd   e Zd Z	ddddddedeej ded	ed
ef
 fddZdej	dej	fddZ
dd Z  ZS )MLPNTFr5   )biaswith_bufferdim_multiplierdimrP   rZ  r[  r\  c                   sd   t    tj||| ||d| _tj|| |||d| _|r-| dtj|f|d d S d | _	d S )N)rP   rZ  rq   r   )
r   r   rc   r   in_projout_projr   r`   randnrq   )rO   r]  rP   rZ  r[  r\  r   rE   rF   r   r  s   
	
zMLP.__init__r   rL   c                 C   s@   |  |}t|}| |}t|}| jd ur|| j }|S rr   )r^  Frelur_  rq   )rO   r   zrE   rE   rF   r     s   





zMLP.forwardc                 C   s"   | j d urtjj| j  d S d S rr   )rq   r`   rc   r\   normal_r   rE   rE   rF   reset_parameters  s   
zMLP.reset_parametersrr   )r@   rA   rB   r   r   r`   rP   r	  r   ra   r   re  r
  rE   rE   r   rF   rY  q  s&    	rY  c                       sF   e Zd Zdddedef fddZdeded	ed
d fddZ  ZS )MLPStackF)with_seq_parallelmlp_dimrg  c                   sL   t |ddt |t |ddg}|r|tj|dd t j|  || _d S )N   )r\  FrZ  )rY  appendrc   	LayerNormr   r   rg  )rO   rh  rg  modulesr   rE   rF   r     s   


zMLPStack.__init__tp_meshdp_meshuse_activation_checkpointingrL   c                 K   s   t ddtddt ddtddt dd| jrttddnt d}| jr-tdd|d< t| ||d | D ]}t|tjr?q6|rEt	| t
|fd	|i| q6t
| fd	|i| | S )
NF)use_local_outputr=   )output_layouts)z	0.in_projz
0.out_projz	1.in_projz
1.out_projz	2.in_projz
2.out_proj)sequence_dim3)device_meshparallelize_planmesh)r$   r&   rg  r#   r'   r%   r   rc   rl  r   r   )rO   rn  ro  rp  r   rv  r  rE   rE   rF   parallelize  s(   
zMLPStack.parallelize)	r@   rA   rB   r   r	  r   r   rx  r
  rE   rE   r   rF   rf    s    rf  c                       sV   e Zd ZdZddedef fddZdejde	e
ejejf ejf fd	d
Z  ZS )DoubleLinearz
    This can be used for returning multiple outputs from a module
    (``use_second_linear=True``) or for having an unused module (``False``).
    Tr]  use_second_linearc                    s:   t    t||| _t||| _t | _|| _d S rr   )	r   r   rc   r   lin1lin2ReLUrb  rz  )rO   r]  rz  r   rE   rF   r     s
   


zDoubleLinear.__init__r   rL   c                 C   s6   | j r| | || | |fS | | |S rr   )rz  rb  r{  r|  r  rE   rE   rF   r     s    zDoubleLinear.forwardT)r@   rA   rB   r^   r   r	  r   r`   ra   r   r_   r   r
  rE   rE   r   rF   ry    s    ry  new_all_gather_into_tensorc                 c   B    t j}t   | t _zd V  W t   |t _d S t   |t _w rr   )rx   all_gather_into_tensorbarrier)r  orig_all_gatherrE   rE   rF   patch_all_gather     
r  new_reduce_scatter_tensorc                 c   r  rr   )rx   r4  r  )r  r1  rE   rE   rF   patch_reduce_scatter  r  r  new_all_reducec                 c   r  rr   )rx   rX  r  )r  orig_all_reducerE   rE   rF   patch_all_reduce  r  r  new_unshardc                 c   B    t j}t  | t _zd V  W t  |t _d S t  |t _w rr   )r   unshardrx   r  )r  orig_unshardrE   rE   rF   patch_unshard     
r  new_reshardc                 c   r  rr   )r   reshardrx   r  )r  rP  rE   rE   rF   patch_reshard
  r  r  new_post_backwardc                 c   r  rr   )r   post_backwardrx   r  )r  orig_post_backwardrE   rE   rF   patch_post_backward  r  r  new_backwardc                 c   r  rr   )r   r   rx   r  )r  orig_backwardrE   rE   rF   *patch_register_post_backward_hook_backward$  r  r  r1  rY   rZ   c                 O   sR   t |dkr|d }nd|v r|d }n
td| d| || ||i |S )Nr   rT   z,Cannot get reduce-scatter output from
args: z	
kwargs: )lenAssertionError)clsr1  rg   rY   rZ   rT   rE   rE   rF   reduce_scatter_with_assert1  s   

r  rE   replicated_modulesharded_moduleprefixes_to_ignore.c                 C   s  t | | D ]\\}}\}}|}|D ]}	||	d}q| || | |t t|ts1J |j|j}
}t	|t
dt
dfkrHtdt||
|}| | |  |jd u rd| |j q	| |j t|j|
|}| |jt t|jtsJ | |j |  q	d S )N r   zmFSDP's (Shard(0), Shard(0)) layout differs from distribute_tensor(), so we cannot check for equality using it)r|   rv   replaceassertEqualassertIsInstancer"   r   ru  
placementsr_   r#   r  r!   to_localrV  assertIsNoneassertIsNotNone)r  r  r  r  replicated_namereplicated_paramsharded_namesharded_paramclean_sharded_nameprefixrw  r  sharded_ref_paramsharded_ref_gradrE   rE   rF   check_sharded_parityD  s2   
r  c                       s@   e Zd Zedd Z fddZdd Zdd Zd	d
 Z  Z	S )FSDPTestMultiThreadc                 C      t S rr   DEVICE_COUNTr   rE   rE   rF   r~   f     zFSDPTestMultiThread.world_sizec                    s   t    |   d S rr   )r   setUp_spawn_threadsr   r   rE   rF   r  j  s   
zFSDPTestMultiThread.setUpc                 O      t | g|R i |S rr   r-   r>  rE   rE   rF   r-   n     z FSDPTestMultiThread.run_subtestsc                 C      t j  d S rr   r`   _dynamoresetr   rE   rE   rF   perThreadSetUpq     z"FSDPTestMultiThread.perThreadSetUpc                 C   r  rr   r  r   rE   rE   rF   perThreadTearDownt  r  z%FSDPTestMultiThread.perThreadTearDown)
r@   rA   rB   propertyr~   r  r-   r  r  r
  rE   rE   r   rF   r  e  s    
r  c                $       sj  e Zd Z fddZedd Zedd Zedefdd	Zed
d Z	dd Z
dd Zdd Zdd Zedd Z							d4dejdedededee dedee d ed!ed"eeeef  fd#d$Zdd%d&e dddddddddfd'ee d(ed)ed*ee d+eded,ed-ee  d.ee! dee d/ed0ed ed!ed1eeeef  d"eeeef  f d2d3Z"  Z#S )5FSDPTestc                    s    t    dtjd< |   d S )N0TORCH_NCCL_DESYNC_DEBUG)r   r  osenviron_spawn_processesr   r   rE   rF   r  y  s   

zFSDPTest.setUpc                 C   r  rr   r  r   rE   rE   rF   r~     r  zFSDPTest.world_sizec                 C   s
   t j S rr   )rx   distributed_c10d_get_default_groupr   rE   rE   rF   rf     s   
zFSDPTest.process_grouprL   c                 C   rM   r  rE   r   rE   rE   rF   destroy_pg_upon_exit  rR   zFSDPTest.destroy_pg_upon_exitc                 C   s   t  | j S rr   )r/   	file_namer   rE   rE   rF   init_method  s   zFSDPTest.init_methodc                 C      |  ||j d S rr   )r  r   )rO   r  r   rE   rE   rF   _check_cpu_offload  r   zFSDPTest._check_cpu_offloadc                 C   r  rr   )r  backward_prefetch)rO   r  r  rE   rE   rF   _check_backward_prefetch  r   z!FSDPTest._check_backward_prefetchc                 C   r  rr   )r  forward_prefetch)rO   r  r  rE   rE   rF   _check_forward_prefetch  r   z FSDPTest._check_forward_prefetchc                 O   r  rr   r  r>  rE   rE   rF   r-     r  zFSDPTest.run_subtestsc              
   K   s(  | |}||_ ||_|dd}td|j  d|j  z%|r2tjjjj	
 }tjd|j||d ntj|jtt|j|j d W n ty] }	 zd|	jd	 v rXttd
 j  d }	~	ww d }
|j t }tsitrotj| |g}
tj|
d tj  ||| tj  tj|
d t  d S )Nfake_pgFzdist init r=z, world=fake)backendr~   r   store)r  r  r~   r   	recompiler   backend_unavailable)
device_ids) r   r  getprintr~   r`   testing	_internalr3  r  	FakeStorerx   init_process_groupr  DISTRIBUTED_BACKENDr   RuntimeErrorrY   sysexitr.   	exit_coder  r1   r3   acceleratorset_device_indexr  r  r  run_testdestroy_process_group)r  r   	test_namer  piperZ   rO   r  r  er  	device_idrE   rE   rF   _run  sL   


zFSDPTest._run{Gz?NFre   	num_stepsautocastlrfsdp_cpu_offload
save_modelmixed_precisionenable_sharded_grad_scaleruse_pure_fp16sharded_grad_scaler_kwargsc              	   C   sF  |o|j }t| j}|
d u ri }
tdd|i|
}tjj| |dd}t|D ]}|	  tj
jt|dY |jtt}|	sK|r_t|ts_t|tjrV| }n	tdd |D }|| }|rt|tr|jtvr| D ]}| |jtd qs|j|||}W d    n1 sw   Y  ||}|s|	s|jtjksJ dn'|	r| |jtj nt|tr|d usJ | |j|j n| |jtj |j| |rt|tr| D ]}| |jtd q|| |   |rd	d
 |! " D }t#| |$| q*t|tr|%t&j' |( S )Nenabledg?)r  momentum)r  c                 s   rC  rr   )r   )rk   r   rE   rE   rF   rE    rF  z4FSDPTest._train_for_several_steps.<locals>.<genexpr>r;   zeloss data type should be float32, as the original                     parameter data type is float32.c                 S   s   i | ]	\}}||  qS rE   )clone)rk   kvrE   rE   rF   
<dictcomp>  s    z5FSDPTest._train_for_several_steps.<locals>.<dictcomp>rE   ))offload_paramsnextr   rP   r   r`   optimSGDrz   	zero_gradampr  r   r  rQ   r   r   ra   r   r_   r   r   r  rU   r   scaler   float32float16param_dtyperX   stepupdater   r   r   load_state_dict_assert_stater   IDLErj   )rO   re   r  r  r  r  r  r  r  r  r  cpu_offload_paramsmodel_devicesharded_grad_scalerr  rs   rS   rT   r#  rW   r   rE   rE   rF   _train_for_several_steps  sn   





z!FSDPTest._train_for_several_stepsr   Tmodel_classr   r   ref_init_fn	num_itersr   r  r   r  use_orig_paramsinit_kwargsc           "      K   s  |t jks	J d|du ri }d}| j }|j| jt jtjfddi|}|du r>tr5t|t	gt	d}nt||g|d}n||}|rH|
 }| j|||
du|||
|||d	}t| }||||	|
||d z|j| j|||fddi|}W n ty } ztd	| d
t| |d}~ww t|tst|| jfi |}|r|
 }|tjkr|t	}|duo|j}|o|tjk}|o|tjk}|rtd}| D ]	}| |j| q|r| tdt	 nt }| | j||d||||
|||d
} W d   n	1 s	w   Y  |rdS |r/td}| D ]
}| |j| q| t	} t|}!tjj|| dd |
du rO|sQ| j||!ddd dS dS dS )a  
        Tests FSDP training against a reference, which defaults to DDP but
        may be customized with ``ref_init_fn``.

        Args:
            model_class (Type[FSDPTestModel]): A model class that inherits from
                ``FSDPTestModel``, which defines the expected interface.
            fsdp_init_mode (FSDPInitMode): The mode to initialize the
                FSDP-wrapped model. This should not be ``NO_FSDP``.
            ref_init_fn (Optional[Callable]): A callable to invoke that wraps a
                non-wrapped model to construct the reference model, where this
                wrapper should provide data parallel semantics. If ``None``,
                then the callable defaults to the DDP constructor.
        z.Expects an FSDP init mode that wraps with FSDPNr  r   T)r  output_device)r  r  r  r  r  r  r  )r   r  r   r  r  r  zInitializing z raised error r;   zOAn FSDP-managed module with parameter CPU offloading enabled has parameters on F)r  r  r  r  r  r  r  r  )check_dtypezFSDP did not match DDP)exact_devicemsg) r?   rC   rf   r   r\   rG   rH   r2   DDPr   r   r  r   r   r  	Exceptionr  r   r   r   rI   r   r  r`   rP   r  assertRaisesRegexr  r   r   r  assert_close)"rO   r  r   r   r  r  r  r   r  r   r  r  r  r  r  r  r  r   r  r   re   	ref_modelref_loss
ddp_paramsr  r  r  expects_device_errorexpects_cpu_device
cpu_devicerm   context	fsdp_lossfsdp_unsharded_paramsrE   rE   rF   _test_fsdp_parity)  s   #






	


zFSDPTest._test_fsdp_parity)r  NFNFFN)$r@   rA   rB   r  r  r~   rf   r	  r  r  r  r  r  r-   classmethodr  rc   rd   r   floatr   r   r   r   r   r
   r  r9  rK   r?   rG   r   r   r   r*  r
  rE   rE   r   rF   r  x  s    



8	

\	
r  compile_compute_on_modulec                    s.   fddG dd dt   fdd}|S )Nc                     s>   t jjj| i |  d u st| d  r| d   d S d S )Nr   )r`   r3  rR  r   r   r%  r[   )r-  rE   rF   !fully_shard_with_compiled_compute  s   
z=compiled_fsdp_test.<locals>.fully_shard_with_compiled_computec                   @   r>   )z*compiled_fsdp_test.<locals>.FullyShardModeN)r@   rA   rB   r   EAGERCOMPILED_COMPUTErE   rE   rE   rF   FullyShardMode  s    
r1  c                    s   t   fdd}|S )Nc                     s   t jjj} D ]b}| jkrt std qt jj	j
}t jj	j}t j  | jkr.|}n| jkr@dt jj	_
dt jj	_}ntd| |j|j< | i | t j  |j|j< |t jj	_
|t jj	_qd S )Nz0Inductor on GPU needs Triton and recent GPU archTr=   z!Need to implement FullyShardMode=)r`   r3  rR  r   r/  r4   warningswarnr  configskip_fsdp_hooks	_inductorcompile_threadsr  r0  NotImplementedError__globals__r@   )rY   rZ   original_fully_shardmodeoriginal_skip_fsdp_hooksoriginal_compile_threadsfully_shard_patch)r1  r.  funcrE   rF   wrapper  s0   










z6compiled_fsdp_test.<locals>.decorator.<locals>.wrapperr   )r?  r@  )r1  r.  )r?  rF   	decorator  s    z%compiled_fsdp_test.<locals>.decorator)r   )r-  rA  rE   )r1  r-  r.  rF   compiled_fsdp_test  s   $rB  c                       s&   e Zd Zd fddZdd Z  ZS )
SkipModulerL   Nc                    s    t    tjdddd| _d S N
   Frj  )r   r   rc   r   linr   r   rE   rF   r     s   
zSkipModule.__init__c                 C   r  rr   )rF  r  rE   rE   rF   r     r   zSkipModule.forwardr]   r?  rE   rE   r   rF   rC    s    rC  c                       r<  )NestedLinearc                    sJ   t    |rttjddddt| _d S tjddddt| _d S rD  )r   r   r    rc   r   r   r   nested_linear)rO   	fsdp_wrapr   rE   rF   r     s   
 zNestedLinear.__init__c                 C   r  rr   )rH  r  rE   rE   rF   r     r   zNestedLinear.forwardr?  rE   rE   r   rF   rG    s    rG  c                       r<  )	SkipModelc                    sH   t    tjddddt| _t t| _t	t
|dtd| _d S )NrE  Frj  )rI  )r  )r   r   rc   r   r   r   linearrC  linear_skipr    rG  rH  )rO   double_nestr   rE   rF   r     s   

zSkipModel.__init__c                 C   s"   |  |}| |}| |}|S rr   )rK  rL  rH  r  rE   rE   rF   r   $  s   


zSkipModel.forwardr?  rE   rE   r   rF   rJ    s    rJ  )FT)FFr~  )rE   rr   )
contextlibr  r  r  r-  r2  abcr   r   r   copyr   enumr   r   	functoolsr	   typingr
   r   r   r   r   r   unittestr   r`   torch.distributedr3  rx   torch.nnrc   torch.nn.functionalr   ra  torch.distributed._composabler   torch.distributed.device_meshr   torch.distributed.fsdpr   r   r   r   $torch.distributed.fsdp._common_utilsr   5torch.distributed.fsdp._fully_shard._fsdp_param_groupr   r   "torch.distributed.fsdp._init_utilsr   2torch.distributed.fsdp.fully_sharded_data_parallelr   r   r   *torch.distributed.fsdp.sharded_grad_scalerr   torch.distributed.fsdp.wrapr   r   r    torch.distributed.tensorr!   r"   r#   !torch.distributed.tensor.parallelr$   r%   r&   r'   r(   r)   torch.nn.parallel.distributedr*   r  *torch.testing._internal.common_distributedr+   r,   r-   r.   $torch.testing._internal.common_utilsr/   r0   r1   r2   r3   torch.utils._tritonr4   r  r   r  r6   device_countr9   r?   rG   rd   rK   r  r   r   r	  r   r   r   r   r   r   r   r   r   r  r  r  r(  r:  r=  r@  rY  r  rf  ry  contextmanagerr  r  r  r  r  r  r  r  r_   r   r  r  r  r9  rB  rC  rG  rJ  rE   rE   rE   rF   <module>   s   		


	 ^"`M	  -


!  ]3	