
    AҐiDD                         d Z ddlZddlZddlZddlZddlZddlZddlmZ ddl	m
Z
 ddlmZ ddlmZmZmZmZ ddlmZmZmZmZmZ  G d	 d
      Zy)a	  
Dirty Worker Process

Asyncio-based worker that loads dirty apps and handles requests
from the DirtyArbiter.

Threading Model
---------------
Each dirty worker runs an asyncio event loop in the main thread for:
- Handling connections from the arbiter
- Managing heartbeat updates
- Coordinating task execution

Actual app execution runs in a ThreadPoolExecutor (separate threads):
- The number of threads is controlled by ``dirty_threads`` config (default: 1)
- Each thread can execute one app action at a time
- The asyncio event loop is NOT blocked by task execution

State and Global Objects
------------------------
Apps can maintain persistent state because:

1. Apps are loaded ONCE when the worker starts (in ``load_apps()``)
2. The same app instances are reused for ALL requests
3. App state (instance variables, loaded models, etc.) persists

Example::

    class MLApp(DirtyApp):
        def init(self):
            self.model = load_heavy_model()  # Loaded once, reused
            self.cache = {}                   # Persistent cache

        def predict(self, data):
            return self.model.predict(data)  # Uses loaded model

Thread Safety:
- With ``dirty_threads=1`` (default): No concurrent access, thread-safe by design
- With ``dirty_threads > 1``: Multiple threads share the same app instances,
  apps MUST be thread-safe (use locks, thread-local storage, etc.)

Heartbeat and Liveness
----------------------
The worker sends heartbeat updates to prove it's alive:

1. A dedicated asyncio task (``_heartbeat_loop``) runs independently
2. It updates the heartbeat file every ``dirty_timeout / 2`` seconds
3. Since tasks run in executor threads, they do NOT block heartbeats
4. The arbiter kills workers that miss heartbeat updates

Timeout Control
---------------
Execution timeout is enforced at two levels:

1. **Worker level**: Each task execution has a timeout (``dirty_timeout``).
   If exceeded, the worker returns a timeout error but the thread may
   continue running (Python threads cannot be cancelled).

2. **Arbiter level**: The arbiter also enforces timeout when waiting
   for worker response. Workers that don't respond are killed via SIGABRT.

Note: Since Python threads cannot be forcibly cancelled, a truly stuck
operation will continue until the worker is killed by the arbiter.
    N)util)	WorkerTmp   )load_dirty_apps)DirtyAppErrorDirtyAppNotFoundErrorDirtyTimeoutErrorDirtyWorkerError)DirtyProtocolmake_responsemake_error_responsemake_chunk_messagemake_end_messagec                       e Zd ZdZdj	                         D  cg c]  }t        t        d|z         c}}}} Zd Zd Z	d Z
d Zd Zd	 Zd
 Zd Zd Zd Zd Zd Zd Zd Zd Zd Zd Zyc c}}}} w )DirtyWorkerz
    Dirty worker process that loads dirty apps and handles requests.

    Each worker runs its own asyncio event loop and listens on a
    worker-specific Unix socket for requests from the DirtyArbiter.
    zABRT HUP QUIT INT TERM USR1zSIG%sc                     || _         d| _        || _        || _        || _        || _        || _        d| _        d| _        d| _	        t        |      | _        i | _        d| _        d| _        d| _        y)a?  
        Initialize a dirty worker.

        Args:
            age: Worker age (for identifying workers)
            ppid: Parent process ID
            app_paths: List of dirty app import paths
            cfg: Gunicorn config
            log: Logger
            socket_path: Path to this worker's Unix socket
        z	[booting]FTN)agepidppid	app_pathscfglogsocket_pathbootedabortedaliver   tmpapps_server_loop	_executor)selfr   r   r   r   r   r   s          N/var/www/descvideos/venv/lib/python3.12/site-packages/gunicorn/dirty/worker.py__init__zDirtyWorker.__init__k   sr     	"&
S>	
    c                 "    d| j                    dS )Nz<DirtyWorker >)r   r"   s    r#   __str__zDirtyWorker.__str__   s    txxj**r%   c                 8    | j                   j                          y)zUpdate heartbeat timestamp.N)r   notifyr(   s    r#   r+   zDirtyWorker.notify   s    r%   c                    | j                   j                  r?| j                   j                  j                         D ]  \  }}|t        j                  |<    t        j                  | j                   j                  | j                   j                  | j                   j                         t        j                          t        j                  | j                  j                                | j                  j                          | j                          | j!                          t        j"                         | _        | j                   j'                  |        d| _        | j+                          y)z
        Initialize the worker process after fork.

        This is called in the child process after fork. It sets up
        the environment, loads apps, and starts the main run loop.
        )
initgroupsTN)r   envitemsosenvironr   set_owner_processuidgidr-   seedclose_on_execr   filenor   init_signals	load_appsgetpidr   dirty_worker_initr   run)r"   kvs      r#   init_processzDirtyWorker.init_process   s     88<<**, "1 !

1" 	txx||TXX\\*.((*=*=	? 			 	488??,-  	 	 99;""4( 
r%   c                 :   | j                   D ]&  }t        j                  |t        j                         ( t        j                  t        j                  | j                         t        j                  t        j
                  | j                         t        j                  t        j                  | j                         t        j                  t        j                  | j                         t        j                  t        j                  | j                         y)zSet up signal handlers.N)	SIGNALSsignalSIG_DFLSIGTERM_signal_handlerSIGQUITSIGINTSIGABRTSIGUSR1)r"   sigs     r#   r8   zDirtyWorker.init_signals   s     << 	/CMM#v~~.	/ 	fnnd&:&:;fnnd&:&:;fmmT%9%9: 	fnnd&:&:; 	fnnd&:&:;r%   c                     |t         j                  k(  r| j                  j                          yd| _        | j
                  r&| j
                  j                  | j                         yy)z(Handle signals by setting alive = False.NF)rB   rI   r   reopen_filesr   r    call_soon_threadsafe	_shutdown)r"   rJ   frames      r#   rE   zDirtyWorker._signal_handler   sJ    &.. HH!!#
::JJ++DNN; r%   c                 R    | j                   r| j                   j                          yy)zInitiate async shutdown.N)r   closer(   s    r#   rN   zDirtyWorker._shutdown   s    <<LL  r%   c                    	 t        | j                        | _        | j                  j                         D ]N  \  }}| j                  j                  d|       	 |j                          | j                  j                  d|       P y# t        $ r#}| j                  j                  d||        d}~ww xY w# t        $ r"}| j                  j                  d|        d}~ww xY w)zLoad all configured dirty apps.zLoaded dirty app: %szInitialized dirty app: %sz%Failed to initialize dirty app %s: %sNzFailed to load dirty apps: %s)
r   r   r   r/   r   debuginitinfo	Exceptionerrorr"   pathappes       r#   r9   zDirtyWorker.load_apps   s    	'7DI!YY__. 	c5t<HHJHHMM"=tD	
 ! HHNN#J#',  	HHNN:A>	s<   AB7 ,BB7 	B4B//B44B7 7	C" CC"c                     ddl m} | j                  j                  } ||d| j                   d      | _        | j                  j                  d|       	 t        j                         | _
        t        j                  | j                         | j                  j                  | j                                | j!                          y# t        $ r&}| j                  j                  d|       Y d}~;d}~ww xY w# | j!                          w xY w)	z Run the main asyncio event loop.r   )ThreadPoolExecutorzdirty-worker--)max_workersthread_name_prefixz#Created thread pool with %d threadszWorker error: %sN)concurrent.futuresr]   r   dirty_threadsr   r!   r   rS   asyncionew_event_loopr    set_event_looprun_until_complete
_run_asyncrV   rW   _cleanup)r"   r]   num_threadsr[   s       r#   r<   zDirtyWorker.run   s     	: hh,,+#!.txxj:
 	<kJ	 //1DJ""4::.JJ))$//*;< MMO  	2HHNN-q11	2 MMOs+   A!C	 		C8C3.C; 3C88C; ;Dc                   K   t         j                  j                  | j                        rt        j                  | j                         t        j                  | j                  | j                         d{   | _        t        j                  | j                  d       | j                  j                  d| j                  | j                         t        j                  | j                               }	 | j                  4 d{    | j                  j                          d{    ddd      d{    |j#                          	 | d{    y7 7 T7 47 &# 1 d{  7  sw Y   6xY w# t
        j                   $ r Y Nw xY w7 ;# t
        j                   $ r Y yw xY w# |j#                          	 | d{  7   w # t
        j                   $ r Y w w xY wxY ww)z6Main async loop - start server and handle connections.)rY   Ni  zDirty worker %s listening on %s)r0   rY   existsr   unlinkrc   start_unix_serverhandle_connectionr   chmodr   rU   r   create_task_heartbeat_loopserve_foreverCancelledErrorcancel)r"   heartbeat_tasks     r#   rg   zDirtyWorker._run_async   s     77>>$**+IId&&' %66""!!
 
 	!!5)7hh 0 0	2 !,,T-A-A-CD
	|| 3 3ll002223 3
 !!#$$$-
323 3 3 3%% 		
 %))  !!#$$$)) s  A7G&9E:A=G&8E8 E	E8 E#*E+E#/E8 :E!;E8 ?G&F FF G&E8 E#!E8 #E5)E,*E51E8 8FF, FF, F F)&G&(F))G&,G#>G
GG
	G#
G G#G  G##G&c                    K   | j                   rR| j                          t        j                  | j                  j
                  dz         d{    | j                   rQyy7 w)zPeriodically update heartbeat.g       @N)r   r+   rc   sleepr   dirty_timeoutr(   s    r#   rq   zDirtyWorker._heartbeat_loop  sC     jjKKM-- 6 6 <=== jj=s   AA$A"A$ A$c                 t  K   | j                   j                  d       	 | j                  rE	 t        j                  |       d{   }| j                  ||       d{    | j                  rE|j                          	 |j                          d{    y7 U# t
        j                  $ r Y Aw xY w7 V# t        $ r&}| j                   j                  d|       Y d}~qd}~ww xY w7 T# t        $ r Y yw xY w# |j                          	 |j                          d{  7   w # t        $ r Y w w xY wxY ww)zl
        Handle a connection from the arbiter.

        Each connection can send multiple requests.
        zNew connection from arbiterNzConnection error: %s)r   rS   r   r   read_message_asyncrc   IncompleteReadErrorhandle_requestrV   rW   rQ   wait_closed)r"   readerwritermessager[   s        r#   rn   zDirtyWorker.handle_connection   s     	45	**$1$D$DV$LLG ))'6::: ** LLN((*** M22 
 ; 	6HHNN1155	6
 +  LLN((*** s   D8B7 B BB 	B7 B5B7 0D8C+ C)C+ D8B B2/B7 1B22B7 7	C& C!C: !C&&C: )C+ +	C74D86C77D8:D5D&D" D&%D5&	D2/D51D22D55D8c           
      D  K   |j                  dt        t        j                                     }|j                  d      }|t        j
                  k7  r7t        |t        d|             }t	        j                  ||       d{    y|j                  d      }|j                  d      }|j                  dg       }|j                  di       }	| j                          	 | j                  ||||	       d{   }
t        j                  |
      r| j                  ||
|       d{    yt        j                  |
      r| j                  ||
|       d{    yt!        ||
      }t	        j                  ||       d{    y7 7 7 f7 77 # t"        $ r~}t%        j&                         }| j(                  j+                  d	||||       t        |t-        t        |      |||
            }t	        j                  ||       d{  7   Y d}~yd}~ww xY ww)ai  
        Handle a single request message.

        Supports both regular (non-streaming) and streaming responses.
        For streaming, detects if the result is a generator and sends
        chunk messages followed by an end message.

        Args:
            message: Request dict from protocol
            writer: StreamWriter for sending responses
        idtypezUnknown message type: Napp_pathactionargskwargszError executing %s.%s: %s
%s)r   r   	traceback)getstruuiduuid4r   MSG_TYPE_REQUESTr   r
   write_message_asyncr+   executeinspectisgenerator_stream_sync_generator
isasyncgen_stream_async_generatorr   rV   r   
format_excr   rW   r   )r"   r   r   
request_idmsg_typeresponser   r   r   r   resultr[   tbs                r#   r|   zDirtyWorker.handle_request;  s     [[s4::<'89
;;v&}555* #9(!DEH  33FHEEE;;z*X&{{62&Xr* 		F<<&$GGF ""6*11*ffMMM##F+22:vvNNN )V<#77III- F H NN J 		F%%'BHHNN:#VQ4*c!fx(*,H
  33FHEEE		Fs   BH FAH "F 9F:/F )F*F .H /+F FF H  %F FF 
H F F F F 	HA.HHHH HH c           	        
K   t               

fd}	 t        j                         }	 |j                  | j                  |       d{   }|
u rn9t        j                  |t        ||             d{    | j                          bt        j                  |t        |             d{    j%                          y7 z7 O7 # t        $ rz}t        j                         }| j                  j                  d||       t        |t!        t#        |      |            }	t        j                  ||	       d{  7   Y d}~d}~ww xY w# j%                          w xY ww)z
        Stream chunks from a synchronous generator.

        Args:
            request_id: Request ID for the messages
            gen: Sync generator to iterate
            writer: StreamWriter for sending messages
        c                  >    	 t              S # t        $ r  cY S w xY wN)nextStopIteration)
_EXHAUSTEDgens   r#   	_get_nextz5DirtyWorker._stream_sync_generator.<locals>._get_next~  s'    "Cy   "!!"s   
 NError during streaming: %s
%sr   )objectrc   get_running_looprun_in_executorr!   r   r   r   r+   r   rV   r   r   r   rW   r   r   r   rQ   )r"   r   r   r   r   loopchunkr[   r   r   r   s     `       @r#   r   z"DirtyWorker._stream_sync_generatorq  s;     X
	"	++-D"224>>9MMJ&#77.z5A      33(4   IIK/ N  	F%%'BHHNN;QC*c!f3H  33FHEEE	F IIKs}   E&4C 
C,C 7C87C /C	0C 4E&C C 	C 	EA*E	>E?E	E 	EE E##E&c           	      l  K   	 |2 3 d{   }t        j                  |t        ||             d{    | j                          B7 =7 6 t        j                  |t	        |             d{  7   n# t
        $ rz}t        j                         }| j                  j                  d||       t        |t        t        |      |            }t        j                  ||       d{  7   Y d}~nd}~ww xY w|j                          d{  7   y# |j                          d{  7   w xY ww)z
        Stream chunks from an asynchronous generator.

        Args:
            request_id: Request ID for the messages
            gen: Async generator to iterate
            writer: StreamWriter for sending messages
        Nr   r   )r   r   r   r+   r   rV   r   r   r   rW   r   r   r   aclose)r"   r   r   r   r   r[   r   r   s           r#   r   z#DirtyWorker._stream_async_generator  s
    	"  e#77.z5A     #  33(4    	F%%'BHHNN;QC*c!f3H  33FHEEE	F **,#**,s   D4A6 AAA$A6 A
A6 A
A6 #A6 /A20A6 5D 6	C9?A*C4)C,*C4/D 4C99D <D4DD4D1*D-+D11D4c           	        K   || j                   vrt        |      | j                   |   | j                  j                  dkD  r| j                  j                  nd}t	        j
                         }	 t	        j                  |j                  | j                  fd      |       d{   }|S 7 # t        j                  $ r3 | j                  j                  d||       t        d| d d|      w xY ww)	a  
        Execute an action on a dirty app.

        The action runs in a thread pool executor to avoid blocking the
        asyncio event loop. Execution timeout is enforced using
        ``dirty_timeout`` config.

        Args:
            app_path: Import path of the dirty app
            action: Action name to execute
            args: Positional arguments
            kwargs: Keyword arguments

        Returns:
            Result from the app action

        Raises:
            DirtyAppNotFoundError: If app is not loaded
            DirtyTimeoutError: If execution exceeds timeout
            DirtyAppError: If execution fails
        r   Nc                        gi S r    )r   rZ   r   r   s   r#   <lambda>z%DirtyWorker.execute.<locals>.<lambda>  s    C888 r%   )timeoutz%Execution timeout for %s.%s after %dszExecution of .z
 timed out)r   r   r   rx   rc   r   wait_forr   r!   TimeoutErrorr   warningr	   )	r"   r   r   r   r   r   r   r   rZ   s	     ```   @r#   r   zDirtyWorker.execute  s     , 499$'11ii!,0HH,B,BQ,F$((((D '')	"++$$NN8   F M ## 		HH7&' $z6(*= 		s1   A.D5:B8 /B60B8 5D6B8 8AC>>Dc                    | j                   r$| j                   j                  dd       d| _         | j                  j                         D ]2  \  }}	 |j	                          | j
                  j                  d|       4 	 | j                  j	                          	 t        j                  j                  | j                        rt        j                  | j                         | j
                  j                  d| j                          y# t        $ r'}| j
                  j                  d||       Y d}~d}~ww xY w# t        $ r Y w xY w# t        $ r Y tw xY w)zClean up resources on shutdown.FT)waitcancel_futuresNzClosed dirty app: %szError closing dirty app %s: %szDirty worker %s exiting)r!   shutdownr   r/   rQ   r   rS   rV   rW   r   r0   rY   rk   r   rl   rU   r   rX   s       r#   rh   zDirtyWorker._cleanup  s    >>NN##t#D!DN * 	JID#J		5t<	J	HHNN
	ww~~d../		$**+ 	/:!  J?qIIJ  		  		s=   ,DD> AE 	D;D66D;>	E
	E
	EEN)__name__
__module____qualname____doc__splitgetattrrB   rA   r$   r)   r+   r?   r8   rE   rN   r9   r<   rg   rq   rn   r|   r   r   r   rh   ).0xr   rB   s   0000r#   r   r   `   s     -2246 6wvw{+ 6G8+"H<"<!
",B>64Fl.`B2h;]6s   A0
r   )r   rc   r   r0   rB   r   r   gunicornr   gunicorn.workers.workertmpr   rZ   r   errorsr   r   r	   r
   protocolr   r   r   r   r   r   r   r%   r#   <module>r      sH   
?B   	     0    r; r;r%   