o
    0h                     @   s(  U d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlZd dlZd dlZd dlZd dlmZ d dlmZ d dlmZ d dlmZ d dlmZmZmZ d dlmZ d dlmZmZm Z m!Z!m"Z" d d	l#m$Z$ d d
l%m&Z& d dl'Z'd dl(Z'd dl)Z'd dl*m+Z, d dl-m.Z. d dl/m0Z0 d dl1m2Z2 d dl3m4Z4m5Z5m6Z6m7Z7m8Z8m9Z9m:Z:m;Z;m<Z<m=Z=m>Z>m?Z? d dl@mAZAmBZBmCZC d dlDZDejEejFd eGeHZIG dd deZJi deJdddeJdddeJdddeJdddeJdd d!eJd"d#d$eJd%d&d'eJd(d)d*eJd+d,d-eJd.d/d0eJd1d2d3eJd4d5d6eJd7d8d9eJd:d;d<eJd=d>d?eJd@dAdBeJdCdDdEeJdFdGiZKeG dHdI dIZLdJdK ZMdLdM ZNdNdO ZOdPdQ ZPdRdS ZQdTdU ZRdVdW ZSdXdY ZTdZd[ ZUd\d] ZVd^d_ ZWd`da ZXdbdc ZYddde ZZdfdg Z[dhdi Z\djdk Z]dldm Z^dndo Z_dpe'j`dqeadreadsebfdtduZce7dvdwdxedydzdxd{dxfd|d}Zde;rd~ZeneaefddZeddiZge:rdegd< dddZhdseafddZiedd ZjddeadeadeafddZkdeadelfddZmdanee	jo epd< ddeel dsdfddZqdddZrdZsG dd de<ZtG dd detZudevelewe! f de"de!fddZxdaydsebfddZzdd Z{deeesfddZ|G dd de<Z}G dd de2j~ZG dd de2j~ZedddZG dd de'jjj<ZG dd detZG dd de<ZdS )    N)contextmanager)	dataclass)	timedelta)Enum)partialreducewraps)StringIO)
NamedTupleOptionalUnionAnyCallable)patch)	trace_log)
DeviceType)_SymmetricMemory)FILE_SCHEMAfind_free_portIS_SANDCASTLEretry_on_connect_failuresskip_but_pass_in_sandcastleskip_but_pass_in_sandcastle_ifTEST_WITH_ROCMTEST_WITH_TSANTestCase	run_testsTEST_HPUTEST_XPU)_install_threaded_pg_uninstall_threaded_pgProcessLocalGroup)levelc                   @   s   e Zd ZU eed< eed< dS )TestSkip	exit_codemessageN)__name__
__module____qualname__int__annotations__str r,   r,   ^/var/www/vscode/kcb/lib/python3.10/site-packages/torch/testing/_internal/common_distributed.pyr#   <   s   
 r#   backend_unavailableH   z5Skipped because distributed backend is not available.small_worldsizeI   z Skipped due to small world size.odd_worldsizeW   zSkipped due to odd world size.no_cudaJ   zCUDA is not available.zmulti-gpu-1K   zNeed at least 1 CUDA devicezmulti-gpu-2M   zNeed at least 2 CUDA deviceszmulti-gpu-3P   zNeed at least 3 CUDA deviceszmulti-gpu-4Q   zNeed at least 4 CUDA deviceszmulti-gpu-5R   zNeed at least 5 CUDA deviceszmulti-gpu-6S   zNeed at least 6 CUDA deviceszmulti-gpu-7T   zNeed at least 7 CUDA deviceszmulti-gpu-8U   zNeed at least 8 CUDA devicesncclL   z#c10d not compiled with NCCL support
skipIfRocmN   zTest skipped for ROCmno_peer_accessO   z'Test skipped because no GPU peer accessgenericV   zHTest skipped at subprocess level, look at subprocess log for skip reasonimporterrorX   z"Test skipped due to missing importno_acceleratorY   zaccelerator is not available.c                   @   s   e Zd Zi Zh ded< e ed< ddhed< ddhed< i Zh ded	< h ded
< h ded< h ded< e ed< erCdhed< erLdhed< dS dS )DistTestCases>   mpiuccr>   allgather_coalescedr   r>   rL   zsendrecv anysourcezcpu barrier>   rL   gloor>   gpucudaddpsubgrouppluginhcclhpuxcclxpuN)r&   r'   r(   skip_collectivesetbackend_featurer   r   r,   r,   r,   r-   rJ   [   s"    


rJ   c                       t   fdd}|S )zSkips if the world size exceeds the number of GPUs, ensuring that if the
    test is run, each rank has its own GPU via ``torch.cuda.device(rank)``.c                     s   t j sttd j ttj	d }t j
 |k r&ttd|  j tr9t jj
|k r9ttd|  j trLt jj
|k rLttd|  j  | i |S )Nr4   
WORLD_SIZE
multi-gpu-z
multi-xpu-)torchrP   is_availablesysexit
TEST_SKIPSr$   r)   osenvirondevice_countr   rU   r   rW   )argskwargs
world_sizefuncr,   r-   wrapperu   s   
zskip_if_no_gpu.<locals>.wrapperr   rj   rk   r,   ri   r-   skip_if_no_gpuq   s   rn   c                    r[   )Nc                     s>   t jd dkrtt jd dk rttd j  | i |S )NBACKENDrK   r\      r0   rc   rd   r)   r`   ra   rb   r$   rf   rg   ri   r,   r-   rk      s    z(skip_if_small_worldsize.<locals>.wrapperrl   rm   r,   ri   r-   skip_if_small_worldsize      rs   c                    r[   )Nc                     sB   t jd dkrtt jd d dkrttd j  | i |S )Nro   rK   r\         r2   rq   rr   ri   r,   r-   rk      s   $z&skip_if_odd_worldsize.<locals>.wrapperrl   rm   r,   ri   r-   skip_if_odd_worldsize   rt   rw   c                        fdd}|S )Nc                       t   fdd}|S )Nc                     s>    dkrt j k rttd  j d S | i |S Nr>   r]   )r^   rP   re   r`   ra   rb   r$   rr   )backendrj   nr,   r-   rk      s   zCrequire_n_gpus_for_nccl_backend.<locals>.decorator.<locals>.wrapperrl   rm   r{   r|   ri   r-   	decorator   s   z2require_n_gpus_for_nccl_backend.<locals>.decoratorr,   )r|   r{   r~   r,   r}   r-   require_n_gpus_for_nccl_backend   s   
r   c                  C   s   dd } | S )Nc                    r[   )Nc                     sF   zddl m}m}  | i |W S  ty"   ttd j Y d S w )Nr   )AutoModelForMaskedLM
BertConfigrF   )transformersr   r   ImportErrorr`   ra   rb   r$   )rf   rg   r   r   ri   r,   r-   rk      s   z?import_transformers_or_skip.<locals>.decorator.<locals>.wrapperrl   rm   r,   ri   r-   r~      s   z.import_transformers_or_skip.<locals>.decoratorr,   )r~   r,   r,   r-   import_transformers_or_skip   s   r   c                 C   s   t j ot j | kS N)r^   rP   r_   re   xr,   r,   r-   at_least_x_gpu   s   r   c                        fdd}|S )Nc                       t   fdd}|S )Nc                     s   t j rt j kr | i |S tr#t j kr# | i |S tr3t j kr3 | i |S t	t
d  j d S )Nr]   )r^   rP   r_   re   r   rU   r   rW   r`   ra   rb   r$   rr   )rj   r   r,   r-   rk      s   z4skip_if_lt_x_gpu.<locals>.decorator.<locals>.wrapperrl   rm   r   ri   r-   r~      s   	z#skip_if_lt_x_gpu.<locals>.decoratorr,   )r   r~   r,   r   r-   skip_if_lt_x_gpu   s   r   c                    rx   )Nc                    ry   )Nc                     sV    dkr| i |S t j rt j kr| i |S ttd  j d S rz   )r^   rP   r_   re   r`   ra   rb   r$   rr   )r{   rj   r   r,   r-   rk      s
   z9nccl_skip_if_lt_x_gpu.<locals>.decorator.<locals>.wrapperrl   rm   r{   r   ri   r-   r~      s   z(nccl_skip_if_lt_x_gpu.<locals>.decoratorr,   )r{   r   r~   r,   r   r-   nccl_skip_if_lt_x_gpu   s   r   c                 C   st   |   }d|v s
J d|v sJ d|v sJ |d }|ddkr#|n|dd }||v s8J d| d| d S )	N	iteration	has_errorerrorz
Exception raised from r   zDid not find expected z in ddp logging data error: )_get_ddp_logging_datafindsplit)	model_DDP
err_substrddp_logging_datalogging_erractualr,   r,   r-   verify_ddp_error_logged   s   
r   c                    r[   )aJ  
    Convenience decorator to set/unset TORCH_NCCL_BLOCKING_WAIT flag. Note that use of
    this decorator will override the setting of TORCH_NCCL_ASYNC_ERROR_HANDLING for
    the particular test. After the test, both TORCH_NCCL_BLOCKING_WAIT and
    TORCH_NCCL_ASYNC_ERROR_HANDLING will be restored to their original values.
    c               	      s   zt jd }t jd= W n ty   d }Y nw zzt jd }W n ty*   d }Y nw W dt jd< ndt jd< w z | i |}|W |d urK|t jd< |d urU|t jd< S S |d ur_|t jd< |d urh|t jd< w )NTORCH_NCCL_ASYNC_ERROR_HANDLINGTORCH_NCCL_BLOCKING_WAIT1)rc   rd   KeyError)rf   rg    cached_nccl_async_error_handlingcached_nccl_blocking_waitretri   r,   r-   rk     sF   z(with_nccl_blocking_wait.<locals>.wrapperrl   rm   r,   ri   r-   with_nccl_blocking_wait   s   "r   c                    r   )zK
    Runs a test for each distributed debug level specified in levels.
    c                    r   )Nc                     sV   t jdd }D ]}|t jd< t   | i |}t  |d ur(|t jd< q	|S )NTORCH_DISTRIBUTED_DEBUG)rc   rd   getc10dset_debug_level_from_envbarrier)rf   rg   	old_levelr"   r   )rj   levelsr,   r-   rk   .  s   

z:with_dist_debug_levels.<locals>.decorator.<locals>.wrapperrl   rm   r   ri   r-   r~   -  s   z)with_dist_debug_levels.<locals>.decoratorr,   )r   r~   r,   r   r-   with_dist_debug_levels(  s   r   c                   C      t t  dS )Nz+c10d was not compiled with the Gloo backend)r   r   is_gloo_availabler,   r,   r,   r-   requires_glooB     r   c                 C   s@   t  stdS ttjj | k d|  dtjj  d| S )N+c10d was not compiled with the NCCL backendz0Requires NCCL version greater than or equal to: z	, found: z
, reason: )r   is_nccl_availabler   r   r^   rP   r>   version)r   msgr,   r,   r-   requires_nccl_versionI  s   r   c                   C   r   )Nr   )r   r   r   r,   r,   r,   r-   requires_ncclU  r   r   c                   C   r   )Nz*c10d was not compiled with the UCC backend)r   r   is_ucc_availabler,   r,   r,   r-   requires_ucc[  r   r   c                   C   r   )Nz*c10d was not compiled with the MPI backend)r   r   is_mpi_availabler,   r,   r,   r-   requires_mpia  r   r   c                  C   s$   t j ottjd} t|  dS )Nr   z"multicast support is not available)r^   rP   r_   r   has_multicast_supportr   CUDAr   )r   r,   r,   r-   requires_multicast_supporth  s   
r   c                    s   d _ t  fdd}|S )zSkips a test for ROCmTc                     s&   t s	 | i |S ttd j d S )Nr@   )r   r`   ra   rb   r$   rr   ri   r,   r-   rk   w  s   z*skip_if_rocm_multiprocess.<locals>.wrapper)skip_if_rocm_multiprocessr   rm   r,   ri   r-   r   s  s   r   c                   C   s   t tjdkdS )Nwin32z8This unit test case is not supported on Windows platform)r   r`   platformr,   r,   r,   r-   skip_if_win32  r   r   devicemajorminorreturnc                 C   s6   | j dkr	tdtjjdurdS tj| ||fkS )z
    Returns True if the device's compute capability is (major, minor) or higher.
    Error out if the device is not a CUDA device.
    Returns False if device is a RoCM device.
    rP   z3sm_is_or_later() is only supported for CUDA devicesNF)type
ValueErrorr^   r   hiprP   get_device_capability)r   r   r   r,   r,   r-   sm_is_or_higher_than  s
   
r   	localhostrv   T   )minutesFc           	      C   sH   t  }|rt|tdd }tjj| ||||S tj| |||||dS )zL
    Creates a TCP store. Retries if the chosen port is already in use.
    rv   )milliseconds)wait_for_workers	use_libuv)r   r)   r   r^   classes	dist_c10dTCPStorer   )	addrrh   	is_mastertimeoutr   	jit_classr   porttimeout_millisecondr,   r,   r-   create_tcp_store  s   
r   i  !DISTRIBUTED_TESTS_DEFAULT_TIMEOUT300test_ddp_uneven_inputsi     test_join_kwargsc                 C   s.   t jdks	| d u rtjjddS tjj| dS )Nr   z	127.0.0.1)hostname	interface)r`   r   r   ProcessGroupGloocreate_devicer   r,   r,   r-   r     s   r   c                 C   s   t | dd tS N.r   )TIMEOUT_OVERRIDEr   r   TIMEOUT_DEFAULT)test_idr,   r,   r-   get_timeout  s   r   c               	   c   s`    t  t  } }tjtj}}z| |t_t_tjtjfV  W ||t_t_d S ||t_t_w r   )r	   r`   stdoutstderr)new_outnew_errold_outold_errr,   r,   r-   captured_output  s   "r   rankrh   
num_inputsc              
      s~   ddt dt dt dt fdd}dt fd	d
  fddt|ddt|ddt|ddt|ddt|ddt|ddfD S )z
    Generate a number of basic test cases for sparse reduction.
    These cover tensors with a varying number of sparse dimensions and a varying
    number of dense dimensions. The only reduction operation we support is sum.
    rv   r   r   rh   sparse_dims
dense_dimsc              	   S   s   t t | d d| d f}|gdd t|D  }t|d D ]}t |t d| d f}|| q!t | d gdd t|D  }t |||S )Nrv   c                 S      g | ]}d qS ru   r,   .0_r,   r,   r-   
<listcomp>      z@simple_sparse_reduce_tests.<locals>.generate.<locals>.<listcomp>c                 S   r   r   r,   r   r,   r,   r-   r     r   )	r^   reshapearangerangecatzerosappendonessparse_coo_tensor)r   rh   r   r   indicesshaper   valuesr,   r,   r-   generate  s   "z,simple_sparse_reduce_tests.<locals>.generatec                    s    t tj fddtD S )Nc                    s   g | ]} |qS r,   r,   )r   r   fnrh   r,   r-   r         zCsimple_sparse_reduce_tests.<locals>.compute_sum.<locals>.<listcomp>)r   operatoraddr   r  r,   r  r-   compute_sum  s   z/simple_sparse_reduce_tests.<locals>.compute_sumc                    sD   g | ]  fd dt D  fddt D fqS )c                    s"   g | ]}  |  qS r,   r,   r   i)r	  r   r   rh   r,   r-   r     s    z9simple_sparse_reduce_tests.<locals>.<listcomp>.<listcomp>c                    s   g | ]	}  qS r,   r,   r  )r  r	  r   rh   r,   r-   r     s    )r   )r   r  r   r   rh   r	  r-   r     s    z.simple_sparse_reduce_tests.<locals>.<listcomp>)r   ru      )r   N)rv   r   )r)   r   )r   rh   r   r  r,   r  r-   simple_sparse_reduce_tests  s   
	




r  r{   c                    s^   t j }trt j }trt j }t|d | |kr!||    fddt| D }|S )zMultigpu tests are designed to simulate the multi nodes with multi
    GPUs on each node. Nccl backend requires equal #GPUs in each process.
    On a single node, all visible GPUs are evenly
    divided to subsets, each process only uses a subset.
    rv   c                    s*   i | ]}|t |  |d     qS rv   )listr  nGPUs_per_processvisible_devicesr,   r-   
<dictcomp>  s    z(init_multigpu_helper.<locals>.<dictcomp>)r^   rP   re   r   rU   r   rW   r   )rh   r{   nGPUsrank_to_GPUr,   r  r-   init_multigpu_helper  s   


r  tmp_dirinit_methodc                 C   s   t  atjtjd< ttjtjd ttjtjd tjtjd}t| | d ur8| tjd< d S t	tj|d tjd< d S )NTEMP_DIRr   test_dirinit_dirINIT_METHODshared_init_file)
tempfileTemporaryDirectoryr  namerc   rd   mkdirpathjoinr   )r  init_dir_pathr,   r,   r-   initialize_temp_directories  s   
r+  c                   C   s   t d ur
t   d S d S r   )r  cleanupr,   r,   r,   r-   cleanup_temp_dir,  s   r-     c                	       s8  e Zd ZdZdZdefddZedefddZede	fdd	Z
d
d Zd1dededdf fddZd2 fddZd2 fddZdefddZd2ddZd2ddZG dd deZede	fdd Zede	d!ed"eddfd#d$Zd!eddfd%d&Zd2d'd(Zd2d)d*Zd2d+d,Zd2d-d.Zedefd/d0Z  ZS )3MultiProcessTestCaser   
   r   c                 C      dS )NFr,   selfr,   r,   r-   _should_stop_test_suiteI     z,MultiProcessTestCase._should_stop_test_suitec                 C   r1  )NTr,   r2  r,   r,   r-   destroy_pg_upon_exitQ     z)MultiProcessTestCase.destroy_pg_upon_exitc                 C      t S r   DEFAULT_WORLD_SIZEr2  r,   r,   r-   rh   U  r7  zMultiProcessTestCase.world_sizec                        t   fdd}t|| S )Nc                    s$   | j | jkr|   d S    d S r   )r   MAIN_PROCESS_RANK_join_processesr2  r  r,   r-   rk   Z  s   
z1MultiProcessTestCase.join_or_run.<locals>.wrapperr   types
MethodTyper3  r	  rk   r,   r  r-   join_or_runY     z MultiProcessTestCase.join_or_runrunTestmethod_name
methodNameNc              
         |dkr|}t  | zt| |}t| || | W d S  ty@ } z|dkr5td| j d| |W Y d }~d S d }~ww NrD  zno such test method in z: super__init__getattrsetattrrB  AttributeErrorr   	__class__r3  rE  rF  r	  erO  r,   r-   rK  g     
zMultiProcessTestCase.__init__c                    s8   t    g | _g | _| j| _tjddj| _	i | _
d S )NF)delete)rJ  setUpskip_return_code_checks	processesr<  r   r$  NamedTemporaryFiler&  	file_namepid_to_piper2  rR  r,   r-   rU  v  s   

zMultiProcessTestCase.setUpc                    s(   t    | jD ]}|  qg | _d S r   )rJ  tearDownrW  	terminate)r3  prR  r,   r-   r[    s   



zMultiProcessTestCase.tearDownc                 C      |   dd S r   idr   r2  r,   r,   r-   _current_test_name  s   z'MultiProcessTestCase._current_test_namec              
   C   s   g | _ tt| jD ]<}tj \}}|| jjdt	| || 
 | j|fdt| ddid}|  td||j || j|j< | j | q
d S )Nzprocess fake_pgF)targetr&  rf   rg   zStarted process %s with pid %s)rW  r   r)   rh   r^   multiprocessingPiperO  _runr+   ra  rY  rL  startloggerinfopidrZ  r  )r3  procr   parent_conn
child_connprocessr,   r,   r-   _start_processes  s   
z%MultiProcessTestCase._start_processesc                 C   s   t jdj}| | d S )Nspawn)r^   rd  get_contextProcessro  )r3  rk  r,   r,   r-   _spawn_processes  s   z%MultiProcessTestCase._spawn_processesc                   @   s   e Zd ZdZdS )zMultiProcessTestCase.Eventrv   N)r&   r'   r(   GET_TRACEBACKr,   r,   r,   r-   Event  s    ru  r   c                 C   s   t d| 	 tj| |g}| |v r`| jrt d| d S |  }t d|| |tjj	kr`t
jdd#}t| |  |d | |  t d| W d    n1 s[w   Y  ||v rfd S q)	Nz*Starting event listener thread for rank %sTz:Pipe closed for process %s, stopping event listener threadzReceived event %s on process %szr+)moder   zProcess %s sent traceback)rh  ri  rd  
connectionwaitclosedrecvr/  ru  rt  r$  rX  faulthandlerdump_tracebackflushseeksendread)parent_pipesignal_piper   ready_pipeseventtmp_filer,   r,   r-   _event_listener  s,   

	z$MultiProcessTestCase._event_listener	test_namerY  c                 K   s$   | |}||_ ||_||| d S r   )r   rY  run_testclsr   r  rY  r  rg   r3  r,   r,   r-   rf    s   zMultiProcessTestCase._runc              
   C   s  t jjdd\}}tjtj||| jfdd}|  t	j
dkr*t	j
dkr*t jd dtjd< zizt| |  W nG tjy^ } ztd	| j|t| t	td
 j W Y d }~n&d }~w ty   tdt | jtj |t  t	tj Y nw W |d ur|d  |d usJ |  |  n|d ur|d  |d usJ |  |  w | j rzt!"  W d S  t#t$fy   Y d S w d S )NF)duplexT)rc  rf   daemonr   darwinr   TORCH_SHOW_CPP_STACKTRACESz4Process %s skipping test %s for following reason: %srD   z;Caught exception: 
%s exiting process %s with exit code: %s)%r^   rd  re  	threadingThreadr/  r  r   rg  r`   r   _C'_set_print_stack_traces_on_fatal_signalrc   rd   rL  unittestSkipTestrh  ri  r+   ra   rb   r$   	Exceptionr   	traceback
format_excTEST_ERROR_EXIT_CODEr  r)  closer6  r   destroy_process_groupAssertionErrorr   )r3  r  r  signal_recv_pipesignal_send_pipeevent_listener_threadser,   r,   r-   r    sX   






zMultiProcessTestCase.run_testc                 C   s  g }t | jD ]9\}}|jd u r@| j|j }z|tjj |	||f W q t
y? } ztd|| W Y d }~qd }~ww q|D ]A\}}z$|drd|jrXtd| W qC| }td|| ntd| W qC t
y } ztd|| W Y d }~qCd }~ww d S )NzBEncountered error while trying to get traceback for process %s: %sr   z5Pipe closed for process %s, cannot retrieve tracebackz)Process %s timed out with traceback: 

%sz6Could not retrieve traceback for timed out process: %s)	enumeraterW  exitcoderZ  rj  r  r/  ru  rt  r  ConnectionErrorrh  r   pollry  ri  rz  )r3  pipesr  rn  piperQ  r   r  r,   r,   r-   _get_timedout_process_traceback  sJ   

z4MultiProcessTestCase._get_timedout_process_tracebackc              	   C   sF  t |  }t }d}z	 t| jD ](\}}|jtjkr;td| d|j d t	j
 }|D ]}|  q0d} nq|r?n2tdd | jD rJn't | }	|	|krk|   td| d	 | jD ]}|  qcntd
 qt | }
|| jv r| |
 n| |
 W | j D ]}|  qd S | j D ]}|  qw )NFTProcess z terminated with exit code z", terminating remaining processes.c                 s   s    | ]}|j d uV  qd S r   )r  )r   r]  r,   r,   r-   	<genexpr>9  s    z7MultiProcessTestCase._join_processes.<locals>.<genexpr>zTiming out after z" seconds and killing subprocesses.g?)r   r`  timer  rW  r  r/  r  printr^   rd  active_childrenr\  allr  sleeprV  _check_no_test_errors_check_return_codesrZ  r  r  )r3  r	  r   
start_timesubprocess_errorr  r]  r  acelapsedelapsed_timer  r,   r,   r-   r=  #  sR   

	



 



z$MultiProcessTestCase._join_processesc                 C   sH   t | jD ]\}}|jdu rtd| d| d| | j|j qdS )zV
        Checks that we didn't have any errors thrown in the child processes.
        Nr  z timed out after  seconds)r  rW  r  RuntimeErrorassertNotEqualr  )r3  r  r  r]  r,   r,   r-   r  S  s   
z*MultiProcessTestCase._check_no_test_errorsc           
   
   C   sF  | j s
td dS | j d }dd t| j D }|r?d}|D ]\}}| j|j  }|d| dtj d	| d
7 }qt	|t| j D ])\}}|j
du rXt	d| d| d| j|j
|j
d| d|j
 d|j
 d qDt D ]}	|j
|	jkrtrtd|  |	j  dS t|	jqr| j|j
dd|j
 d|j d dS )z
        Checks that the return codes of all spawned processes match, and skips
        tests if they returned a return code indicating a skipping condition.
        z<Note: no subprocesses were spawned, test was likely skipped.Nr   c                 S   s$   g | ]\}}|j tjkr||fqS r,   )r  r/  r  )r   r  r]  r,   r,   r-   r   o  s
    z<MultiProcessTestCase._check_return_codes.<locals>.<listcomp> r  z exited with error code z and exception:

 terminated or timed out after r  zExpect process z+ exit code to match Process 0 exit code of z
, but got )r   6Skipping %s on sandcastle for the following reason: %sz Expected zero exit code but got z
 for pid: )rW  rh  warningr  rZ  rj  rz  r/  r  r  r  assertEqualrb   r  r$   r   ri  r`  r%   r  r  )
r3  r  first_processerrored_processesr   r  rn  error_messager]  skipr,   r,   r-   r  ^  sR   



z(MultiProcessTestCase._check_return_codesc                 C   s
   | j dkS )Nr   r   r2  r,   r,   r-   r        
zMultiProcessTestCase.is_masterrD  rD  r   N) r&   r'   r(   r<  r  boolr4  propertyr6  r)   rh   rB  r+   rK  rU  r[  ra  ro  rs  r   ru  staticmethodr  classmethodrf  r  r  r=  r  r  r   __classcell__r,   r,   rR  r-   r/  @  s6    	



3
%
0
@r/  c                       sB   e Zd Z fddZdd ZdefddZdd	 Zd
d Z  Z	S )DistributedTestBasec                       t    |   d S r   rJ  rU  rs  r2  rR  r,   r-   rU       
zDistributedTestBase.setUpc                 C   s(   z	t | j W d S  ty   Y d S w r   )rc   removerY  OSErrorr2  r,   r,   r-   r[    s
   zDistributedTestBase.tearDownr   c                 C   s(   d|v rdS d|v rdS d|v rdS dS )NrP   r>   rU   rT   rW   rV   rN   r,   r3  r   r,   r,   r-   r{     s   zDistributedTestBase.backendc                 C   sr   t | }t j| j|}t jj| || j| j	|d d| |v s,d| |v r3t j
| j	 t jj S )Nr{   rh   r   storer>   rV   )r^   get_device_modulere   distributed	FileStorerY  init_process_groupr{   rh   r   acceleratorset_device_indexdistributed_c10d_get_default_group)r3  r   num_visible_devicesr  r,   r,   r-   	create_pg  s   zDistributedTestBase.create_pgc                    s&   t |   fddt| jD S )Nc                    s   i | ]}||  gqS r,   r,   r  r  r,   r-   r    s    z6DistributedTestBase.rank_to_device.<locals>.<dictcomp>)r^   r  re   r   rh   r  r,   r  r-   rank_to_device  s   z"DistributedTestBase.rank_to_device)
r&   r'   r(   rU  r[  r+   r{   r  r  r  r,   r,   rR  r-   r    s    
r  subtest_configtest_fntest_kwargsc           
   	   O   s   t | }dd |D }dd |D }tj| D ]8}tt||}	| jdi |	 tj	  ||i ||	 tj	  W d   n1 sHw   Y  t
  qdS )a\  
    Runs a test function given by ``test_fn`` as a subtest according to the
    configurations specified by ``subtest_config``. This amortizes the
    costly setup overhead (including process spawn and initializing the
    process group) over the subtests.

    Args:
        subtest_config (Dict[str, List[Any]]): A mapping from subtest
            keyword argument name to a list of its possible values.
        test_fn (Callable): A callable that runs the actual test.
        test_args: Positional arguments to pass to ``test_fn``.
        test_kwargs: Keyword arguments to pass to ``test_fn``.
    c                 S      g | ]}|d  qS )r   r,   r   itemr,   r,   r-   r         z run_subtests.<locals>.<listcomp>c                 S   r  r  r,   r  r,   r,   r-   r     r  Nr,   )r  items	itertoolsproductdictzipsubTestr^   _dynamoresetr   r   )
cls_instr  r  	test_argsr  subtest_config_itemssubtest_config_keyssubtest_config_valuesr  subtest_kwargsr,   r,   r-   run_subtests  s   

r  c                   C   sD   t durt S ztjg dddjdka W t S  ty!   da Y t S w )a   
    If shell command `fi_info -p efa -t FI_EP_RDM` returns exit code 0 then we assume that the machine has
    Libfabric EFA interfaces and EFA software components installed,
    see https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/efa-start.html.
    N)fi_infoz-pefaz-t	FI_EP_RDMF)checkr   )EFA_PROBE_RESULT
subprocessrun
returncodeFileNotFoundErrorr,   r,   r,   r-   has_efa  s   r  c                   C   s   t  rddgS dS )a  
    If the machine has Libfabric EFA interfaces and EFA software components installed it may cause
    'RuntimeError: In operator() at tensorpipe/common/ibv.h:172 "": Operation not supported' if tensorpipe
    uses InfiniBand transport, so we exclude it from tensorpipe transports,
    see https://github.com/pytorch/pytorch/issues/73885 and https://github.com/pytorch/pytorch/issues/65022
    shmuvN)r  r,   r,   r,   r-   tp_transports  s   r  c                    s:   du rt t|dS dd  t fdd}|S )z+
    Wrapper to use with a test method
    N)r   rh   c                    sf   t  t }fdd fdd}g }tD ]}tj|||fd}|  || q|S )Nc                          t jjkS r   r   r  _worldr,   worldr,   r-   world_is_valid!     zaspawn_threads_and_init_comms.<locals>._run_test_method_with_multi_threads.<locals>.world_is_validc              
      s   t jd| |d z3z   W n! ty0 } ztj| t f t	| W Y d }~nd }~ww W  r;t 
  d S d S  rEt 
  w w )Nthreadedr{   r   rh   r  )r   r  BaseExceptionMultiThreadedTestCaseexception_queueputr`   exc_infor!   exception_handler  )r   world_pgr  ex)callbackr  rh   r,   r-   worker$  s    


zYspawn_threads_and_init_comms.<locals>._run_test_method_with_multi_threads.<locals>.workerrc  rf   )r   r   	HashStorer   r  r  rg  r  )rh   r  global_storer  threadsr   tr,   )r  r  r  rh   r-   #_run_test_method_with_multi_threads  s   zIspawn_threads_and_init_comms.<locals>._run_test_method_with_multi_threadsc              	      sX   t jjd z fdd}t| W t jjd d S t jjd w )NTc                      s   g R i S r   r,   r,   )rf   rj   rg   r3  r,   r-   <lambda>@  r
  z?spawn_threads_and_init_comms.<locals>.wrapper.<locals>.<lambda>F)r^   r  _distributed_c10d_set_thread_isolation_moder  _join_threads)r3  rf   rg   r  r  rj   rh   )rf   rg   r3  r-   rk   ;  s
   "z-spawn_threads_and_init_comms.<locals>.wrapper)r   spawn_threads_and_init_commsr   )rj   r   rh   rk   r,   r   r-   r!    s   
r!  c                       s   e Zd ZdZe ZdZdd Zd(de	de	dd	f fd
dZ
dd Zdd Zd) fddZ fddZdd Zedd Zdd Zedd Zedd ZedefddZede	fd d!Zd*d"d#d$d%Zd*d"d#d&d'Z  ZS )+r  a5  
    Test runner that runs all tests with the in-proc process group using
    multiple threads with the threaded process group.

    Each test spawns world_size threads and run the test method in each thread.

    Difference from regular MultiProcess test runner:
    Must explicitly defines SetUp and call self._spawn_threads() to run the tests.
    Cannot use setUp / tearDown (must use perThreadSetup / perThreadShutdown)
        to set up / tear down each thread when running each test.
    No global state possible
        How bad of a limitation is this?
    r   c                    r;  )Nc                    s(   | j | jkr| | j  d S    d S r   )r   MAIN_THREAD_RANKr  r  r2  r  r,   r-   rk   \  s   
z2MultiThreadedTestCase.join_or_run.<locals>.wrapperr>  rA  r,   r  r-   rB  [  rC  z!MultiThreadedTestCase.join_or_runrD  rE  rF  r   Nc              
      rG  rH  rI  rP  rR  r,   r-   rK  e  rS  zMultiThreadedTestCase.__init__c                 C      d S r   r,   r2  r,   r,   r-   perThreadSetUpt  r7  z$MultiThreadedTestCase.perThreadSetUpc                 C   r#  r   r,   r2  r,   r,   r-   perThreadTearDownx  r5  z'MultiThreadedTestCase.perThreadTearDownc                    s&   t    | j| _g | _dtjd< dS )z
        setUp only set up things in the main thread, if you want to configure things
        in the spawned threads, use perThreadSetUp
        r   r  N)rJ  rU  r"  r   r  rc   rd   r2  rR  r,   r-   rU  {  s   
zMultiThreadedTestCase.setUpc                    s   t    g | _dS )z
        tearDown only set up things in the main thread, if you want to configure things
        in the spawned threads, use perThreadTearDown
        N)rJ  r[  r  r2  rR  r,   r-   r[    s   

zMultiThreadedTestCase.tearDownc                    s   t jjd | j}t  t | j_	 fdd}| s t
dt| jD ]}tj| jj||| jfd}|  | j| q%dS )zk
        class method to spawn threads and run test, use this method in the SetUp of your TestCase
        Tc                      r  r   r  r,   r  r,   r-   r    r	  z<MultiThreadedTestCase._spawn_threads.<locals>.world_is_validzInvalid worldr  N)r^   r  r  r  ra  r   r   r  rO  r  r  r   rh   r  r  rf  rg  r  r  )r3  r  r  r   r  r,   r  r-   _spawn_threads  s   z$MultiThreadedTestCase._spawn_threadsc                 K   sH   | |}||_ t|drt |_tj|j_tj|j_	|
||| d S )N_tls)r   hasattrr  localr'  r   
_precision	precision_rel_tolrel_tolrun_test_with_threaded_pg)r  r  r   rh   rg   r3  r,   r,   r-   rf    s   



zMultiThreadedTestCase._runc              
   C   s   t jd||| jjd |   z@zt| |  W n! ty9 } z| j|t	
 f t| W Y d}~nd}~ww W t   |   dS W t   |   dS t   |   w )zd
        Run the current test associated with `test_name` using the threaded process group.
        r
  r  N)r   r  rO  r  r$  rL  r  r  r  r`   r  r!   r  r  r%  )r3  r  r   rh   r  r,   r,   r-   r.    s&   
z/MultiThreadedTestCase.run_test_with_threaded_pgc              	   C   s   t }zLt|D ]!\}}|td| | r(tj|ttd| dd ff qt	
  g }| j sC| j }|| | j r4W t  tjjd nt  tjjd w | ||| d S )Nr   zRank failed to join in under r  F)r   r  r)  maxis_aliver  r  r  TimeoutErrorr!   r  emptyr   r  r    r^   r  r  r  r  )r  r  r	  r   idxthreadfailed_ranksfailurer,   r,   r-   r    s8   




z#MultiThreadedTestCase._join_threadsc                 C   sH  d}d}|D ]l\}}|d }t |tjr(td||t| |dk r'td j}qt |tr?d| d| d	}	t	|	 t
|	t |tr_dtj| }	t	d
|	| |d| d|	 d7 }qt |trrt|jtkrr|dk rr|j}qt|dkr}t
||dkrt D ]}
||
jkrtrtd||
j  d S t|
jqd S d S )Nr  r   rv   z3Thread %s skipping test %s for following reason: %sr   rD   zThread r  z	 seconds
z'Caught exception: 
%s exiting thread %sz exited with exception:
r  r  )
isinstancer  r  rh  ri  r+   rb   r$   r1  r   r  r  r)  r  format_exception
SystemExitr   coder)   lenr  r   r%   )r  r5  r   r	  	error_msg	skip_coder   r  excr   r  r,   r,   r-   r    sR   





z)MultiThreadedTestCase._check_return_codesc                 C   r8  r   r9  r2  r,   r,   r-   rh     r7  z MultiThreadedTestCase.world_sizec                 C   r^  r   r_  r2  r,   r,   r-   ra    s   z(MultiThreadedTestCase._current_test_namer   r  c                C   s    | j |kr| ||| dS dS )z
        The reason why we have this util function instead of
        self.assertEqual is all threads are sharing one CPU RNG
        so the assertion result is only reliable on rank 0
        N)r   r  r3  r   yr   r   r,   r,   r-   assertEqualOnRank  s   
z'MultiThreadedTestCase.assertEqualOnRankc                C   s   | j |kr| || d S d S r   )r   r  r?  r,   r,   r-   assertNotEqualOnRank'  s   
z*MultiThreadedTestCase.assertNotEqualOnRankr  r  r   )r&   r'   r(   __doc__queueQueuer  r"  rB  r+   rK  r$  r%  rU  r[  r&  r  rf  r.  r  r  r  r)   rh   ra  rA  rB  r  r,   r,   rR  r-   r  I  s0    



0	r  c                       L   e Zd Zdeejejf deddf fddZ	dejdejfdd	Z
  ZS )
SaveForwardInputsModuleforward_inputscast_forward_inputsr   Nc                    s(   t    tdd| _|| _|| _d S )Nd   )rJ  rK  nnLinearlrH  rI  r3  rH  rI  rR  r,   r-   rK  -  s   

z SaveForwardInputsModule.__init__r   c                 C   s,   || j | < | | jr|| jjjS |S r   )rH  rM  rI  toweightdtyper3  r   r,   r,   r-   forward7  s   
"zSaveForwardInputsModule.forwardr&   r'   r(   r  rK  Moduler^   Tensorr  rK  rS  r  r,   r,   rR  r-   rG  ,      
rG  c                       rF  )
SaveForwardInputsModelrH  rI  r   Nc                    s,   t    t||| _t||| _|| _d S r   )rJ  rK  rG  c1c2rH  rN  rR  r,   r-   rK  =  s   

zSaveForwardInputsModel.__init__r   c                 C   s   || j | < | | |S r   )rH  rZ  rY  rR  r,   r,   r-   rS  G  s   
zSaveForwardInputsModel.forwardrT  r,   r,   rR  r-   rX  <  rW  rX  c                 c   s    |s	t j|  dtjd< dtjd< |r1|r)t jjjj	 }t
jd|| |d nt
jd| |d t j  t jjj  zd V  W t j  t jjj  |rVt
  d S d S t j  t jjj  |rkt
  w w )	Nr   MASTER_ADDR6789MASTER_PORTfaker  r>   r   rh   )r^   r  r  rc   rd   testing	_internalr  rb  	FakeStorer   r  r  r  utilscountersclearr  )r   rh   init_pgrb  r  r,   r,   r-   _dynamo_dist_per_rank_initK  s:   





rg  c                       s4   e Zd ZdZe fddZe fddZ  ZS )#DynamoDistributedSingleProcTestCasez
    Test harness for single-process dynamo distributed tests,
    initializes dist process group.

    Prefer this for simple tests, as it's easier to debug.
    c                    sh   t    | jttjddd d| _d| j | _	d| j	v r$d n| jg| _
tjd| jdd	 d S )
Nr   12355)r[  r]  r   zcuda:rP   r>   rv   r_  )rJ  
setUpClass_exit_stackenter_contextr   r  rc   rd   r   r   
device_idsr   r  r  rR  r,   r-   rj  q  s   
	z.DynamoDistributedSingleProcTestCase.setUpClassc                    s   t   t   d S r   )r   r  rJ  tearDownClassrn  rR  r,   r-   ro    s   z1DynamoDistributedSingleProcTestCase.tearDownClass)r&   r'   r(   rC  r  rj  ro  r  r,   r,   rR  r-   rh  i  s    rh  c                	       s\   e Zd ZdZ fddZ fddZedefddZe	d	ed
e
de
ddfddZ  ZS )"DynamoDistributedMultiProcTestCasea   
    Use this for tests that actually run on multiple GPUs.

    Decorate tests with @skip_if_lt_x_gpu(ngpu)

    Note: MultiProcTestCase spawns processes per test and is slow.
    Prefer MultiThreadedTestCase for most tests. Perhaps use this one
    sparingly for integration tests.
    c                    r  r   r  r2  rR  r,   r-   rU    r  z(DynamoDistributedMultiProcTestCase.setUpc                    s2   t    z	t| j W d S  ty   Y d S w r   )rJ  r[  rc   r  rY  r  r2  rR  r,   r-   r[    s   
z+DynamoDistributedMultiProcTestCase.tearDownr   c                 C   s
   t j S r   )r^   rP   re   r2  r,   r,   r-   rh     r  z-DynamoDistributedMultiProcTestCase.world_sizer   r  rY  Nc                 K   s2   t t  | |}||_||_||| d S r   )r   
addHandlerloggingNullHandlerr   rY  r  r  r,   r,   r-   rf    s
   z'DynamoDistributedMultiProcTestCase._run)r&   r'   r(   rC  rU  r[  r  r)   rh   r  r+   rf  r  r,   r,   rR  r-   rp    s    	$rp  c                	       s   e Zd ZU dZeed< dZeed< dZee	 ed< e
ddZe
ed	< eejd
e	fddZedddZe fddZe fddZe	ddededee	 fddZ  ZS )MultiProcContinousTestru   rh   r   r   N	rdvz_filex   )secondsr   r   c                 C   s   t d)z
        ProcessGroup backend str.
        To be customized by sub test classes, e.g. "nccl".
        Here we raise error.
        z/Please implement backend_str in your test class)NotImplementedErrorrn  r,   r,   r-   backend_str  s   z"MultiProcContinousTest.backend_strFc                 C   r1  )z
        ProcessGroup init options.
        To be customized by sub test classes, e.g. ProcessGroupNCCLOpTest
        Here we return None.
        Nr,   )r  high_priority_streamr,   r,   r-   opts  s   zMultiProcContinousTest.optsc                    s   t    d| j  kr| jk sn td| j d| j | jr*t| j| j}nd}|  }| 	 }t
d| tj|| j| j||| jd tj | _t
d| j d dS )	z
        Class-scope test fixture. Run once for entire test class, before any test starts.
        Set up the process group.
        r   zBRank must be set and in the range of 0 to world_size. World size: z Rank: NzTesting backend=)r{   rh   r   r  
pg_optionsr   Rank z setup complete)rJ  rj  r   rh   r  ru  r   r  r{  ry  r  r  r   r  r  pg)r  r  r{  r{   rR  r,   r-   rj    s2   
z!MultiProcContinousTest.setUpClassc                    sR   t   t   | jrzt| j W n	 ty   Y nw td| j	 d dS )z
        Class-scope test fixture. Run once for entire test class, after all tests finish.
        Tear down the process group.
        r}  z teardown completeN)
r   r  rJ  ro  ru  rc   r  r  r  r   rn  rR  r,   r-   ro    s   
z$MultiProcContinousTest.tearDownClassc                 C   s   || _ || _|| _t  dS )ad  
        This is an entry point for each rank to run the tests in `MultiProcContinousTest`.
        In this entry point, we set the class variables for the test class.
        Then we run all tests.

        Note:
        - This helper only works for a subclass of `MultiProcContinousTest`.

        Example:
        - See `test_c10d_ops_nccl.py`.
        N)r   rh   ru  r   )r  r   rh   ru  r,   r,   r-   run_rank  s   
zMultiProcContinousTest.run_rank)Fr   )r&   r'   r(   rh   r)   r*   r   ru  r   r+   r   r   r  abcabstractmethodry  r{  rj  ro  r  r  r,   r,   rR  r-   rt    s.   
 rt  r   r  r  )TF)r  r{  r  rr  rd  rc   rD  r  r`   r$  r  r  r  r?  r  
contextlibr   dataclassesr   datetimer   enumr   	functoolsr   r   r   ior	   typingr
   r   r   r   r   unittest.mockr   torch._logging._internalr   r^   torch._dynamo.test_casetorch.cuda.nccltorch.distributedr  r   torch._C._autogradr   torch._C._distributed_c10dr   torch.nnrK  $torch.testing._internal.common_utilsr   r   r   r   r   r   r   r   r   r   r   r   5torch.testing._internal.distributed.multi_threaded_pgr   r    r!   r  basicConfigINFO	getLoggerr&   rh  r#   rb   rJ   rn   rs   rw   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r)   r  r   r   r   getenvr   r   r   r   r  r+   r  r  r%  r*   r+  r-  r:  r/  r  r  r  r  r  r  r  r!  r  rU  rG  rX  rg  r  	test_caserh  rp  rt  r,   r,   r,   r-   <module>   s,  
8






	










.


,
  i'
#
8 d $