o
    Hh                      @   s   d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	m
Z
mZmZ zd dlZW n	 ey=   Y nw eeZdd Zd
de
e fddZG dd	 d	ZdS )    N)OptionalTextIOUnionc                  C   s   t jddt jt jd} | D ]<}|\}}}}}zt  |||}|d |d |W   S  tyI } z|  td|  W Y d}~qd}~ww t	d)a  
    Find a free port and binds a temporary socket to it so that the port can be "reserved" until used.

    .. note:: the returned socket must be closed before using the port,
              otherwise a ``address already in use`` error will happen.
              The socket should be held and closed as close to the
              consumer of the port as possible since otherwise, there
              is a greater chance of race-condition where a different
              process may see the port as being free and take it.

    Returns: a socket binded to the reserved free port

    Usage::

    sock = find_free_port()
    port = sock.getsockname()[1]
    sock.close()
    use_port(port)
    	localhostN)hostportfamilytype)r   r   r   z Socket creation attempt failed: zFailed to create a socket)
socketgetaddrinfo	AF_UNSPECSOCK_STREAMbindlistenOSErrorcloseprintRuntimeError)addrsaddrr   r	   proto_se r   d/var/www/vscode/kcb/lib/python3.10/site-packages/torch/distributed/elastic/rendezvous/etcd_server.pyfind_free_port   s    


r   data_dirc                 C   sP   | r|   d u rtd |   |   |r&td| tj|dd d S d S )Nzstopping etcd serverzdeleting etcd data dir: %sTignore_errors)pollloggerinfo	terminatewaitshutilrmtree)
subprocessr   r   r   r   	stop_etcdC   s   
r(   c                
   @   s   e Zd ZdZddee fddZdejfddZ	de
fd	d
ZdefddZdefddZ			dde
de
dee
edf ddfddZ	d dede
dee
edf ddfddZdd Zd!de
ddfddZd"ddZdS )#
EtcdServera  
    .. note:: tested on etcd server v3.4.3.

    Starts and stops a local standalone etcd server on a random free
    port. Useful for single node, multi-worker launches or testing,
    where a sidecar etcd server is more convenient than having to
    separately setup an etcd server.

    This class registers a termination handler to shutdown the etcd
    subprocess on exit. This termination handler is NOT a substitute for
    calling the ``stop()`` method.

    The following fallback mechanism is used to find the etcd binary:

    1. Uses env var TORCHELASTIC_ETCD_BINARY_PATH
    2. Uses ``<this file root>/bin/etcd`` if one exists
    3. Uses ``etcd`` from ``PATH``

    Usage
    ::

     server = EtcdServer("/usr/bin/etcd", 2379, "/tmp/default.etcd")
     server.start()
     client = server.get_client()
     # use client
     server.stop()

    Args:
        etcd_binary_path: path of etcd server binary (see above for fallback path)
    Nr   c                 C   sp   d| _ d| _tjt}tj|d}tjd|| _	tj
| j	s%d| _	|r)|ntjdd| _d | _d | _d S )Nr   zbin/etcdTORCHELASTIC_ETCD_BINARY_PATHetcdtorchelastic_etcd_data)prefix)_port_hostospathdirname__file__joinenvironget_etcd_binary_pathisfiletempfilemkdtemp_base_data_dir	_etcd_cmd
_etcd_proc)selfr   rootdefault_etcd_binr   r   r   __init__n   s   
zEtcdServer.__init__returnc                 C   s   | j std| j S )Nz>No etcd server process started. Call etcd_server.start() first)r>   r   r?   r   r   r   _get_etcd_server_process   s
   z#EtcdServer._get_etcd_server_processc                 C      | j S )z)Return the port the server is running on.)r/   rD   r   r   r   get_port      zEtcdServer.get_portc                 C   rF   )z)Return the host the server is running on.)r0   rD   r   r   r   get_host   rH   zEtcdServer.get_hostc                 C   s   | j  d| j S )z,Return the etcd server endpoint (host:port).:)r0   r/   rD   r   r   r   get_endpoint   s   zEtcdServer.get_endpoint<      timeoutnum_retriesstderrc              
   C   s   d}	 zt j| jt|}t j|dd | |||W S  tyL } z$|d7 }t| j	 t
dt| ||krBtj| jdd  W Y d}~nd}~ww q)a  
        Start the server, and waits for it to be ready. When this function returns the sever is ready to take requests.

        Args:
            timeout: time (in seconds) to wait for the server to be ready
                before giving up.
            num_retries: number of retries to start the server. Each retry
                will wait for max ``timeout`` before considering it as failed.
            stderr: the standard error file handle. Valid values are
                `subprocess.PIPE`, `subprocess.DEVNULL`, an existing file
                descriptor (a positive integer), an existing file object, and
                `None`.

        Raises:
            TimeoutError: if the server is not ready within the specified timeout
        r   T)exist_ok   z4Failed to start etcd server, got error: %s, retryingr   N)r1   r2   r5   r<   strmakedirs_start	Exceptionr(   r>   r!   warningr%   r&   atexitregister)r?   rN   rO   rP   curr_retriesr   r   r   r   r   start   s&   
zEtcdServer.startc                 C   s   t  }t  }| d | _| d }td| jdd|dd| j d| j dd| j d| j d	d| j d| g
}t	d
| |
  |
  tj|d|d| _| | d S )NrR    z--enable-v2z
--data-dirz--listen-client-urlszhttp://rJ   z--advertise-client-urlsz--listen-peer-urlszStarting etcd server: [%s]T)	close_fdsrP   )r   getsocknamer/   shlexsplitr5   r8   r0   r!   r"   r   r'   Popenr>   _wait_for_ready)r?   r   rN   rP   sock	sock_peer	peer_portetcd_cmdr   r   r   rU      s0   zEtcdServer._startc                 C   s   t j| j| jdddS )zNReturn an etcd client object that can be used to make requests to this server./v2
   r   r   version_prefixread_timeout)r,   Clientr0   r/   rD   r   r   r   
get_client   s   zEtcdServer.get_clientc                 C   s   t j| j | jddd}t | }t |k rK|   d ur,|  j}td| z
t	
d|j W d S  tyD   td Y nw t |k std)Nrg      ri   z*Etcd server process exited with the code: zetcd server ready. version: %srR   z.Timed out waiting for etcd server to be ready!)r,   rl   r0   r/   timerE   r    
returncoder   r!   r"   versionrV   sleepTimeoutError)r?   rN   clientmax_timeexitcoder   r   r   rb      s$   
zEtcdServer._wait_for_readyc                 C   s   t d t| j| j dS )zGStop the server and cleans up auto generated resources (e.g. data dir).zEtcdServer stop method calledN)r!   r"   r(   r>   r<   rD   r   r   r   stop   s   
zEtcdServer.stopN)rL   rM   N)rL   N)rL   )rC   N)__name__
__module____qualname____doc__r   rS   rB   r'   ra   rE   intrG   rI   rK   r   r   r[   rU   rm   rb   rw   r   r   r   r   r)   N   s@    
(
 r)   rx   )rX   loggingr1   r_   r%   r
   r'   r:   ro   typingr   r   r   r,   ModuleNotFoundError	getLoggerry   r!   r   rS   r(   r)   r   r   r   r   <module>   s&   
%