o
    Hh?                  	   @   s  U d dl Z d dlmZmZmZ d dlZd dlmZ d dl	m
Z
 d dlmZ d dlmZ d dlmZ d dlmZ dd	lmZmZmZmZmZmZmZ dd
lmZmZmZmZm Z m!Z! ddl"m#Z#m$Z$ dgZ%e&e' e(d< dedede)fddZ*de&e de&e de&e fddZ+dej,defddZ-dedefddZ.dededefddZ/d e'dede fd!d"Z0d e'dedede fd#d$Z1d e'dej,de fd%d&Z2d e'd'efd(d)Z3d*d+ Z4d,d- Z5d e'd.ed/e&e de&e fd0dZ6d1edefd2d3Z7d e'd4ede&e  fd5d6Z8dedefd7d8Z9dej,de&e fd9d:Z:d e'd;ed<ede&e fd=d>Z;d1e<e'ef defd?d@Z=dAedBedCedDefdEdFZ>dS )G    N)AnyCallablecast)_get_device_module)ShardMetadata)ShardedTensor)DTensor)%compute_local_shape_and_global_offset   )BytesStorageMetadataChunkStorageMetadataMetadataIndexSTATE_DICT_TYPESTORAGE_TYPESTensorPropertiesTensorStorageMetadata)LoadItemTypeReadItemSavePlanTensorWriteData	WriteItemWriteItemType)"_check_shard_metadata_pair_overlap+_shards_get_overlap_region_wrt_saved_tensor create_read_items_for_chunk_list__all__plan
other_planreturnc           
      C   s  | j |j krdS t| jt|jkrdS t| j|jD ]j\}}|j|jkr( dS |j}|j}|j|jks@|j|jks@|j|jkrC dS |j}|j}|rM|rQ|sT|rT dS |r|r|j	|j	kra dS |j
}|j
}	|rk|	ro|sr|	rr dS |r|	r|j|	jks|j|	jkr dS qdS )a  
    Compare the two Save plans and return True if they are equal.

    Args:
        plan (SavePlan): First SavePlan to compare.
        other_plan (SavePlan): Second SavePlan to compare.

    Returns:
       True if the two plans are equal, False otherwise.
    FT)usablelenitemsziptypeindexfqnoffsettensor_datasizechunkoffsetssizes)
r   r   	plan_itemother_plan_itemplan_metadata_indexother_plan_metadata_indexr'   other_tensor_datar)   other_chunk r2   `/var/www/vscode/kcb/lib/python3.10/site-packages/torch/distributed/checkpoint/planner_helpers.py_compare_save_plans'   sD   r4   cached_plansdelta_plansc                 C   s<   g }t | |D ]\}}|r|js|| q|| q|S )ac  
    Merge a list of delta plans into a single plan.

    Args:
        cached_plans (List[SavePlan]): A list of cached plans.
        delta_plans (List[SavePlan]): A list of delta plans to merge. It can contain empty plans

    Returns:
        A single merged plan. If a delta plan is not usable, use the cached plan. Otherwise, use the delta plan.
    )r"   r   append)r5   r6   merged_planscached_plan
delta_planr2   r2   r3   _merge_delta_local_plansh   s   
r;   tensorc                 C   s$   t tdgt|   |  dS )Nr   r*   r+   )r   torchSizer    r(   )r<   r2   r2   r3   _create_chunk_from_tensor   s   r@   shard_mdc                 C   s   t t| jt| jdS Nr=   )r   r>   r?   shard_offsetsshard_sizes)rA   r2   r2   r3   _chunk_for_shard   s   

rE   sharded_tensorc                 C   s>   |   j}t|j|j|j|j|jd}tt	|||   j
dS )N)dtypelayoutrequires_gradmemory_format
pin_memoryr)   
propertiesr(   )metadatatensor_propertiesr   rG   rH   rI   rJ   rK   r   rE   r(   )rF   rA   shard_propertiesrM   r2   r2   r3   _sharded_tensor_metadata   s   
rQ   r%   c              	   C   sb   t |j|j|j\}}t|t|}}tt| |tj	t
t||dt| | ddS )Nr=   rL   r$   r#   r'   )r	   shapedevice_mesh
placementsr>   r?   r   r   r   SHARDr   r   r   create_from_tensorto_localr(   )r%   r<   r+   r*   r2   r2   r3   _create_write_items_for_dtensor   s    rY   c                 C   s(   t |j}tt| |tjt||dS )NrR   )r>   r?   rC   r   r   r   rV   rQ   )r%   rF   rA   r*   r2   r2   r3   _create_write_item_for_shard   s   rZ   c                 C   sN   t dgt|  }tt| |tjtt	|| dt
|| ddS )Nr   r=   rL   rR   )r>   r?   r    r(   r   r   r   TENSORr   r   r   rW   )r%   r<   r*   r2   r2   r3   _create_write_item_for_tensor   s   r\   bytesc                 C   s   t t| tjdS )N)r$   r#   )r   r   r   BYTE_IO)r%   r]   r2   r2   r3   _create_write_item_for_bytesio   s   r_   c              	   C   s.   t tj| t|f|t|ft|fdS N)r#   
dest_indexdest_offsetsstorage_indexstorage_offsetslengths)r   r   r^   r>   r?   ra   dest_offsetrc   storage_offsetlengthr2   r2   r3   _create_read_item_for_byteio   s   


rj   c              	   C   s(   t tj| t||t|t|dS r`   )r   r   r[   r>   r?   ra   rb   rc   rd   re   r2   r2   r3   _create_read_item_for_tensor   s   rl   checkpoint_mdlocal_chunksc                 C   s   g }t |D ]L\}}t |jD ]B\}}t||sqg }g }	g }
t||dD ]\}}}}|| |	| |
| q%|tt| |j||	t| |j|||
d qq|S )aW  
    Create a list of ``ReadItem`` based on the checkpoint and local chunks.

    This applies the resharding algorithm and computes the reads needed
    to satisfy ``local_chunks`` with a checkpoint described by ``checkpoint_md``.

    Args:
        fqn (str) : The state_dict FQN to pass to ``ReadItem``.
        checkpoint_md (TensorStorageMetadata): metadata for a given tensor
            from a checkpoint.
        local_chunks (List[ChunkStorageMetadata]): Local chunks that needs to be
            loaded.

    Returns:
        A list of ``ReadItem`` that will satisfy all input chunks.
    )saved_shardcurrent_shardrk   )	enumeratechunksr   r   r7   rl   r   r*   )r%   rm   rn   
read_itemsidxshardstorage_idx
storage_mdrd   rb   re   _dimoffset_for_saved_tensoroffset_for_current_tensorri   r2   r2   r3   r      s<   



state_dictc                    s   g }|   D ]?\ ttr|t  qttr.| fdd jD  qtt	j
r=|t  q|t  qt|S )Nc                 3   s    | ]	}t  |V  qd S )N)rZ   ).0rA   r%   objr2   r3   	<genexpr>,  s
    

z5_create_default_metadata_only_plan.<locals>.<genexpr>)r!   
isinstancer   r7   rY   r   extendrN   shards_metadatar>   Tensorr\   r_   r   )r{   requestsr2   r}   r3   "_create_default_metadata_only_plan&  s   


r   objectc                    s\   t dr S ttr fdd D S ttjr(t gS t gS )N__create_write_items__c                    s   g | ]	}t  |jqS r2   )rZ   rN   r|   ru   r%   r   r2   r3   
<listcomp><      z'_create_write_items.<locals>.<listcomp>)	hasattrr   r   r   local_shardsr>   r   r\   r_   r   r2   r   r3   _create_write_items7  s   

r   c                 C   s8   t | j| j| j\}}t|t|}}t||dS rB   )r	   rS   rT   rU   r>   r?   r   )r<   r+   r*   r2   r2   r3   _create_chunk_from_dtensorF  s   r   c                 C   sb   t | dr|  }|S t| trdd |  D }|S t| tjr(t| g}|S tdt	|  )N__create_chunk_list__c                 S   s   g | ]}t |jqS r2   )rE   rN   r   r2   r2   r3   r   V  s    
z&_create_chunk_list.<locals>.<listcomp>zMUnsupported Type, expecting one of [Tensor, DTensor, ShardedTensor] ,but got )
r   r   r   r   r   r>   r   r@   
ValueErrorr#   )r<   rn   r2   r2   r3   _create_chunk_listQ  s    


r   mdr~   c              
   C   sx   t |ts.zt|}W n ty' } ztd|  ddt|  |d }~ww t| ||S tt| dt| dddgS )Nz Invalid checkpoint metadata for z, z(expected BytesStorageMetadata but found r   rf   )r   r   r   r   r#   r   rj   r   )r%   r   r~   rn   exr2   r2   r3   _create_read_itemsd  s,   

r   c                 C   s>   dt fdd}dtfdd}dtjfdd}t| ||| dS )	zP
    Initializes meta tensor if the meta tensor is DTensor or torch.Tensor.
    valuec                 S   st   t | dd }|tdkr8tj j}ttjt|	 }tj
|  |d}tj|| j| j|  |  d}|S | S )Ndevicemetar   )rT   rU   rS   stride)getattrr>   r   distdistributed_c10d_get_pg_default_devicer#   r   r   current_device
empty_likerX   r   
from_localrT   rU   r(   r   )r   r   device_typenew_local_tensordtensorr2   r2   r3   dtensor_func  s    z&_init_state_dict.<locals>.dtensor_funcc                 S   s2   t | dd }|tdkrtdt|  d| S )Nr   r   zFound unsupported type z for meta device loading.)r   r>   r   RuntimeErrorr#   )r   r   r2   r2   r3   sharded_tensor_func  s   z-_init_state_dict.<locals>.sharded_tensor_funcc                 S   sP   t | dd }|tdkr&tj j}ttjt|	 }tj
| |d}|S | S )Nr   r   r   )r   r>   r   r   r   r   r#   r   r   r   r   )r   r   r   r<   r2   r2   r3   tensor_func  s   z%_init_state_dict.<locals>.tensor_funcN)r   r   r>   r   _iterate_state_dict)r{   r   r   r   r2   r2   r3   _init_state_dict{  s   	r   iter_objectr   r   r   c                    s   t | tr	 | S t | tr| S t | tjr| S t | ttttt	j
fs+| du r-| S t | trF|  D ]\}}t| | |< q6| S t | ttfrc fdd| D }t | trat|}|S dS )a$  
    Iterate through the state dict, applying the given functions to each tensor type
    and update the state dict in place.

    Args:
        iter_object (Any): the target state_dict.
        sharded_tensor_func (Callable): the function to apply to ShardedTensor
        dtensor_func (Callable): the function to apply to DTensor
        tensor_func (Callable): the function to apply to Tensor

    # TODO: let state_dict_util._iterate_state_dict() to support in place option
    so we don't need to have two versions of _iterate_state_dict.
    Nc                    s   g | ]	}t | qS r2   )r   )r|   vr   r   r   r2   r3   r     r   z'_iterate_state_dict.<locals>.<listcomp>)r   r   r   r>   r   intfloatstrr]   ioBytesIOdictr!   r   listtuple)r   r   r   r   keyr   retr2   r   r3   r     s0   




r   )?r   typingr   r   r   r>   torch.distributeddistributedr   torch._utilsr   !torch.distributed._shard.metadatar   'torch.distributed._shard.sharded_tensorr   torch.distributed.tensorr   torch.distributed.tensor._utilsr	   rN   r   r   r   r   r   r   r   plannerr   r   r   r   r   r   
reshardingr   r   r   r   r   __annotations__boolr4   r;   r   r@   rE   rQ   rY   rZ   r\   r_   rj   rl   r   r   r   r   r   r   r   r   r   r2   r2   r2   r3   <module>   s   
$ 	A



77