
    AҐi^                     D   U d Z ddlZddlZddlZddlZddlZddlZddlZddlm	Z	m
Z
mZ ddlmZmZ  G d d      Z G d d	      Z G d
 d      Z ej$                         Z ej(                  d      Zej(                  e   ed<   dad Zd ZddefdZddefdZd Zd Zy)zu
Dirty Client

Client for HTTP workers to communicate with the dirty worker pool.
Provides both sync and async APIs.
    N   )DirtyConnectionError
DirtyErrorDirtyTimeoutError)DirtyProtocolmake_requestc                   x    e Zd ZdZddZd Zd Zd Zd Zd Z	d Z
d	 Zd
 Zd Zd Zd Zd Zd Zd Zd Zd Zy)DirtyClientz
    Client for calling dirty workers from HTTP workers.

    Provides both sync and async APIs. The sync API is for traditional
    sync workers (sync, gthread), while the async API is for async
    workers (asgi, gevent, eventlet).
    c                 |    || _         || _        d| _        d| _        d| _        t        j                         | _        y)z
        Initialize the dirty client.

        Args:
            socket_path: Path to the dirty arbiter's Unix socket
            timeout: Default timeout for operations in seconds
        N)socket_pathtimeout_sock_reader_writer	threadingLock_lock)selfr   r   s      N/var/www/descvideos/venv/lib/python3.12/site-packages/gunicorn/dirty/client.py__init__zDirtyClient.__init__(   s6     '
^^%
    c                    | j                   y	 t        j                  t        j                  t        j                        | _         | j                   j	                  | j
                         | j                   j                  | j                         y# t        j                  t        f$ r'}d| _         t        d| | j                        |d}~ww xY w)z
        Establish sync socket connection to arbiter.

        Raises:
            DirtyConnectionError: If connection fails
        N$Failed to connect to dirty arbiter: r   )r   socketAF_UNIXSOCK_STREAM
settimeoutr   connectr   errorOSErrorr   r   es     r   r   zDirtyClient.connect;   s     ::!		v~~v7I7IJDJJJ!!$,,/JJt//0g& 	DJ&6qc: ,, 	s   BB C*"CCc                 n    | j                   5  | j                  ||||      cddd       S # 1 sw Y   yxY w)a  
        Execute an action on a dirty app (sync/blocking).

        Args:
            app_path: Import path of the dirty app (e.g., 'myapp.ml:MLApp')
            action: Action to call on the app
            *args: Positional arguments
            **kwargs: Keyword arguments

        Returns:
            Result from the dirty app action

        Raises:
            DirtyConnectionError: If connection fails
            DirtyTimeoutError: If operation times out
            DirtyError: If execution fails
        N)r   _execute_lockedr   app_pathactionargskwargss        r   executezDirtyClient.executeP   s8    $ ZZ 	H''&$G	H 	H 	Hs   +4c                 *   | j                   | j                          t        t        j                               }t        |||||      }	 t        j                  | j                   |       t        j                  | j                         }| j                  |      S # t        j                  $ r( | j                          t        d| j                        t        $ r5}| j                          t        |t               r t#        d|       |d}~ww xY w)zExecute while holding the lock.N
request_idr'   r(   r)   r*   &Timeout waiting for dirty app responser   Communication error: )r   r   struuiduuid4r   r   write_messageread_message_handle_responser   r   _close_socketr   	Exception
isinstancer   r   	r   r'   r(   r)   r*   r.   requestresponser#   s	            r   r%   zDirtyClient._execute_lockede   s     ::LLN &
!
	K''

G< %11$**=H ((22~~ 	 #8   	K !Z(&)>qc'BCJ		Ks   AB AD0DDc                      t        | ||||      S )a*  
        Stream results from a dirty app action (sync).

        This method returns an iterator that yields chunks from a streaming
        response. Use this for actions that return generators.

        Args:
            app_path: Import path of the dirty app (e.g., 'myapp.ml:MLApp')
            action: Action to call on the app
            *args: Positional arguments
            **kwargs: Keyword arguments

        Yields:
            Chunks of data from the streaming response

        Raises:
            DirtyConnectionError: If connection fails
            DirtyTimeoutError: If operation times out
            DirtyError: If execution fails

        Example::

            for chunk in client.stream("myapp.llm:LLMApp", "generate", prompt):
                print(chunk, end="", flush=True)
        )DirtyStreamIteratorr&   s        r   streamzDirtyClient.stream   s    4 #464HHr   c                     |j                  d      }|t        j                  k(  r|j                  d      S |t        j                  k(  r)|j                  di       }t	        j
                  |      }|t	        d|       )z<Handle response message, extracting result or raising error.typeresultr    zUnknown response type: )getr   MSG_TYPE_RESPONSEMSG_TYPE_ERRORr   	from_dict)r   r=   msg_type
error_infor    s        r   r7   zDirtyClient._handle_response   ss    <<'}666<<))555!gr2J((4EK6xjABBr   c                     | j                   #	 | j                   j                          d| _         yy# t        $ r Y w xY w)zClose the socket connection.N)r   closer9   r   s    r   r8   zDirtyClient._close_socket   sC    ::!

  " DJ "  s   1 	==c                 f    | j                   5  | j                          ddd       y# 1 sw Y   yxY w)zClose the sync connection.N)r   r8   rL   s    r   rK   zDirtyClient.close   s*    ZZ 	! 	! 	! 	!s   '0c                   K   | j                   y	 t        j                  t        j                  | j                        | j
                         d{   \  | _        | _         y7 # t        j                  $ r t        d| j
                        t        t        f$ r }t        d| | j                        |d}~ww xY ww)z
        Establish async connection to arbiter.

        Raises:
            DirtyConnectionError: If connection fails
        Nr0   z#Timeout connecting to dirty arbiterr   r   )r   asynciowait_foropen_unix_connectionr   r   r   TimeoutErrorr   r!   ConnectionErrorr   r"   s     r   connect_asynczDirtyClient.connect_async   s      <<#	/6/?/?,,T-=-=>0 *&DL$, * ## 	#5  ) 	&6qc: ,, 	s;   CAA' A%A' $C%A' '8B?B::B??Cc                   K   | j                   | j                          d{    t        t        j                               }t        |||||      }	 t        j                  | j                   |       d{    t        j                  t        j                  | j                        | j                         d{   }| j                  |      S 7 7 ]7 # t        j                  $ r1 | j                          d{  7   t!        d| j                        t"        $ r>}| j                          d{  7   t%        |t&              r t)        d|       |d}~ww xY ww)a  
        Execute an action on a dirty app (async/non-blocking).

        Args:
            app_path: Import path of the dirty app
            action: Action to call on the app
            *args: Positional arguments
            **kwargs: Keyword arguments

        Returns:
            Result from the dirty app action

        Raises:
            DirtyConnectionError: If connection fails
            DirtyTimeoutError: If operation times out
            DirtyError: If execution fails
        Nr-   r0   r/   r1   )r   rT   r2   r3   r4   r   r   write_message_asyncrO   rP   read_message_asyncr   r   r7   rR   _close_asyncr   r9   r:   r   r   r;   s	            r   execute_asynczDirtyClient.execute_async   sD    & <<$$&&& &
!
	K33DLL'JJJ %--00> H ((22/ ' K ## 	##%%%#8   	K##%%%!Z(&)>qc'BCJ		Ksp    E&C1E&#C 8C9AC >C?C E&C C &E# D$E#%E8D;9%EE##E&c                      t        | ||||      S )a8  
        Stream results from a dirty app action (async).

        This method returns an async iterator that yields chunks from a
        streaming response. Use this for actions that return generators.

        Args:
            app_path: Import path of the dirty app (e.g., 'myapp.ml:MLApp')
            action: Action to call on the app
            *args: Positional arguments
            **kwargs: Keyword arguments

        Yields:
            Chunks of data from the streaming response

        Raises:
            DirtyConnectionError: If connection fails
            DirtyTimeoutError: If operation times out
            DirtyError: If execution fails

        Example::

            async for chunk in client.stream_async("myapp.llm:LLMApp", "generate", prompt):
                await response.write(chunk)
        )DirtyAsyncStreamIteratorr&   s        r   stream_asynczDirtyClient.stream_async  s    4 (hfMMr   c                    K   | j                   L	 | j                   j                          | j                   j                          d{    d| _         d| _        yy7 # t        $ r Y w xY wwzClose the async connection.N)r   rK   wait_closedr9   r   rL   s    r   rX   zDirtyClient._close_async3  sf     <<#""$ll..000  DLDL $ 1 s:   A-7A AA A-A 	A*'A-)A**A-c                 @   K   | j                          d{    y7 wr^   )rX   rL   s    r   close_asynczDirtyClient.close_async>  s     !!!   c                 &    | j                          | S N)r   rL   s    r   	__enter__zDirtyClient.__enter__F  s    r   c                 $    | j                          y rd   )rK   r   exc_typeexc_valexc_tbs       r   __exit__zDirtyClient.__exit__J  s    

r   c                 B   K   | j                          d {    | S 7 wrd   )rT   rL   s    r   
__aenter__zDirtyClient.__aenter__M  s"       """ 	#s   c                 @   K   | j                          d {    y 7 wrd   )ra   rg   s       r   	__aexit__zDirtyClient.__aexit__Q  s        rb   N      >@)__name__
__module____qualname____doc__r   r   r+   r%   r@   r7   r8   rK   rT   rY   r\   rX   ra   re   rk   rm   ro    r   r   r
   r
      sd    &&*H*#KJI8C!46KpN8	 "!r   r
   c                   :    e Zd ZdZdZdZ	 d
dZd Zd Zd Z	d	 Z
y)r?   a  
    Iterator for streaming responses from dirty workers (sync).

    This class is returned by `DirtyClient.stream()` and yields chunks
    from a streaming response until the end message is received.

    Uses a deadline-based timeout approach:
    - Total stream timeout: limits entire stream duration
    - Idle timeout: limits gap between chunks (defaults to total timeout)
    rq         @Nc                     || _         || _        || _        || _        || _        d| _        d| _        d | _        d | _        d | _	        ||| _        y t        | j                  |j                        | _        y NFclientr'   r(   r)   r*   _started
_exhausted_request_id	_deadline_last_chunk_timeminDEFAULT_IDLE_TIMEOUTr   _idle_timeoutr   r|   r'   r(   r)   r*   idle_timeouts          r   r   zDirtyStreamIterator.__init__m  x     	 $ )4L 	T..? 	r   c                     | S rd   rv   rL   s    r   __iter__zDirtyStreamIterator.__iter__      r   c                     | j                   rt        | j                  s| j                          d| _        | j	                         S NT)r~   StopIterationr}   _start_request_read_next_chunkrL   s    r   __next__zDirtyStreamIterator.__next__  s8    ??}}! DM$$&&r   c                 H   | j                   j                  5  | j                   j                  | j                   j                          t	        j
                         }|| j                   j                  z   | _        || _        t        t        j                               | _        t        | j                  | j                  | j                  | j                   | j"                        }t%        j&                  | j                   j                  |       ddd       y# 1 sw Y   yxY wz(Send the initial request to the arbiter.N)r)   r*   )r|   r   r   r   time	monotonicr   r   r   r2   r3   r4   r   r   r'   r(   r)   r*   r   r5   r   nowr<   s      r   r   z"DirtyStreamIterator._start_request  s    [[ 	D{{  (##% .."C 4;;#6#66DN$'D!"4::<0D"  YY{{G ''(9(97C#	D 	D 	Ds   C8DD!c                 x   | j                   j                  5  t        j                         }|| j                  k\  r(d| _        t        d| j                   j                        | j                  |z
  }|| j                  kD  r| j                  }nt        || j                        }	 | j                   j                  j                  |       t        j                  | j                   j                        }t        j                         | _        |j)                  d	      }|t        j*                  k(  r|j)                  d
      cddd       S |t        j,                  k(  rd| _        t.        |t        j0                  k(  r.d| _        |j)                  di       }t3        j4                  |      |t        j6                  k(  rd| _        t.        d| _        t3        d|       # t        j                  $ r~ t        j                         }|| j                  k\  r(d| _        t        d| j                   j                        || j                   z
  }d| _        t        d|dd| j                        t"        $ r5}d| _        | j                   j%                          t'        d|       |d}~ww xY w# 1 sw Y   yxY w)&Read the next message from the stream.TStream exceeded total timeoutr0   %Timeout waiting for next chunk (idle .1fs)r1   NrB   datar    Unknown message type: )r|   r   r   r   r   r~   r   r   _TIMEOUT_THRESHOLDr   r   r   r   r   r6   r   r   r9   r8   r   rD   MSG_TYPE_CHUNKMSG_TYPE_ENDr   rF   r   rG   rE   )	r   r   	remainingread_timeoutr=   idle_durationr#   rH   rI   s	            r   r   z$DirtyStreamIterator._read_next_chunk  sy   [[ F	B.."Cdnn$"&'3 KK// 
 ,I 4222#66"9d.@.@AO!!,,\:(55dkk6G6GH, %)NN$4D!||F+H =777||F+cF	B F	Bh =555"&## =777"&%\\'26
 **:66 =:::"&## #DO5hZ@AAa >> nn&$..(&*DO+7 $ 3 3  !$d&;&; ;"&';M#;NbQ ..   O"&))+*-B1#+FGQNOKF	B F	Bs:   BJ0%AG 3AJ0
BJ0 BJ-80J((J--J00J9rd   )rr   rs   rt   ru   r   r   r   r   r   r   r   rv   r   r   r?   r?   Z  s8    	    #
$'D*HBr   r?   c                   :    e Zd ZdZdZ	 d
dZd Zd Zd ZdZ	d	 Z
y)r[   a  
    Async iterator for streaming responses from dirty workers.

    This class is returned by `DirtyClient.stream_async()` and yields chunks
    from a streaming response until the end message is received.

    Uses a deadline-based timeout approach for efficiency:
    - Total stream timeout: limits entire stream duration
    - Idle timeout: limits gap between chunks (defaults to total timeout)

    This avoids the overhead of asyncio.wait_for() on every chunk read.
    rq   Nc                     || _         || _        || _        || _        || _        d| _        d| _        d | _        d | _        d | _	        ||| _        y t        | j                  |j                        | _        y rz   r{   r   s          r   r   z!DirtyAsyncStreamIterator.__init__  r   r   c                     | S rd   rv   rL   s    r   	__aiter__z"DirtyAsyncStreamIterator.__aiter__  r   r   c                    K   | j                   rt        | j                  s| j                          d {    d| _        | j	                          d {   S 7 #7 wr   )r~   StopAsyncIterationr}   r   r   rL   s    r   	__anext__z"DirtyAsyncStreamIterator.__anext__  sP     ??$$}}%%''' DM**,,, ( -s!   2AAAAAAc                 "  K   | j                   j                  "| j                   j                          d{    t        j                         }|| j                   j
                  z   | _        || _        t        t        j                               | _        t        | j                  | j                  | j                  | j                  | j                         }t#        j$                  | j                   j                  |       d{    y7 7 wr   )r|   r   rT   r   r   r   r   r   r2   r3   r4   r   r   r'   r(   r)   r*   r   rV   r   s      r   r   z'DirtyAsyncStreamIterator._start_request  s     ;;&++++--- nnt{{222 #tzz|,MMKK;;
 //0C0CWMMM . 	Ns"   4DDCDDDDrx   c                 p  K   t        j                         }|| j                  k\  r(d| _        t	        d| j
                  j                        | j                  |z
  }	 || j                  kD  r2t        j                  | j
                  j                         d{   }n\t        || j                        }t        j                  t        j                  | j
                  j                        |       d{   }t        j                         | _        |j)                  d	      }|t        j*                  k(  r|j)                  d
      S |t        j,                  k(  rd| _        t.        |t        j0                  k(  r.d| _        |j)                  di       }t3        j4                  |      |t        j6                  k(  rd| _        t.        d| _        t3        d|       7 F7 # t        j                  $ rw d| _        t        j                         }|| j                  k\  r!t	        d| j
                  j                        || j                   z
  }t	        d|dd| j                        t"        $ r>}d| _        | j
                  j%                          d{  7   t'        d|       |d}~ww xY ww)r   Tr   r0   Nr   r   r   r1   rB   r   r    r   )r   r   r   r~   r   r|   r   r   r   rW   r   r   r   rO   rP   rR   r   r9   rX   r   rD   r   r   r   rF   r   rG   rE   )	r   r   r   r=   r   r   r#   rH   rI   s	            r   r   z)DirtyAsyncStreamIterator._read_next_chunk4  sr     nn $.. "DO#/++ 
 NNS(		K 4222!.!A!AKK''" 
  #9d.@.@A!(!1!1!44T[[5H5HI(" . !% 0<<' }333<<'' }111"DO$$ }333"DO!gr2J&&z22 }666"DO$$ 1(<==o ## 	"DO.."Cdnn$'3 KK//   $"7"77M#7c7J"M**   	K"DO++**,,,&)>qc'BCJ	Ks]   AJ6;G$ GAG$ 6G"7G$ ;C$J6G$ "G$ $BJ35$J.JJ..J33J6rd   )rr   rs   rt   ru   r   r   r   r   r   r   r   rv   r   r   r[   r[     s7       #
$-N, J>r   r[   dirty_client_async_client_varc                 $    | a ddlm}  ||        y)z@Set the global dirty socket path (called during initialization).r   )set_stash_socket_pathN)_dirty_socket_pathstashr   )pathr   s     r   set_dirty_socket_pathr     s      -$r   c                  v    t         .t        j                  j                  d      } | r| S t	        d      t         S )zGet the dirty socket path.GUNICORN_DIRTY_SOCKETz\Dirty socket path not configured. Make sure dirty_workers > 0 and dirty_apps are configured.)r   osenvironrD   r   )r   s    r   get_dirty_socket_pathr     s>    !zz~~56KI
 	
 r   returnc                 p    t        t        dd      }|"t               }t        ||       }|t        _        |S )a  
    Get or create a thread-local sync client.

    This is the recommended way to get a client in sync HTTP workers.

    Args:
        timeout: Timeout for operations in seconds

    Returns:
        DirtyClient: Thread-local client instance

    Example::

        from gunicorn.dirty import get_dirty_client

        def my_view(request):
            client = get_dirty_client()
            result = client.execute("myapp.ml:MLApp", "inference", data)
            return result
    r   Nr0   )getattr_thread_localr   r
   r   r   r|   r   s      r   get_dirty_clientr     s8    * ]ND9F~+-[':%+"Mr   c                    K   	 t         j                         }|S # t        $ r0 t               }t	        ||       }t         j                  |       Y |S w xY ww)a  
    Get or create a context-local async client.

    This is the recommended way to get a client in async HTTP workers.

    Args:
        timeout: Timeout for operations in seconds

    Returns:
        DirtyClient: Context-local client instance

    Example::

        from gunicorn.dirty import get_dirty_client_async

        async def my_view(request):
            client = await get_dirty_client_async()
            result = await client.execute_async("myapp.ml:MLApp", "inference", data)
            return result
    r0   )r   rD   LookupErrorr   r
   setr   s      r   get_dirty_client_asyncr     sW     *&"&&(
 M	  &+-[':f%M	&s%   A A5AAAAc                  b    t        t        dd      } | | j                          dt        _        yy)z4Close the thread-local client (call on worker exit).r   N)r   r   rK   r   r|   s    r   close_dirty_clientr     s,    ]ND9F%)" r   c                     K   	 t         j                         } | j                          d{    y7 # t        $ r Y yw xY ww)z%Close the context-local async client.N)r   rD   ra   r   r   s    r   close_dirty_client_asyncr     s<     "&&(  """ s,   A'3 13 A3 	?A?Arp   )ru   rO   contextvarsr   r   r   r   r3   errorsr   r   r   protocolr   r   r
   r?   r[   localr   
ContextVarr   __annotations__r   r   r   r   r   r   r   rv   r   r   <module>r      s   
   	     
s! s!v	OB OBdR> R>t  	! :P9O9O: ;))+6 
   k :+ <*r   