a
    3jr                     @   s0  d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	m
Z
 d dlmZmZ d dlZd dlZd dlmZ zd dlZW n ey   dZY n0 dZejdkZedpd	Zed
pd	Zdev pdev Zdev pdev ZdddZdd Zedd ZG dd dejZ e!dkr,e"  dS )    )contextmanagerN)support)script_helper
is_android)dedentg      ?ntZCFLAGS ZCONFIG_ARGSz-fsanitize=undefinedz#--with-undefined-behavior-sanitizerz-fsanitize=memoryz--with-memory-sanitizer   c                 C   sL   |}|d|  7 }|d| 7 }d|k r<d|d |d   | S d| d S d S )Nz#  File "<string>", line %s in func
z&  File "<string>", line %s in <module>r	   ^
$ )Zlineno1Zlineno2header	min_countregexr   r   R/www/server/python_manager/versions/3.9.10/lib/python3.9/test/test_faulthandler.pyexpected_traceback"   s    r   c                 C   s   t td| S )Nz(raising SIGSEGV on Android is unreliable)unittestskipIfr   )testr   r   r   skip_segfault_on_android+   s
    r   c               	   c   s.   t  } z| V  W t|  nt|  0 d S N)tempfilemktempr   unlinkfilenamer   r   r   temporary_filename0   s    r   c                   @   s:  e Zd ZdddZdddddddddZdd	d
Zdd Zee	j
dddd Zedd Zdd Zdd Zee	j
dkddd Zeedu deeeddedd Zeedu deeed d!ed"d# Zd$d% Zd&d' Zee	j
d(d)eeed* d+d,d- Zed.d/ ZeepBed0ed1d2 Zee	j
dkd3eepped0ed4d5 Z ed6d7 Z!ed8d9 Z"d:d; Z#d<d= Z$d>d? Z%d@dA Z&dddBdCdDZ'dEdF Z(dGdH Z)ee	j
dkd3dIdJ Z*dKdL Z+dMdN Z,dOdP Z-dQdR Z.ddddBdTdUZ/dVdW Z0dXdY Z1dZd[ Z2d\d] Z3ee	j
dkd3d^d_ Z4d`da Z5eeedb dcddddeZ6dfdg Z7dhdi Z8djdk Z9ee	j
dkd3dldm Z:dndo Z;dpdq Z<e=drds Z>dtdu Z?ee@dvdwdx ZAee@dvdydz ZBee@dvd{d| ZCee@dvd}d~ ZDdd ZEdS )FaultHandlerTestsNc           
   	   C   sd  t | }g }|d ur"|| t X tjd||d}|$ | \}}| }W d    n1 sj0    Y  W d    n1 s0    Y  |	dd}|r| 
|d t|d}	|	 }W d    n1 s0    Y  |	dd}nj|d urX| 
|d t|tjd t|ddd	}	|	 }W d    n1 sB0    Y  |	dd}| |fS )
N-c)pass_fdsasciibackslashreplacer   rbr   F)closefd)r   stripappendr   ZSuppressCrashReportr   Zspawn_pythoncommunicatewaitdecodeassertEqualopenreadoslseekSEEK_SET
splitlines)
selfcoder   fdr    processoutputstderrexitcodefpr   r   r   
get_output9   s,    


D&
(zFaultHandlerTests.get_outputTF)r   all_threadsother_regexr3   know_current_threadpy_fatal_errorc                C   s   |r|rd}
qd}
nd}
d}|	r(|d7 }t |j|||
d }|rP|d| 7 }| j|||d\}}d	|}| || | |d
 d S )NzCurrent thread 0x[0-9a-f]+zThread 0x[0-9a-f]+ZStackz
            (?m)^{fatal_error}

            {header} \(most recent call first\):
              File "<string>", line {lineno} in <module>
            z"
Python runtime state: initialized)linenofatal_errorr   |r   r3   r   r   )r   formatr%   r9   joinassertRegexassertNotEqual)r1   r2   line_numberr?   r   r:   r;   r3   r<   r=   r   r   r5   r7   r   r   r   check_errorZ   s&    


zFaultHandlerTests.check_errorc                 K   s2   |rd||f }d| }| j |||fi | d S )Nz%s: %szFatal Python error: %srG   )r1   r2   rF   
name_regexfunckwr?   r   r   r   check_fatal_error~   s    z#FaultHandlerTests.check_fatal_errorc                 K   s"   d| }| j |||fi | d S )NzWindows fatal exception: %srH   )r1   r2   rF   rI   rK   r?   r   r   r   check_windows_exception   s    z)FaultHandlerTests.check_windows_exceptionZaixz5the first page of memory is a mapped read-only on AIXc                 C   s&   t s| ddd n| ddd d S )Nz
                import faulthandler
                faulthandler.enable()
                faulthandler._read_null()
                   z4(?:Segmentation fault|Bus error|Illegal instruction)access violation)
MS_WINDOWSrL   rM   r1   r   r   r   test_read_null   s    z FaultHandlerTests.test_read_nullc                 C   s   |  ddd d S )Nzs
            import faulthandler
            faulthandler.enable()
            faulthandler._sigsegv()
            rN   Segmentation faultrL   rQ   r   r   r   test_sigsegv   s    zFaultHandlerTests.test_sigsegvc                 C   s   | j ddddddd d S )Nz
            import faulthandler
            faulthandler.enable()
            faulthandler._fatal_error_c_thread()
            rN   zin new threadFZfaulthandler_fatal_error_threadT)r<   rJ   r=   rT   rQ   r   r   r   test_fatal_error_c_thread   s    z+FaultHandlerTests.test_fatal_error_c_threadc                 C   s   |  ddd d S )Nzs
            import faulthandler
            faulthandler.enable()
            faulthandler._sigabrt()
            rN   ZAbortedrT   rQ   r   r   r   test_sigabrt   s    zFaultHandlerTests.test_sigabrtwin32z"SIGFPE cannot be caught on Windowsc                 C   s   |  ddd d S )Nzr
            import faulthandler
            faulthandler.enable()
            faulthandler._sigfpe()
            rN   zFloating point exceptionrT   rQ   r   r   r   test_sigfpe   s    zFaultHandlerTests.test_sigfpezneed _testcapiSIGBUSzneed signal.SIGBUSc                 C   s   |  ddd d S )Nz
            import faulthandler
            import signal

            faulthandler.enable()
            signal.raise_signal(signal.SIGBUS)
               z	Bus errorrT   rQ   r   r   r   test_sigbus   s    zFaultHandlerTests.test_sigbusSIGILLzneed signal.SIGILLc                 C   s   |  ddd d S )Nz
            import faulthandler
            import signal

            faulthandler.enable()
            signal.raise_signal(signal.SIGILL)
            r[   zIllegal instructionrT   rQ   r   r   r   test_sigill   s    zFaultHandlerTests.test_sigillc                 C   s   | j dddddd d S )Nz[
            import faulthandler
            faulthandler._fatal_error(b'xyz')
               xyzfaulthandler_fatal_error_pyTrJ   r=   rT   rQ   r   r   r   test_fatal_error   s    z"FaultHandlerTests.test_fatal_errorc                 C   s   | j dddddd d S )Nza
            import faulthandler
            faulthandler._fatal_error(b'xyz', True)
            r_   r`   ra   Trb   rT   rQ   r   r   r   test_fatal_error_without_gil   s    z.FaultHandlerTests.test_fatal_error_without_gilZopenbsdzVIssue #12868: sigaltstack() doesn't work on OpenBSD if Python is compiled with pthreadZ_stack_overflowz#need faulthandler._stack_overflow()c                 C   s   | j ddddd d S )Nzz
            import faulthandler
            faulthandler.enable()
            faulthandler._stack_overflow()
            rN   z (?:Segmentation fault|Bus error)z unable to raise a stack overflow)r;   rT   rQ   r   r   r   test_stack_overflow   s
    z%FaultHandlerTests.test_stack_overflowc                 C   s   |  ddd d S )Nzw
            import faulthandler
            faulthandler.enable()
            faulthandler._sigsegv(True)
            rN   rS   rT   rQ   r   r   r   test_gil_released  s    z#FaultHandlerTests.test_gil_releasedz0sanitizer builds change crashing process output.c                 C   sH   t  .}| jdjt|ddd|d W d    n1 s:0    Y  d S )Nz
                import faulthandler
                output = open({filename}, 'wb')
                faulthandler.enable(output)
                faulthandler._sigsegv()
                r      rS   )r   rL   rB   reprr1   r   r   r   r   test_enable_file  s    z"FaultHandlerTests.test_enable_filez.subprocess doesn't support pass_fds on Windowsc                 C   sL   t d.}| }| jd| dd|d W d    n1 s>0    Y  d S )Nwb+z
                import faulthandler
                import sys
                faulthandler.enable(%s)
                faulthandler._sigsegv()
                rg   rS   r3   )r   TemporaryFilefilenorL   )r1   r8   r3   r   r   r   test_enable_fd!  s    z FaultHandlerTests.test_enable_fdc                 C   s   | j ddddd d S )Nz
            import faulthandler
            faulthandler.enable(all_threads=False)
            faulthandler._sigsegv()
            rN   rS   Fr:   rT   rQ   r   r   r   test_enable_single_thread3  s
    z+FaultHandlerTests.test_enable_single_threadc                 C   sH   d}d}|  |\}}d|}| ||vd||f  | |d d S )Nz
            import faulthandler
            faulthandler.enable()
            faulthandler.disable()
            faulthandler._sigsegv()
            zFatal Python errorr   z%r is present in %rr   )r9   rC   
assertTruerE   )r1   r2   Znot_expectedr6   r7   r   r   r   test_disable>  s    


zFaultHandlerTests.test_disablec                 C   s   t j}zzt jt _t }zFt  | t  t  | t  W |rVt  qxt  n|rnt  nt  0 W |t _n|t _0 d S r   )	sysr6   
__stderr__faulthandler
is_enabledenablerr   disableZassertFalse)r1   Zorig_stderrZwas_enabledr   r   r   test_is_enabledM  s     


z!FaultHandlerTests.test_is_enabledc                 C   s0   d}t jdd|f}t|}| | d d S )N5import faulthandler; print(faulthandler.is_enabled())-Er      False)rt   
executable
subprocesscheck_outputr*   rstrip)r1   r2   argsr5   r   r   r   test_disabled_by_defaultc  s    
z*FaultHandlerTests.test_disabled_by_defaultc                 C   s`   d}t d tjtjjrdndddd|f}tj }|dd  t	j
||d}| | d	 d S )
Nr{   r|   r   z-Xrv   r   PYTHONFAULTHANDLERenv   True)filterrt   r~   flagsignore_environmentr-   environcopypopr   r   r*   r   r1   r2   r   r   r5   r   r   r   test_sys_xoptionsk  s    
z#FaultHandlerTests.test_sys_xoptionsc                 C   s   d}t jd|f}ttj}d|d< d|d< tj||d}| | d ttj}d|d< d|d< tj||d}| | d	 d S )
Nr{   r   r   r   ZPYTHONDEVMODEr   r}   1r   )	rt   r~   dictr-   r   r   r   r*   r   r   r   r   r   test_env_varw  s    

zFaultHandlerTests.test_env_varrA   c                C   sl   d}|j ||d}|rd}n|d ur*d}nd}dd| dd	g}| |||\}}| || | |d
 d S )Na[  
            import faulthandler

            filename = {filename!r}
            fd = {fd}

            def funcB():
                if filename:
                    with open(filename, "wb") as fp:
                        faulthandler.dump_traceback(fp, all_threads=False)
                elif fd is not None:
                    faulthandler.dump_traceback(fd,
                                                all_threads=False)
                else:
                    faulthandler.dump_traceback(all_threads=False)

            def funcA():
                funcB()

            funcA()
            rA   	         Stack (most recent call first):z#  File "<string>", line %s in funcBz#  File "<string>", line 17 in funcAz&  File "<string>", line 19 in <module>r   rB   r9   r*   )r1   r   r3   r2   r>   expectedtracer7   r   r   r   check_dump_traceback  s$    z&FaultHandlerTests.check_dump_tracebackc                 C   s   |    d S r   )r   rQ   r   r   r   test_dump_traceback  s    z%FaultHandlerTests.test_dump_tracebackc                 C   s6   t  }| j|d W d    n1 s(0    Y  d S Nr   )r   r   ri   r   r   r   test_dump_traceback_file  s    z*FaultHandlerTests.test_dump_traceback_filec                 C   s>   t d }| j| d W d    n1 s00    Y  d S Nrk   rl   )r   rm   r   rn   r1   r8   r   r   r   test_dump_traceback_fd  s    z(FaultHandlerTests.test_dump_traceback_fdc                 C   sd   d}d|d  }d| d }d}|j |d}dd| d	g}| |\}}| || | |d
 d S )Ni  x2   z...z
            import faulthandler

            def {func_name}():
                faulthandler.dump_traceback(all_threads=False)

            {func_name}()
            )	func_namer   z  File "<string>", line 4 in %sz%  File "<string>", line 6 in <module>r   r   )r1   maxlenr   Z	truncatedr2   r   r   r7   r   r   r   test_truncate  s    zFaultHandlerTests.test_truncatec                 C   sp   d}|j t|d}| ||\}}d|}|r8d}nd}d}t|j |d }| || | |d d S )	Na  
            import faulthandler
            from threading import Thread, Event
            import time

            def dump():
                if {filename}:
                    with open({filename}, "wb") as fp:
                        faulthandler.dump_traceback(fp, all_threads=True)
                else:
                    faulthandler.dump_traceback(all_threads=True)

            class Waiter(Thread):
                # avoid blocking if the main thread raises an exception.
                daemon = True

                def __init__(self):
                    Thread.__init__(self)
                    self.running = Event()
                    self.stop = Event()

                def run(self):
                    self.running.set()
                    self.stop.wait()

            waiter = Waiter()
            waiter.start()
            waiter.running.wait()
            dump()
            waiter.stop.set()
            waiter.join()
            r   r      
   a  
            ^Thread 0x[0-9a-f]+ \(most recent call first\):
            (?:  File ".*threading.py", line [0-9]+ in [_a-z]+
            ){{1,3}}  File "<string>", line 23 in run
              File ".*threading.py", line [0-9]+ in _bootstrap_inner
              File ".*threading.py", line [0-9]+ in _bootstrap

            Current thread 0x[0-9a-f]+ \(most recent call first\):
              File "<string>", line {lineno} in dump
              File "<string>", line 28 in <module>$
            )r>   r   )rB   rh   r9   rC   r   r%   rD   r*   )r1   r   r2   r5   r7   r>   r   r   r   r   check_dump_traceback_threads  s     
z.FaultHandlerTests.check_dump_traceback_threadsc                 C   s   |  d  d S r   )r   rQ   r   r   r   test_dump_traceback_threads  s    z-FaultHandlerTests.test_dump_traceback_threadsc                 C   s4   t  }| | W d    n1 s&0    Y  d S r   )r   r   ri   r   r   r    test_dump_traceback_threads_file  s    z2FaultHandlerTests.test_dump_traceback_threads_filer	   c                C   s   t tjtd}d}|jt|||||d}| ||\}}	d|}|s~|}
|rX|
d9 }
d| }tdd||
d	}| || n| 	|d
 | 	|	d d S )N)Zsecondsa  
            import faulthandler
            import time
            import sys

            timeout = {timeout}
            repeat = {repeat}
            cancel = {cancel}
            loops = {loops}
            filename = {filename!r}
            fd = {fd}

            def func(timeout, repeat, cancel, file, loops):
                for loop in range(loops):
                    faulthandler.dump_traceback_later(timeout, repeat=repeat, file=file)
                    if cancel:
                        faulthandler.cancel_dump_traceback_later()
                    time.sleep(timeout * 5)
                    faulthandler.cancel_dump_traceback_later()

            if filename:
                file = open(filename, "wb")
            elif fd is not None:
                file = sys.stderr.fileno()
            else:
                file = None
            func(timeout, repeat, cancel, file, loops)
            if filename:
                file.close()
            )timeoutrepeatcancelloopsr   r3   r   r_   zATimeout \(%s\)!\nThread 0x[0-9a-f]+ \(most recent call first\):\n      )r   r   r   )
strdatetimeZ	timedeltaTIMEOUTrB   r9   rC   r   rD   r*   )r1   r   r   r   r   r3   Ztimeout_strr2   r   r7   countr   r   r   r   r   check_dump_traceback_later  s*    	
z,FaultHandlerTests.check_dump_traceback_laterc                 C   s   |    d S r   r   rQ   r   r   r   test_dump_traceback_later\  s    z+FaultHandlerTests.test_dump_traceback_laterc                 C   s   | j dd d S )NT)r   r   rQ   r   r   r    test_dump_traceback_later_repeat_  s    z2FaultHandlerTests.test_dump_traceback_later_repeatc                 C   s   | j dd d S )NT)r   r   rQ   r   r   r    test_dump_traceback_later_cancelb  s    z2FaultHandlerTests.test_dump_traceback_later_cancelc                 C   s6   t  }| j|d W d    n1 s(0    Y  d S r   )r   r   ri   r   r   r   test_dump_traceback_later_filee  s    z0FaultHandlerTests.test_dump_traceback_later_filec                 C   s>   t d }| j| d W d    n1 s00    Y  d S r   )r   rm   r   rn   r   r   r   r   test_dump_traceback_later_fdi  s    z.FaultHandlerTests.test_dump_traceback_later_fdc                 C   s   | j dd d S )Nr_   )r   r   rQ   r   r   r   test_dump_traceback_later_twiceo  s    z1FaultHandlerTests.test_dump_traceback_later_twiceregisterzneed faulthandler.registerc                 C   s   t j}d}|j||||||d}| ||\}}	d|}|sf|rHd}
nd}
tdd|
}
| ||
 n| |d |r| |	d	 n| |	d	 d S )
Nax  
            import faulthandler
            import os
            import signal
            import sys

            all_threads = {all_threads}
            signum = {signum}
            unregister = {unregister}
            chain = {chain}
            filename = {filename!r}
            fd = {fd}

            def func(signum):
                os.kill(os.getpid(), signum)

            def handler(signum, frame):
                handler.called = True
            handler.called = False

            if filename:
                file = open(filename, "wb")
            elif fd is not None:
                file = sys.stderr.fileno()
            else:
                file = None
            if chain:
                signal.signal(signum, handler)
            faulthandler.register(signum, file=file,
                                  all_threads=all_threads, chain={chain})
            if unregister:
                faulthandler.unregister(signum)
            func(signum)
            if chain and not handler.called:
                if file is not None:
                    output = file
                else:
                    output = sys.stderr
                print("Error: signal handler not called!", file=output)
                exitcode = 1
            else:
                exitcode = 0
            if filename:
                file.close()
            sys.exit(exitcode)
            )r:   signum
unregisterchainr   r3   r   z8Current thread 0x[0-9a-f]+ \(most recent call first\):\nz#Stack \(most recent call first\):\nr       r   r   )	signalSIGUSR1rB   r9   rC   r   rD   r*   rE   )r1   r   r:   r   r   r3   r   r2   r   r7   r   r   r   r   check_registerr  s,    .
z FaultHandlerTests.check_registerc                 C   s   |    d S r   r   rQ   r   r   r   test_register  s    zFaultHandlerTests.test_registerc                 C   s   | j dd d S )NT)r   r   rQ   r   r   r   test_unregister  s    z!FaultHandlerTests.test_unregisterc                 C   s6   t  }| j|d W d    n1 s(0    Y  d S r   )r   r   ri   r   r   r   test_register_file  s    z$FaultHandlerTests.test_register_filec                 C   s>   t d }| j| d W d    n1 s00    Y  d S r   )r   rm   r   rn   r   r   r   r   test_register_fd  s    z"FaultHandlerTests.test_register_fdc                 C   s   | j dd d S )NTrp   r   rQ   r   r   r   test_register_threads  s    z'FaultHandlerTests.test_register_threadsc                 C   s   | j dd d S )NT)r   r   rQ   r   r   r   test_register_chain  s    z%FaultHandlerTests.test_register_chainc                 c   sf   t j}zRd t _| t}d V  W d    n1 s40    Y  | t|jd W |t _n|t _0 d S )Nzsys.stderr is None)rt   r6   ZassertRaisesRuntimeErrorr*   r   	exception)r1   r6   cmr   r   r   check_stderr_none  s    $z#FaultHandlerTests.check_stderr_nonec                 C   s   |    t  W d    n1 s&0    Y  |    t  W d    n1 sV0    Y  |    td W d    n1 s0    Y  ttdr|    ttj W d    n1 s0    Y  d S )NgMbP?r   )	r   rv   rx   Zdump_tracebackZdump_traceback_laterhasattrr   r   r   rQ   r   r   r   test_stderr_None  s    
&
&
(

z"FaultHandlerTests.test_stderr_Nonezspecific to Windowsc                 C   s(   dD ]\}}|  d| dd| qd S )N))ZEXCEPTION_ACCESS_VIOLATIONrO   )ZEXCEPTION_INT_DIVIDE_BY_ZEROzint divide by zero)ZEXCEPTION_STACK_OVERFLOWzstack overflowz
                import faulthandler
                faulthandler.enable()
                faulthandler._raise_exception(faulthandler._)
                rN   )rM   )r1   excnamer   r   r   test_raise_exception  s    z&FaultHandlerTests.test_raise_exceptionc                 C   sH   dD ]>}d| d}t |}| |\}}| |g  | || qd S )N)l   cs@ l   RC@ z
                    import faulthandler
                    faulthandler.enable()
                    faulthandler._raise_exception(z)
                    r   r9   r*   )r1   Zexc_coder2   r5   r7   r   r   r   test_ignore_exception  s    z'FaultHandlerTests.test_ignore_exceptionc                 C   sF   dD ]<}|  d|dd\}}| |g  | |||d@ f qd S )N)r   ixV4i   @i  @i   piz{
                import faulthandler
                faulthandler.enable()
                faulthandler._raise_exception(0xr   r   i)r9   r*   ZassertIn)r1   r   r5   r7   r   r   r   test_raise_nonfatal_exception  s    
z/FaultHandlerTests.test_raise_nonfatal_exceptionc                 C   s2   t d}| |\}}| |g  | |d d S )Nz
            import faulthandler
            faulthandler.enable()
            faulthandler.disable()
            code = faulthandler._EXCEPTION_ACCESS_VIOLATION
            faulthandler._raise_exception(code)
        l       r   r1   r2   r5   r7   r   r   r    test_disable_windows_exc_handler.  s    z2FaultHandlerTests.test_disable_windows_exc_handlerc                 C   s2   t d}| |\}}| |g  | |d d S )Nz`
            import faulthandler
            faulthandler.cancel_dump_traceback_later()
        r   r   r   r   r   r   .test_cancel_later_without_dump_traceback_later;  s    z@FaultHandlerTests.test_cancel_later_without_dump_traceback_later)NN)N)FFr	   )FFFFN)F__name__
__module____qualname__r9   rG   rL   rM   r   r   rt   platform
startswithrR   r   rU   rV   rW   rY   	_testcapiZ
skipUnlessr   r   r\   r^   rc   rd   rv   re   rf   UB_SANITIZERMEMORY_SANITIZERrj   ro   rq   rs   rz   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   rP   r   r   r   r   r   r   r   r   r   r   8   s   
"$


		
	



	


.
;>
  Q










r   __main__)r	   )#
contextlibr   r   rv   r-   r   r   rt   Z	sysconfigr   r   Ztest.supportr   r   r   r   textwrapr   r   ImportErrorr   r   rP   Zget_config_varZ_cflagsZ_config_argsr   r   r   r   r   ZTestCaser   r   mainr   r   r   r   <module>   sN   


	
      
