import copy
from typing import Any, cast, Optional

import torch
import torch.distributed as dist
import torch.distributed._shard.sharding_spec as shard_spec
import torch.distributed.distributed_c10d as c10d
from torch.distributed._shard.sharded_tensor import (
    Shard,
    ShardedTensor,
    ShardedTensorMetadata,
    TensorProperties,
)
from torch.distributed._shard.sharding_spec import ShardMetadata
from torch.distributed._shard.sharding_spec.chunk_sharding_spec import ChunkShardingSpec
from torch.distributed.device_mesh import _mesh_resources
from torch.distributed.fsdp._common_utils import _set_fsdp_flattened
from torch.distributed.fsdp._fsdp_extensions import FSDPExtensions
from torch.distributed.fsdp._shard_utils import _create_chunk_sharded_tensor
from torch.distributed.remote_device import _remote_device
from torch.distributed.tensor import DeviceMesh, DTensor, Replicate, Shard as DShard
from torch.distributed.tensor.parallel._data_parallel_utils import (
    _flatten_tensor,
    _unflatten_tensor,
)

__all__ = ["DTensorExtensions"]


def _get_box(tensor: DTensor) -> tuple[torch.Size, torch.Size]:
    device_mesh = tensor.device_mesh
    assert device_mesh.ndim == 1, "Only 1D DeviceMeshes currently handled"

    placement = tensor.placements[0]
    offsets = [0] * len(tensor.size())
    num_chunks = device_mesh.size(mesh_dim=0)

    if tensor.placements[0].is_shard():
        shard_dim = cast(DShard, placement).dim
        chunk_size = tensor.size(shard_dim) // num_chunks
        offsets[shard_dim] = chunk_size

    return (torch.Size(offsets), tensor._local_tensor.size())


def _get_box_for(tensor: DTensor, idx: int) -> tuple[torch.Size, torch.Size]:
    offsets, size = _get_box(tensor)
    return (torch.Size([val * idx for val in offsets]), size)


def _get_local_box(tensor: DTensor) -> tuple[torch.Size, torch.Size]:
    device_mesh = tensor.device_mesh
    coord = device_mesh.get_coordinate()
    assert coord is not None
    return _get_box_for(tensor, coord[0])


def _create_shard_md_from_dt(dt: DTensor, current_rank: int) -> ShardMetadata:
    mesh = dt.device_mesh
    assert mesh.ndim == 1, "Only 1D DeviceMeshes currently handled"

    offsets, sizes = _get_local_box(dt)
    return ShardMetadata(
        shard_offsets=list(offsets),
        shard_sizes=list(sizes),
        placement=f"rank:{current_rank}/{dt._local_tensor.device}",
    )
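

# A worked example of the box helpers (illustrative comment, not part of the
# original module): for a DTensor of global shape [8, 4] placed as Shard(0)
# over a 1D mesh of 4 ranks, _get_box() returns per-index offsets [2, 0] and
# local size [2, 4]; _get_box_for(dt, 2) then yields offsets [4, 0], i.e. the
# box owned by mesh coordinate 2, and _get_local_box() picks the box for the
# calling rank's own coordinate.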


def _create_sharded_tensor_md_from_dt(
    dt: DTensor, dt_pg: c10d.ProcessGroup
) -> ShardedTensorMetadata:
    # This is the tricky part: we must produce ShardedTensorMetadata with full
    # coverage of the global tensor yet only one shard that is valid for the
    # current rank; all remaining shards are parked on a scapegoat rank.
    shards_md = []
    my_rank = dist.get_rank(dt_pg)
    scapegoat_rank = 0 if my_rank > 0 else 1

    if dt.placements[0].is_shard():
        shard_count = dt_pg.size()
    else:
        shard_count = 1

    for i in range(shard_count):
        offsets, sizes = _get_box_for(dt, i)
        shards_md.append(
            ShardMetadata(
                shard_offsets=list(offsets),
                shard_sizes=list(sizes),
                placement=(
                    f"rank:{scapegoat_rank if i > 0 else my_rank}/"
                    f"{dt._local_tensor.device}"
                ),
            )
        )

    return ShardedTensorMetadata(
        shards_metadata=shards_md,
        size=dt.size(),
        tensor_properties=TensorProperties(
            dtype=dt.dtype,
            layout=dt.layout,
            requires_grad=dt.requires_grad,
        ),
    )


def _get_dt_pg(dt: DTensor) -> c10d.ProcessGroup:
    mesh = dt.device_mesh
    assert mesh.ndim == 1, "Only 1D DeviceMeshes currently handled"
    return mesh.get_group()
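

# Illustrative note (not from the original module): with 4 ranks in dt_pg and
# rank 2 building the metadata above for a Shard(0) DTensor, shard 0 is
# attributed to rank 2 itself while shards 1-3 all land on scapegoat rank 0,
# which gives full coverage with a single locally-owned shard entry.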


def _rewrite_spec_if_needed(
    spec: shard_spec.ShardingSpec, tensor: torch.Tensor, rank: int
) -> shard_spec.ShardingSpec:
    """
    Rewrite ``spec`` to match the device of ``tensor``.

    FSDP.sharded_optim_state_dict sneakily ships optimizer state to CPU, so if
    the original ShardingSpec produces CUDA metadata, ST construction bombs.
    """
    if not isinstance(spec, ChunkShardingSpec):
        return spec

    # Check whether the spec disagrees with the tensor's device for this rank.
    rewrite = False
    for p in spec.placements:
        p = cast(_remote_device, p)
        if p.rank() == rank and p.device() != tensor.device:
            rewrite = True
            break

    if rewrite:
        spec = copy.deepcopy(spec)
        for i, placement in enumerate(spec.placements):
            placement = cast(_remote_device, placement)
            if placement.rank() == rank and placement.device() != tensor.device:
                spec.placements[i] = _remote_device(f"rank:{rank}/{tensor.device}")

    return spec
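

# Illustrative note (not from the original module): if rank 1's optimizer
# state has been moved to CPU while the spec still reads "rank:1/cuda:1",
# the placement above is rewritten to "rank:1/cpu" so that ShardedTensor
# construction sees a consistent device.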


def _chunk_tensor(
    tensor: torch.Tensor,
    rank: int,
    world_size: int,
    num_devices_per_node: int,
    pg: dist.ProcessGroup,
) -> torch.Tensor:
    if type(tensor) is ShardedTensor:
        assert len(tensor.local_shards()) == 1

        inner_param = tensor.local_tensor()
        inner_st = _create_chunk_sharded_tensor(
            inner_param,
            rank,
            world_size,
            num_devices_per_node,
            pg,
        )

        outer_local_shard = tensor.local_shards()[0]
        shards: list[Shard] = [
            Shard(inner_st, copy.deepcopy(outer_local_shard.metadata))
        ]
        st_meta = copy.deepcopy(tensor.metadata())
        st_meta.tensor_properties.requires_grad = False

        st_outer = ShardedTensor._init_from_local_shards_and_global_metadata(
            shards,
            sharded_tensor_metadata=st_meta,
            process_group=tensor._process_group,
            init_rrefs=False,
        )
        return st_outer
    elif type(tensor) is DTensor:
        device_mesh = tensor.device_mesh
        assert device_mesh.ndim == 1, "Only 1D DeviceMeshes currently handled"

        inner_param = tensor._local_tensor
        inner_st = _create_chunk_sharded_tensor(
            inner_param,
            rank,
            world_size,
            torch.accelerator.device_count(),
            pg,
        )

        dt_pg = _get_dt_pg(tensor)
        # The outer ShardedTensor carries exactly one local shard whose payload
        # is the inner ShardedTensor produced above.
        shards = [
            Shard(inner_st, _create_shard_md_from_dt(tensor, dist.get_rank(dt_pg)))
        ]

        st_meta = _create_sharded_tensor_md_from_dt(tensor, dt_pg)
        st_meta.tensor_properties.requires_grad = False

        st_outer = ShardedTensor._init_from_local_shards_and_global_metadata(
            shards,
            sharded_tensor_metadata=st_meta,
            process_group=dt_pg,
            init_rrefs=False,
        )
        return st_outer
    else:
        return _create_chunk_sharded_tensor(
            tensor,
            rank,
            world_size,
            num_devices_per_node,
            pg,
        )
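

# Structure produced by the DTensor branch above (illustrative comment, not
# part of the original module):
#
#   outer ShardedTensor -- sharded across the 1D TP process group (dt_pg)
#     └── inner ShardedTensor -- this rank's TP-local chunk, re-chunked
#                                row-wise across the FSDP ranks in `pg`
#
# _pre_load_state_dict() below unwraps this nesting when loading state dicts.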


def _chunk_dtensor(
    tensor: torch.Tensor,
    rank: int,
    device_mesh: DeviceMesh,
) -> DTensor:
    """
    Shard a tensor to chunks along the first dimension.

    The local rank will get its corresponding chunk as the local tensor to
    create a DTensor.
    """
    root_mesh = _mesh_resources.get_root_mesh(device_mesh)
    if root_mesh is None:
        raise RuntimeError("No parent device_mesh is found for FSDP device_mesh.")
    if root_mesh.ndim < 2:
        raise RuntimeError(
            f"Found parent device_mesh of ndim={root_mesh.ndim}, "
            "but meshes must be at least 2D."
        )

    # Detach from the autograd graph and copy, so the new DTensor owns its storage.
    tensor = tensor.detach().clone()

    if isinstance(tensor, torch.Tensor) and not isinstance(tensor, DTensor):
        # A plain tensor means the layer was not involved in TP (e.g. it was
        # not named in the parallelize_plan). It is replicated across the TP
        # dimension and sharded across the FSDP dimension, so the target
        # placements are (Shard(0), Replicate()).
        replicate_placements = [Replicate() for _ in range(root_mesh.ndim)]
        shard_placements = [Replicate() for _ in range(root_mesh.ndim)]
        shard_placements[0] = DShard(0)  # type: ignore[call-overload]

        return DTensor.from_local(
            tensor, root_mesh, replicate_placements, run_check=False
        ).redistribute(
            device_mesh=root_mesh,
            placements=shard_placements,
        )
    else:
        # A DTensor is already sharded across the TP dimension; keep that
        # placement on the inner dimension and add Shard(0) on the outer FSDP
        # dimension, giving (Shard(0), tp_placement).
        tp_placements = tensor.placements
        tp_placement = tp_placements[0]

        tensor = tensor.to_local()

        replicate_placements = [Replicate() for _ in range(root_mesh.ndim)]
        replicate_placements[-1] = tp_placement  # type: ignore[call-overload]
        shard_placements = [Replicate() for _ in range(root_mesh.ndim)]
        shard_placements[0] = DShard(0)  # type: ignore[call-overload]
        shard_placements[-1] = tp_placement  # type: ignore[call-overload]

        return DTensor.from_local(
            tensor, root_mesh, replicate_placements, run_check=False
        ).redistribute(
            device_mesh=root_mesh,
            placements=shard_placements,
        )


def _pre_load_state_dict(
    tensor: torch.Tensor,
) -> tuple[torch.Tensor, list[Shard]]:
    shards = cast(ShardedTensor, tensor).local_shards()
    if len(shards) == 1 and type(shards[0].tensor) is ShardedTensor:
        # Unwrap the nested ShardedTensor produced by _chunk_tensor().
        inner_tensor = shards[0].tensor
        shards = inner_tensor.local_shards()
        tensor = inner_tensor

    return (tensor, shards if len(shards) > 0 else [])


def _all_gather_dtensor(
    tensor: DTensor,
    parent_mesh: Optional[DeviceMesh],
) -> torch.Tensor:
    """All gather a DTensor in its FSDP dimension and return the local tensor."""
    assert parent_mesh == tensor.device_mesh

    placements = list(copy.deepcopy(tensor.placements))
    # All-gather on every mesh dimension except the last (TP) one.
    for i in range(0, len(placements) - 1):
        placements[i] = Replicate()
    tensor = tensor.redistribute(
        device_mesh=tensor.device_mesh,
        placements=placements,
    )

    return tensor.to_local()
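

# Illustrative note (not from the original module): for 2D FSDP + TP,
# _all_gather_dtensor turns placements [Shard(0), tp_placement] into
# [Replicate(), tp_placement]; with an extra replication dimension the
# leading dims are likewise all replaced by Replicate(), leaving only the
# TP placement intact.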
dejd	e	dejfd
dZ	ddejdedededejdeej dejfddZdejdededejfddZdejdeejee f fddZdedee dejfddZ  ZS )r   z
    DTensorExtension is the TensorFlattener extension needed for 2D FSDP + TP.

    This is the implementation for FSDPExtensions defined in
    https://github.com/pytorch/pytorch/blob/main/torch/distributed/fsdp/_fsdp_extensions.py
    r   Nc                    s*   t    d | _|| _tj| j| _d S N)super__init__compute_streamdevice_handler#   _dynamodisablepost_unflatten_transform)selfr   	__class__r+   r,   r   E  s   

zDTensorExtensions.__init__r   c                 C      t |S r   )r   r   r   r+   r+   r,   pre_flatten_transformO     z'DTensorExtensions.pre_flatten_transformparam_extensionc                 C   s`   | j p| j }| j| t||| j| j d}t| |W  d    S 1 s)w   Y  d S )N)r   r   )r   r   current_streamstreamr   r   )r   r   r   r   resultr+   r+   r,   r   U  s   $z*DTensorExtensions.post_unflatten_transformrV   r^   r_   r`   r?   c                 C   s   t |||||S r   )rr   )r   r   rV   r^   r_   r`   r?   r+   r+   r,   chunk_tensorh  s   	zDTensorExtensions.chunk_tensorr   c                 C   s   t |||S r   )r   )r   r   rV   r   r+   r+   r,   chunk_dtensors  s   zDTensorExtensions.chunk_dtensorc                 C   r   r   )r   r   r+   r+   r,   pre_load_state_dict_transform{  r   z/DTensorExtensions.pre_load_state_dict_transformr   c                 C   s
   t ||S r   )r   )r   r   r   r+   r+   r,   all_gather_dtensor  s   
z$DTensorExtensions.all_gather_dtensor)r   Nr   )__name__
__module____qualname____doc__r   r#   r   tupler   r   r   r   intrI   ProcessGroupr?   r   r   r   r>   r   r   r   r   __classcell__r+   r+   r   r,   r   =  sh    





)>rX   typingr   r   r   r#   torch.distributeddistributedrI   &torch.distributed._shard.sharding_spec_shardsharding_spec
shard_spec"torch.distributed.distributed_c10ddistributed_c10dc10d'torch.distributed._shard.sharded_tensorr   r   r   r   r	   :torch.distributed._shard.sharding_spec.chunk_sharding_specr
   torch.distributed.device_meshr   $torch.distributed.fsdp._common_utilsr   'torch.distributed.fsdp._fsdp_extensionsr   #torch.distributed.fsdp._shard_utilsr   torch.distributed.remote_devicer   torch.distributed.tensorr   r   r   r!   6torch.distributed.tensor.parallel._data_parallel_utilsr   r   __all__r   r$   r-   r   r3   r6   rB   r   rR   rT   ShardingSpecr   r]   rr   r   r>   r   r   r   r+   r+   r+   r,   <module>   s   "
'

J
A


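

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the module): FSDP installs this
# extension when it detects TP-managed DTensor parameters, after which
# chunk_dtensor() adds the outer FSDP Shard(0) placement on the root mesh.
# The mesh construction below uses the public init_device_mesh API; the model
# and wrapping details are hypothetical placeholders.
#
#   from torch.distributed.device_mesh import init_device_mesh
#
#   # launched via: torchrun --nproc_per_node=4 train.py
#   mesh_2d = init_device_mesh("cuda", (2, 2), mesh_dim_names=("dp", "tp"))
#   tp_mesh = mesh_2d["tp"]  # inner dimension: tensor parallelism
#   dp_mesh = mesh_2d["dp"]  # outer dimension: FSDP sharding
#
#   # After parallelize_module(model, tp_mesh, plan) and wrapping with FSDP
#   # on dp_mesh, a TP-sharded weight ends up with placements
#   # (Shard(0), tp_placement) on mesh_2d, as built in _chunk_dtensor() above.
# ---------------------------------------------------------------------------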