o
    hR                     @   sP  d Z ddlZddlZddlZddlmZ ddlmZ ddlmZ ddlmZ ddlm	Z	 ddl
Z
ddl
mZ dd	l
mZ dd
l
mZ ddl
mZ ddl
mZ ddl
mZ ddl
mZ ddl
mZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlm Z  ddlm!Z! ddlm"Z" ddlm#Z# ddlm$Z$ dd lm%Z% e&ed!e' Z(d"d# Z)ej*j+d$d%G d&d' d'eZ,G d(d) d)e,Z-ej*j+d$d%G d*d+ d+e,Z.ej*j+d$d%G d,d- d-e,Z/G d.d/ d/e,Z0ej*j1ed0d1G d2d3 d3e,Z2G d4d5 d5eZ3dS )6zFTests for psutil.net_connections() and Process.net_connections() APIs.    N)closing)AF_INET)AF_INET6)
SOCK_DGRAM)SOCK_STREAM)FREEBSD)LINUX)MACOS)NETBSD)OPENBSD)POSIX)SUNOS)WINDOWS)supports_ipv6)AF_UNIX)HAS_NET_CONNECTIONS_UNIX)SKIP_SYSCONS)PsutilTestCase)bind_socket)bind_unix_socket)check_connection_ntuple)create_sockets)filter_proc_net_connections)pytest)reap_children)retry_on_failure)skip_on_access_denied)tcp_socketpair)unix_socketpair)wait_for_fileSOCK_SEQPACKETc                 C   s$   t  j| d}| dv rt|S |S )Nkind>   allunix)psutilProcessnet_connectionsr   )r"   cons r)   Q/var/www/vscode/kcb/lib/python3.10/site-packages/psutil/tests/test_connections.pythis_proc_net_connections1   s   r+   serial)namec                   @   s&   e Zd Zdd Zdd Zd	ddZdS )
ConnectionTestCasec                 C      t ddg ks	J d S Nr#   r!   r+   selfr)   r)   r*   setUp:   s   zConnectionTestCase.setUpc                 C   r/   r0   r1   r2   r)   r)   r*   tearDown=   s   zConnectionTestCase.tearDownr#   c                    s`   zt j|d}W n t jy   trY dS  w  fdd|D }|  |  ||ks.J dS )zGiven a process PID and its list of connections compare
        those against system-wide connections retrieved via
        psutil.net_connections.
        r!   Nc                    s"   g | ]}|j  kr|d d qS )Npid.0cr7   r)   r*   
<listcomp>P   s   " zBConnectionTestCase.compare_procsys_connections.<locals>.<listcomp>)r%   r'   AccessDeniedr	   sort)r3   r8   	proc_consr"   sys_consr)   r7   r*   compare_procsys_connectionsA   s   z.ConnectionTestCase.compare_procsys_connectionsN)r#   )__name__
__module____qualname__r4   r5   rA   r)   r)   r)   r*   r.   8   s    r.   c                   @   s4   e Zd Zejjedddd Zdd Zdd Z	d	S )
TestBasicOperationsrequires rootreasonc                 C   sF   t   tjddD ]}t| q
W d    d S 1 sw   Y  d S r0   )r   r%   r'   r   r3   connr)   r)   r*   test_systemW   s
   
"zTestBasicOperations.test_systemc                 C   sD   t   tddD ]}t| q	W d    d S 1 sw   Y  d S r0   )r   r+   r   rI   r)   r)   r*   test_process]   s
   
"z TestBasicOperations.test_processc                 C   sp   t t tdd W d    n1 sw   Y  t t tjdd W d    d S 1 s1w   Y  d S )Nz???r!   )r   raises
ValueErrorr+   r%   r'   r2   r)   r)   r*   test_invalid_kindb   s   "z%TestBasicOperations.test_invalid_kindN)
rB   rC   rD   r   markskipifr   rK   rL   rO   r)   r)   r)   r*   rE   V   s
    
rE   c                   @   s   e Zd ZdZdd Zdd Zdd Zejj	e
  dd	d
d Zdd Zejj	e
  dd	dd Zejj	e dd	dd Zejj	e dd	dd ZdS )TestUnconnectedSocketsz;Tests sockets which are open but not connected to anything.c                 C   sl   t dd}dd |D }tstr||  S t|dksJ |d jdkr2||  j| ks2J |d S )Nr#   r!   c                 S   s   i | ]}|j |qS r)   )fdr9   r)   r)   r*   
<dictcomp>o   s    z=TestUnconnectedSockets.get_conn_from_sock.<locals>.<dictcomp>   r   r6   )r+   r
   r   filenolenrS   )r3   sockr(   smapr)   r)   r*   get_conn_from_sockm   s   
z)TestUnconnectedSockets.get_conn_from_sockc                 C   s   |  |}t| |jdkr|j| ksJ |j|jksJ |j|tjtj	ks,J |
 }|s;t|tr;| }|jtkrF|dd }|j|ksMJ |jtkrctrctdd}| jt |dd |S )zGiven a socket, makes sure it matches the one obtained
        via psutil. It assumes this process created one connection
        only (the one supposed to be checked).
        r6   N   r#   r!   )rZ   r   rS   rV   familytype
getsockoptsocket
SOL_SOCKETSO_TYPEgetsockname
isinstancebytesdecoder   laddrr   r   r+   rA   osgetpid)r3   rX   rJ   rf   r(   r)   r)   r*   check_socketz   s    



z#TestUnconnectedSockets.check_socketc                 C   d   d}t ttt|d}| |}|jdksJ |jtjks J W d    d S 1 s+w   Y  d S N	127.0.0.1r   addrr)   )	r   r   r   r   ri   raddrstatusr%   CONN_LISTENr3   ro   rX   rJ   r)   r)   r*   test_tcp_v4      
"z"TestUnconnectedSockets.test_tcp_v4zIPv6 not supportedrG   c                 C   rj   N)::1r   rn   r)   )	r   r   r   r   ri   rp   rq   r%   rr   rs   r)   r)   r*   test_tcp_v6      
"z"TestUnconnectedSockets.test_tcp_v6c                 C   rj   rk   )	r   r   r   r   ri   rp   rq   r%   	CONN_NONErs   r)   r)   r*   test_udp_v4   ru   z"TestUnconnectedSockets.test_udp_v4c                 C   rj   rv   )	r   r   r   r   ri   rp   rq   r%   rz   rs   r)   r)   r*   test_udp_v6   ry   z"TestUnconnectedSockets.test_udp_v6
POSIX onlyc                 C   f   |   }tt|td}| |}|jdksJ |jtjks!J W d    d S 1 s,w   Y  d S N)r]    	
get_testfnr   r   r   ri   rp   rq   r%   rz   r3   testfnrX   rJ   r)   r)   r*   test_unix_tcp      
"z$TestUnconnectedSockets.test_unix_tcpc                 C   r~   r   r   r   r)   r)   r*   test_unix_udp   r   z$TestUnconnectedSockets.test_unix_udpN)rB   rC   rD   __doc__rZ   ri   rt   r   rP   rQ   r   rx   r{   r|   r   r   r   r)   r)   r)   r*   rR   i   s    


rR   c                   @   sB   e Zd ZdZejjedddd Zejje	 dddd Z
d	S )
TestConnectedSocketzFTest socket pairs which are actually connected to
    each other.
    zunreliable on SUONSrG   c                 C   s   d}t ddg ksJ tt|d\}}z,t dd}t|dks!J |d jtjks+J |d jtjks5J W |  |  d S |  |  w )Nrl   tcp4r!   rn   r[   r   rU   )r+   r   r   rW   rq   r%   CONN_ESTABLISHEDclose)r3   ro   serverclientr(   r)   r)   r*   test_tcp   s   

zTestConnectedSocket.test_tcpr}   c                 C   s*  |   }t|\}}ztdd}|d jr|d jrJ ||d jr,|d jr,J |ts0tr7dd |D }t|dks?J tsGtsGt	sGt
rh|d jdksPJ |d jdksYJ ||d jpc|d jksgJ n|d jpq|d j|ksJ W |  |  d S W |  |  d S |  |  w )	Nr$   r!   r   rU   c                 S   s   g | ]	}|j d kr|qS )z/var/run/log)rp   r9   r)   r)   r*   r<          z1TestConnectedSocket.test_unix.<locals>.<listcomp>r[   r   )r   r   r+   rf   rp   r
   r   rW   r   r   r   r   )r3   r   r   r   r(   r)   r)   r*   	test_unix   s,   

zTestConnectedSocket.test_unixN)rB   rC   rD   r   r   rP   rQ   r   r   r   r   r)   r)   r)   r*   r      s    
r   c                   @   s.   e Zd Zdd Zeeddd Zdd ZdS )	TestFiltersc                 C   s   dd }t  n |dtttgtttg |dttgttg |dtgttg |dttgtg |dtgtg |dtgtg |d	ttgtg |d
tgtg |dtgtg trm|dtgtttg W d    d S W d    d S 1 sxw   Y  d S )Nc                 S   sf   t | dD ]}|j|v sJ |j|v sJ qts/tj| dD ]}|j|v s'J |j|v s.J qd S d S )Nr!   )r+   r\   r]   r   r%   r'   )r"   familiestypesrJ   r)   r)   r*   check  s   z'TestFilters.test_filters.<locals>.checkr#   inetinet4tcpr   tcp6udpudp4udp6r$   )r   r   r   r   r   r   r    r   )r3   r   r)   r)   r*   test_filters   s0   	"zTestFilters.test_filters)only_ifc                    s  t    fdd}td}td}tj jt d}|jt	t
d|d}|jt	t
d|d}|jt	td|d}|jt	td|d} |}	tt|d	d
}
 |}tt|d	d
}t r |}tt|d	d
} |}tt|d	d
}nd }d }d }d }t  D ]d}| }t|dksJ |D ]S}|j|	jkr|||t
t|
dtjd q|j|jkr|||t
t|dtjd q|jt|dd kr|||tt|dtjd q|jt|dd kr|||tt|dtjd qqd S )Nc                    s   d}t | |j|ksJ |j|ksJ |j|ksJ |j|ks"J |j|ks)J |D ]}	| j|	d}
|	|v r>|
g ks=J q+|
g ksDJ q+trQ | j	|g d S d S )N)
r#   r   r   inet6r   r   r   r   r   r   r!   )
r   r\   r]   rf   rp   rq   r'   r   rA   r8   )procrJ   r\   r]   rf   rp   rq   kinds	all_kindsr"   r(   r2   r)   r*   
check_conn#  s   z+TestFilters.test_combos.<locals>.check_conna4  
            import socket, time
            s = socket.socket({family}, socket.SOCK_STREAM)
            s.bind(('{addr}', 0))
            s.listen(5)
            with open('{testfn}', 'w') as f:
                f.write(str(s.getsockname()[:2]))
            [time.sleep(0.1) for x in range(100)]
            a  
            import socket, time
            s = socket.socket({family}, socket.SOCK_DGRAM)
            s.bind(('{addr}', 0))
            with open('{testfn}', 'w') as f:
                f.write(str(s.getsockname()[:2]))
            [time.sleep(0.1) for x in range(100)]
            )dirrm   )r\   ro   r   rw   T)deleterU   r)   )r#   r   r   r   r   )r#   r   r   r   r   r8   )r#   r   r   r   r   )r#   r   r   r   r   )r   textwrapdedentrg   pathbasenamer   getcwdformatintr   r   pyrunevalr   r   r%   r&   childrenr'   rW   r8   r   rr   r   rz   getattr)r3   r   tcp_templateudp_templatetestfiletcp4_templateudp4_templatetcp6_templateudp6_template	tcp4_proc	tcp4_addr	udp4_proc	udp4_addr	tcp6_proc	tcp6_addr	udp6_proc	udp6_addrpr(   rJ   r)   r2   r*   test_combos  s   











zTestFilters.test_combosc                 C   s  t  s tdd}t|t rdndksJ |D ]}|jtthv s$J |jtks+J qtdd}t|dks9J |d jtksBJ |d jtksKJ t rmtdd}t|dks[J |d jtksdJ |d jtksmJ tdd}t|t rzdndksJ |D ]}|jtthv sJ |jt	ksJ qtd	d}t|dksJ |d jtksJ |d jt	ksJ t rtd
d}t|dksJ |d jtksJ |d jt	ksJ tdd}t|t rdndksJ |D ]}|jtthv sJ |jtt	hv sJ qt r(tdd}t|dksJ |D ]}|jtksJ |jtt	hv s&J qt
rWts_tsgtdd}t|dks?J |D ]-}|jtksKJ |jtt	hv sUJ qAW d    d S W d    d S W d    d S W d    d S 1 s{w   Y  d S )Nr   r!   r[   rU   r   r   r   r   r   r   r      r   r$      )r   r+   rW   r   r\   r   r   r]   r   r   r   r   r
   r   )r3   r(   rJ   r)   r)   r*   
test_count  sl   









225$zTestFilters.test_countN)rB   rC   rD   r   r   r	   r   r   r)   r)   r)   r*   r      s    
 r   rF   rG   c                   @   s&   e Zd ZdZdd Ze dd ZdS )TestSystemWideConnectionszTests for net_connections().c                 C   s   dd }t  : ddlm} | D ]&\}}|dkrtsq|\}}t|}t|tt|ks2J |||| qW d    d S 1 sDw   Y  d S )Nc                 S   s<   | D ]}|j |v sJ |j tkr|j|v sJ t| qd S )N)r\   r   r]   r   )r(   r   types_rJ   r)   r)   r*   r     s   

z0TestSystemWideConnections.test_it.<locals>.checkr   )	conn_tmapr$   )	r   psutil._commonr   itemsr   r%   r'   rW   set)r3   r   r   r"   groupsr   r   r(   r)   r)   r*   test_it  s   
"z!TestSystemWideConnections.test_itc                    s   t  }t|}W d    n1 sw   Y  g d}g }t|D ]}|  }|| td| d}| |}|j q!|D ]}t	| qCfddt
jddD }	D ]! t fdd|	D |ksjJ t
 }
t|
d|kszJ qYd S )	N
   z                import time, os
                from psutil.tests import create_sockets
                with create_sockets():
                    with open(r'z', 'w') as f:
                        f.write("hello")
                    [time.sleep(0.1) for x in range(100)]
                c                    s   g | ]	}|j  v r|qS r)   r7   r:   x)pidsr)   r*   r<     s    zFTestSystemWideConnections.test_multi_sockets_procs.<locals>.<listcomp>r#   r!   c                    s   g | ]	}|j  kr|qS r)   r7   r   r7   r)   r*   r<      r   )r   rW   ranger   appendr   r   r   r8   r   r%   r'   r&   )r3   socksexpectedtimesfnames_fnamesrcsprocsysconsr   r)   )r8   r   r*   test_multi_sockets_procs  s0   







z2TestSystemWideConnections.test_multi_sockets_procsN)rB   rC   rD   r   r   r   r   r)   r)   r)   r*   r     s
    r   c                   @   s   e Zd Zdd ZdS )TestMiscc                 C   s   g }g }t tD ].}|dr6tt|}t|}| s J |t|vs&J ||vs,J || || qtr?tj tj	 t
rFtj d S d S )NCONN_)r   r%   
startswithr   strisupperr   r   	CONN_IDLE
CONN_BOUNDr   CONN_DELETE_TCB)r3   intsstrsr-   numstr_r)   r)   r*   test_net_connection_constants&  s$   




z&TestMisc.test_net_connection_constantsN)rB   rC   rD   r   r)   r)   r)   r*   r   %  s    r   )4r   rg   r_   r   
contextlibr   r   r   r   r   r%   r   r   r	   r
   r   r   r   r   r   r   psutil.testsr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   objectr    r+   rP   xdist_groupr.   rE   rR   r   r   rQ   r   r   r)   r)   r)   r*   <module>   sb   ]7 g?