o
    Ihx1                     @   s(  d dl Z d dlmZ d dlZd dlm  mZ d dlm	Z	 d dl
mZ ddlmZmZ ddlmZ e eZdejd	ejd
efddZdejd	ejded
dfddZdejd
ejfddZeeeef   Zdejd
dfddZdejd
efddZ dejdedefddZ!dejfddZ"dd Z#dS )    N)cast)is_symbolic)
OrderedSet   )configir)Vxcomm_buffer_typereturnc                 C   sP   t | }t|tjrdS | }t|tjrdS t|tjr&t| s&dS dS )ze
    Check if an input can be realized as a comm buffer of the specified
    `comm_buffer_type`.
    TF)		_get_data
isinstancer   Loopsget_output_specCommBufferLayoutFlexibleLayoutr   	get_numel)r	   r
   datalayout r   Q/var/www/vscode/kcb/lib/python3.10/site-packages/torch/_inductor/comm_lowering.pycan_realize_as_comm_buffer8   s   r   
group_namec                 C   s   |    t| }t|tjsJ | }t|tjrdS t|tjs*td| dt	|
 r8td| dtj|||d|_dS )z
    Realize an input as a comm buffer of the specified `comm_buffer_type`.

    Specifically, this realizes the underlying buffer if it's still unrealized
    and changes the layout of the buffer to `ir.CommBufferLayout`.
    NzOA buffer can only be realized as a comm buffer if it has `FlexibleLayout` (got ).zGA buffer with symbolic shape cannot be converted to a comm buffer (got )r   r
   r   )realizer   r   r   Bufferr   r   r   AssertionErrorr   r   r   )r	   r
   r   bufferr   r   r   r   realize_as_comm_bufferN   s.   	r   c                 C   sJ   t | jtjr| j jS t | jtjrttj| jjS td| j d)Nz\Expect the data attr of a `TensorBox` to be either an `ir.BaseView` or `ir.StorageBox` (got r   )	r   r   r   BaseViewunwrap_view
StorageBoxr   r   r   r	   r   r   r   r   r   s   r   c                 C   s   t ttj|  f dS )z
    If a non-blocking collective is lowered as a blocking collective, the wait
    node in the original graph becomes useless and we can skip the lowering it.
    N)_bufs_to_skip_waitaddidr   graphget_namer"   r   r   r   mark_as_skip_wait   s   r(   c                 C   s   t tj|  ftv S N)r%   r   r&   r'   r#   r"   r   r   r   should_skip_wait   s   r*   inp	reduce_opc                 C   sP   ddl m} |  |  j }tjjo'||o't| t	j
jo'|dv o'|tjjkS )Nr   )is_symm_mem_enabled_for_group)sum)#torch.distributed._symmetric_memoryr-   r   	get_dtypeitemsizer   _collectiveauto_selectr   r   CommBufferTypeSYMM_MEM#one_shot_all_reduce_threshold_bytes)r+   r,   r   r-   inp_sizer   r   r   $_should_lower_as_one_shot_all_reduce   s   
r8   c              	   C   s6   t | tjj| ttjjtjt	j
jjj| ||S r)   )r   r   r4   r5   pytreetree_map	TensorBoxcreateFallbackKerneltorchopssymm_memone_shot_all_reducedefaultr+   r,   r   r   r   r   _one_shot_all_reduce   s   
rD   c               	      s  zt jjj W n ty   td Y d S w ddlm m	m
mm  fdd} t jj| jdtjdtdtd	tjffd
d}| jdtjdtdtd	tjffdd}| jfdd}| jfdd}| jfdd}| jfdd}| jfdd}| jfdd}| jfdd}	| jfdd}
| jfdd}| jfd d!}| t jjjd"d# }| jfd$d%}d S )&NzRInductor support for distributed collectives depends on building torch.distributedr   )add_layout_constraintcloneconstrain_to_fx_stridescopy_register_loweringc                    s    |  | S r)   r   )fn)rE   rG   rI   r   r   register_comm_lowering   s   
z7register_comm_lowerings.<locals>.register_comm_loweringr+   r,   r   r   c                    sf   t | ||rt| ||S | } tjr |   tjj| 	  t
j| } t
j jj| || | S r)   )r8   rD   r    reorder_for_compute_comm_overlapr   r   r&   no_fuse_buffer_namesr$   r'   r   ExternKernelrequire_contiguous_CollectiveKernelcreate_inplaceall_reduce_rB   rC   c10drF   r   r   _all_reduce   s   z,register_comm_lowerings.<locals>._all_reducec                    sP   t | ||r| t| ||}t| | S tj| } tj jj	| || | S r)   )
r8   rD   r(   r   rN   rO   rP   rQ   rR   rB   )r+   r,   r   ret)rT   rH   r   r   _all_reduce_   s   
z-register_comm_lowerings.<locals>._all_reduce_c                    s,   fdd| D } t j jj| || | S )Nc                    s   g | ]} |qS r   r   ).0r+   )rF   r   r   
<listcomp>   s    zJregister_comm_lowerings.<locals>._all_reduce_coalesced.<locals>.<listcomp>r   rP   rQ   all_reduce_coalesced_rB   inputsr,   r   rS   r   r   _all_reduce_coalesced   s   z6register_comm_lowerings.<locals>._all_reduce_coalescedc                       t j jj| || | S r)   rZ   r\   rT   r   r   _all_reduce_coalesced_   s   z7register_comm_lowerings.<locals>._all_reduce_coalesced_c                    s   t jt j jj| ||S r)   )r   r;   r<   rP   create_out_of_placeall_gather_into_tensorrB   )r+   
group_sizer   r`   r   r   _all_gather_into_tensor   s   z8register_comm_lowerings.<locals>._all_gather_into_tensorc              	      s"   t tjjtj jj| ||S r)   )	r9   r:   r   r;   r<   rP   rb    all_gather_into_tensor_coalescedrB   )r]   rd   r   r`   r   r   !_all_gather_into_tensor_coalesced  s   zBregister_comm_lowerings.<locals>._all_gather_into_tensor_coalescedc                   s   t jj jj| |||d |S )N)out)r   rP   rQ   all_gather_into_tensor_outrB   )r+   rd   r   rh   r`   r   r   _all_gather_into_tensor_out  s   z<register_comm_lowerings.<locals>._all_gather_into_tensor_outc              	          t jt j jj| |||S r)   )r   r;   r<   rP   rb   reduce_scatter_tensorrB   )r+   r,   rd   r   r`   r   r   _reduce_scatter_tensor"     z7register_comm_lowerings.<locals>._reduce_scatter_tensorc              
      s$   t tjjtj jj| |||S r)   )	r9   r:   r   r;   r<   rP   rb   reduce_scatter_tensor_coalescedrB   )r]   r,   rd   r   r`   r   r    _reduce_scatter_tensor_coalesced.  s   zAregister_comm_lowerings.<locals>._reduce_scatter_tensor_coalescedc              	      rk   r)   )r   r;   r<   rP   rb   all_to_all_singlerB   )r+   output_split_sizesinput_split_sizesr   r`   r   r   _all_to_all_single;  rn   z3register_comm_lowerings.<locals>._all_to_all_singlec                    s"   | } t j jj| || | S r)   r   rP   rQ   
broadcast_rB   r+   srcr   rS   r   r   
_broadcastG  s
   z+register_comm_lowerings.<locals>._broadcastc                    r_   r)   ru   rw   r`   r   r   _broadcast_O  s   z,register_comm_lowerings.<locals>._broadcast_c              	   S   s$   t jt jtjjjj	| |||S r)   )
r   r;   r<   rP   rb   r>   r?   _dtensorshard_dim_alltoallrB   )r+   
gather_dim	shard_dimr   r   r   r   _shard_dim_alltoallV  s   
z4register_comm_lowerings.<locals>._shard_dim_alltoallc                    s"   t | r| S tj jj|  | S r)   )r*   r   _WaitKernelcreate_waitwait_tensorrB   )r+   r`   r   r   _wait_tensorb  s   z-register_comm_lowerings.<locals>._wait_tensor)r>   r?   _c10d_functional
all_reduceAttributeErrorloginfoloweringrE   rF   rG   rH   rI   r   r;   strrR   all_reduce_coalescedr[   rc   rf   ri   rl   ro   rq   	broadcastrv   r{   r|   r   )rK   rU   rW   r^   ra   re   rg   rj   rm   rp   rt   ry   rz   r   r   r   )rE   rT   rF   rG   rH   rI   r   register_comm_lowerings   s^   &
	


r   )$loggingtypingr   r>   torch.utils._pytreeutils_pytreer9   torch._inductor.utilsr   torch.utils._ordered_setr    r   r   virtualizedr   	getLogger__name__r   r;   r4   boolr   r   r   IRNoder   tupleintr#   r(   r*   r8   rD   r   r   r   r   r   <module>   sL   
*

$
