
    ZThp[                       d dl mZ d dlZd dlmZ d dlmZ d dlmZmZm	Z	m
Z
mZmZmZ d dlZd dlmZ d dlmZ d dlmZmZ d d	lmZ d d
lmZ d dlmZ d dlmZ d dlmZ d dlm Z  d dl!m"Z"m#Z#m$Z$m%Z% d dl&m'Z'm(Z( d dl)mZm*Z*m+Z+m,Z,m-Z- d dl.m/Z/ d dl0m1Z1m2Z2m3Z3 d dl4m5Z5 d dl6m7Z7  ejp                  e9      Z: G d d      Z;y)    )annotationsN)Callable)datetime)AnyDictListOptionalSetUnioncast)	Timestamp)
StatusCode) IngestWithConfigDataChannelValue!IngestWithConfigDataStreamRequest)IngestServiceStub)ChannelConfig)IngestionConfig)ProtobufMaxSizeExceededError)SiftChannel)IngestionValidationError)create_flow_configscreate_ingestion_config"get_ingestion_config_by_client_keyget_ingestion_config_flows)
create_runget_run_id_by_name)r   ChannelValuechannel_fqnempty_valueis_data_type)TelemetryConfig)Flow
FlowConfigFlowOrderedChannelValues)
RuleConfig)RuleServicec                     e Zd ZU ded<   ded<   ded<   ded<   d	ed
<   ded<   ded<   ded<   ded<   ded<   ded<   ded<   ded<   	 	 	 d-	 	 	 	 	 	 	 	 	 d.dZd/dZd0dZd1dZ	 	 	 	 	 d2	 	 	 	 	 	 	 	 	 	 	 	 	 d3dZd Z		 	 	 	 	 	 	 	 	 	 d4d Z
	 	 	 	 	 	 	 	 	 	 d5d!Zd6d"Zd6d#Z	 	 	 	 	 	 	 	 d7d$Z	 	 	 	 	 	 	 	 d8d%Zd9d&Zd:d'Zd9d(Zd:d)Zd/d*Ze	 	 	 	 	 	 d;d+       Ze	 d<	 	 	 	 	 	 	 d=d,       Zy)>_IngestionServiceImplr   transport_channelr   ingestion_configstr
asset_namezDict[str, FlowConfig]flow_configs_by_namezSet[str]flow_configs_createdzList[RuleConfig]rulesOptional[str]run_idorganization_idboolend_stream_on_errorr!   configuse_lazy_flow_creationr   ingest_service_stubr&   rule_serviceNc                0   	 | j                   j                  |||      }|| _        || _        |j
                  r`| j                  sT|j                  D ci c]  }|j                  | c}| _        |j                  D ch c]  }|j                   c}| _	        n| j                  r:|j                  D ci c]  }|j                  | c}| _        t               | _	        nut        ||j                        D cg c]  }t        j                  |       }	}|	D ci c]  }|j                  | c}| _        |	D ch c]  }|j                   c}| _	        t        |      | _        |j"                  rt|j"                  D ]@  }
|j$                  |
j&                  vs|
j&                  j)                  |j$                         B | j                   j+                  |j"                         |j"                  | _        |j$                  | _        || _        || _        |j0                  | _        || _        t5        |      | _        || _        y # t        $ r) | j                   j                  ||d      }d| _        Y Ow xY wc c}w c c}w c c}w c c}w c c}w c c}w )N)
lazy_flowsT)	__class___get_or_create_ingestion_configr6   r   r*   "_ingestion_client_key_is_generatedflowsnamer-   r.   setr   ingestion_config_idr#   from_pbr&   r8   r/   r,   asset_namesappendcreate_or_update_rulesr)   r1   r2   r4   r   r7   r5   )selfchannelr5   r1   r4   force_lazy_flow_creationr*   flowfr>   rules              g/home/www/backend.miabetepe.com/venv/lib/python3.12/site-packages/sift_py/ingestion/_internal/ingest.py__init__z_IngestionServiceImpl.__init__<   sN   
	C#~~MM,D  N   +CD' 0 44T=X=X FL\\(RTD(RD%?E||(Lt(LD% **IO,VTYY_,V)69e) 8(<< &&q)  JO,OTYY_,O)CH,I4TYY,I)'0<< ?$$D,<,<<$$++F,=,=>? 44V\\B\\
 ++!(%55#6 #4W#= i , 	/#~~MMD  N   +/D'		/ )S(L -W -P,Is5   I I:>I?4J8J	J:J.I76I7c                    | j                   r | j                  |  | j                  j                  t	        |             y)z)
        Perform data ingestion.
        N)r6   _lazy_flow_creationr7   IngestWithConfigDataStreamiter)rF   requestss     rL   ingestz_IngestionServiceImpl.ingest~   s5    
 &&$D$$h/  ;;DNK    c                    g }|D ]5  }|d   }|d   }|d   }| j                  |||      }|j                  |       7 | j                  r | j                  |  | j                  j                  t        |             y)z
        Combines the requests creation step and ingestion into a single call.
        See `create_ingestion_request` for information about how client-side validations are handled.
        	flow_name	timestampchannel_valuesN)create_ingestion_requestrD   r6   rO   r7   rP   rQ   rF   r>   rR   rI   rV   rW   rX   reqs           rL   ingest_flowsz"_IngestionServiceImpl.ingest_flows   s      	!D[)I[)I!"23N//	9nUCOOC 	! &&$D$$h/  ;;DNKrT   c                    g }|D ]5  }|d   }|d   }|d   }| j                  |||      }|j                  |       7 | j                  r | j                  |  | j                  j                  t        |             y)z
        Combines the requests creation step and ingestion into a single call.
        See `try_create_ingestion_request` for information about how client-side validations are handled.
        rV   rW   rX   N)try_create_ingestion_requestrD   r6   rO   r7   rP   rQ   rZ   s           rL   try_ingest_flowsz&_IngestionServiceImpl.try_ingest_flows   s      	!D[)I[)I!"23N33Iy.YCOOC 	! &&$D$$h/  ;;DNKrT   c                x    |st        ||      }||| _        yt        |||xs d|xs d|xs g |      | _        y)z
        Retrieve an existing run or create one to use during this period of ingestion.

        Include `force_new=True` to force the creation of a new run, which will allow creation of a new run using an existing name.
        N )rG   run_namedescriptionr2   tagsmetadata)r   r1   r   )	rF   rG   rb   rc   r2   rd   re   	force_newr1   s	            rL   
attach_runz _IngestionServiceImpl.attach_run   sO     ':F!$ #)r+1r
rT   c                    d| _         y)z
        Detach run from this period of ingestion. Subsequent data ingested won't be associated with
        the run being detached.
        N)r1   )rF   s    rL   
detach_runz _IngestionServiceImpl.detach_run   s    
 rT   c                   g }t        |      t        |j                        k7  r.t        dt        |j                         dt        |       d      t        t        |            D ]  }||   }t	        |j                               }t        |      dk7  rt        d|       |d   }	|j                  |   }
	 |
j                  ||	         }t        ||
j                        r|j                  |        |S # t        $ rG t        d|j                  |   j                   d|j                  |   j                   d	| d
| d	      w xY w)Nz	Expected z channel values, got .   z2Expected exactly one key in flow value, got keys: r   Expected value for `` to be a 'z'. Instead found z	 in flow )lenchannelsr   rangelistkeys
ValueErrortry_value_fromr    	data_typerD   r?   )rF   rV   flow_configrW   rX   valuesichannel_dictchannel_typechannel_type_keychannel_config
chan_values               rL   +try_create_ingestion_request_ordered_valueszA_IngestionServiceImpl.try_create_ingestion_request_ordered_values   s    :<~#k&:&:";;*C 4 4566KCP^L_K``ab  s>*+ 	A)!,L 1 1 34L< A% HW   ,A(11!4N+::<HX;YZ

N,D,DEMM*-	(   .*;+?+?+B+G+G*HT_ThThijTkTuTuSv  wH  IU  HV  V_  `i  _j  jk  l s   8;C77AEc                6   i }|D ]2  }t        |      }|j                  |d       |||<   &t        d|        g }|j                  D ]  }	t        |	      }|j	                  |d       }
|
|j                  t                      <|
d   }t        ||	j                        r|j                  |       it        d|	j                   d|	j                   d       t        |      dkD  r.|j                         D cg c]  }| }}t        d| d|       |S c c}w )	Nz Encountered multiple values for valuerm   rn   '.r   z Unexpected channel(s) for flow 'z': )r   getr   rp   poprD   r   r    rv   r?   ro   rs   )rF   rV   rw   rW   rX   channel_values_by_fqnchannel_valuefqnrx   rG   channel_valr   r?   unexpected_channelss                 rL   +try_create_ingestion_request_channel_valueszA_IngestionServiceImpl.try_create_ingestion_request_channel_values   sR    :<+ 	YMm,C$((d3;-:%c*.1QRUQV/WXX	Y :<"++ 	Gg&C2G2K2KCQU2VK"km,(EE7#4#45e$.*7<<.GDUDUCVVXY 	" $%)4I4N4N4P"QD4"Q"Q*29+SAT@UV   #Rs   8	Dc                F    t        |t              xr d|v xr
 d|v xs d|v S )z
        Check if a value is a ChannelValue.
        ChannelValue has a "value" field and either a "name" or "channel_name" field.
        r   r?   channel_name)
isinstancedictrF   r   s     rL   _is_channel_valuez'_IngestionServiceImpl._is_channel_value%  s6     ud# =5 =5;Ne$;	
rT   c                t    t        t              xs& t        t              xr t        fddD              S )z
        Check if a value is an IngestWithConfigDataChannelValue.
        This is a protobuf message with specific fields.
        c              3  &   K   | ]  }|v  
 y w)N ).0fieldr   s     rL   	<genexpr>zA_IngestionServiceImpl._is_ingest_channel_value.<locals>.<genexpr>7  s      	 	s   )doublestringint32int64r3   )r   r   r   anyr   s    `rL   _is_ingest_channel_valuez._IngestionServiceImpl._is_ingest_channel_value0  s@    
 %!AB 
ud# 
 		 		
rT   c           	     Z   | j                   j                  |      }|t        d| d      |st        d      |d   }g }| j                  |      r*| j	                  |||t        t        t           |            }nS| j                  |      r*| j                  |||t        t        t           |            }nt        dt        |       d      |j                         dk7  rt        d|j                          d	      t               }|j                  |       | j!                  |||      S )
a  
        Creates an ingestion request for a flow that must exist in `flow_configs_by_name`. This method
        performs a series of client-side validations and will return a `IngestionValidationError` if any validations fail.
        Channel values can be provided as a list of `sift_py.ingestion.channel.ChannelValue` or a list of values from a
        `sift_py.ingestion.flow.FlowOrderedChannelValues`.
        zA flow config of name 'z' could not be found.z#Channel values list cannot be emptyr   zUnknown channel values format: zB. Expected either ChannelValue or IngestWithConfigDataChannelValueUTCz/Expected 'timestamp' to be in UTC but it is in rk   )r-   r   r   r   r   r   r   r   r   r   r   rt   typetznamer   FromDatetimerY   )rF   rV   rW   rX   rw   first_valuerx   timestamp_pbs           rL   r^   z2_IngestionServiceImpl.try_create_ingestion_requestC  sR    //33I>*))4IJ  *+PQQ$Q'9;!!+.EE;	4\8JN3[F **;7EET:;^L	F 1${2C1D ES S 
 &*A)BRBRBTAUUVW  !{!!),,,Y	6JJrT   c           	         t               }|j                  |       t        | j                  j                  |||| j
                  xs d| j                  xs d| j                        S )z
        Creates an ingestion request for a flow that must exist in `flow_configs_by_name`. This method
        does not do any sort of client-side validation and is recommended to use if performance is required.
        ra   )rA   rI   rW   rX   r1   r2   end_stream_on_validation_error)r   r   r   r*   rA   r1   r2   r4   )rF   rV   rW   rX   r   s        rL   rY   z._IngestionServiceImpl.create_ingestion_requestx  sa     !{!!),0 $ 5 5 I I");;$" 006B+/+C+C
 	
rT   c                   | j                   j                  rt        d      |D ]3  }|j                  | j                  v st        d|j                   d       t        | j                  | j                  j                  |       |D ]@  }|| j                  |j                  <   | j                  j                  |j                         B y)z
        Tries to create a new flow at runtime. Will raise an `IngestionValidationError` if there already exists
        a flow with the name of the `flow_config` argument.
        Telemetry configs with generated ingestion client keys can not be updated at runtime.Use a custom ingestion client key if you want to update flows at runtime.z#There is already a flow with name 'r   N)r5   r=   r   r?   r-   r   r)   r*   rA   r.   addrF   rw   fcs      rL   try_create_flowz%_IngestionServiceImpl.try_create_flow  s    
 ;;99*\ 
  	bBww$333.1TUWU\U\T]]_/`aa	b 	""!!55	
  	3B13D%%bgg.%%))"''2	3rT   c                      | j                   | S )z)
        See `try_create_flows`.
        )r   rF   flow_configss     rL   try_create_flowsz&_IngestionServiceImpl.try_create_flows  s     $t##\22rT   c                &   | j                   j                  rt        d      t        | j                  | j
                  j                  |       |D ]@  }|| j                  |j                  <   | j                  j                  |j                         B y)z
        Like `try_create_flow` but will not do any client side validation and
        raise `IngestionValidationError`.
        r   N)r5   r=   r   r   r)   r*   rA   r-   r?   r.   r   r   s      rL   create_flowz!_IngestionServiceImpl.create_flow  s    
 ;;99*\ 
 	""!!55	

  	3B13D%%bgg.%%))"''2	3rT   c                      | j                   | S )z$
        See `create_flow`.
        )r   r   s     rL   create_flowsz"_IngestionServiceImpl.create_flows  s      t..rT   c                j   t               }|D ]O  }|j                  | j                  vs|j                  | j                  v s5|j	                  |j                         Q |D cg c]  }| j                  |    }}|rY	 t        | j                  | j                  j                  |       |D ]'  }| j                  j	                  |j                          ) yyc c}w # t        t        j                  f$ r}t        |t        j                        r"|j                         t        j                  k7  r |D ]q  }	 t        | j                  | j                  j                  |g       1# t        j                  $ r,}|j                         t        j                  k7  r Y d}~kd}~ww xY w Y d}~d}~ww xY w)z
        Used for lazy flow creation, which registers flows with sift as they are seen, instead of during
        the service initialization
        N)r@   rI   r.   r-   r   r   r)   r*   rA   r   grpcRpcErrorr   coder   ALREADY_EXISTSr?   )rF   rR   missing_flow_config_namesrequestflow_config_nameflow_configs_to_createerw   s           rL   rO   z)_IngestionServiceImpl._lazy_flow_creation  s   
 /2e! 	<G D$=$==LLD$=$==)--gll;	< %>"
  %%&67"
 "

 ""#**))==*,  6 @))--k.>.>?@1 ""
 1$--@ "a/vvx:#<#<< $: 	"K"+ 22 11EE(M
  == "668z'@'@@! A"	""sI   #C>+C F25AF-7,E$#F-$F#7"FF-F##F--F2c                    |j                   }t        |      dk(  ryt        | |      }t        |      D ci c]  \  }}|j                  | }}}g }d }	d }
|D ]{  }|j                  |j                        }||j                  |       2||   }|j                  D  ch c]
  }  |	|        }} |j                  D ]  } |
|      |vst        d       } t        |      dkD  rt         ||       yyc c}}w c c} w )z
        Compares local flows from a telemetry config with the flows registered in Sift. If a local flow
        contains channels that isn't in Sift then this will fail.
        r   Nc                6    t        |        d| j                   S Nrk   )r   rv   xs    rL   <lambda>z<_IngestionServiceImpl._update_flow_configs.<locals>.<lambda>  s    Q(!++7 rT   c                J    t        |        d| j                  j                   S r   )r   rv   r   r   s    rL   r   z<_IngestionServiceImpl._update_flow_configs.<locals>.<lambda>  s!    Q(!++*;*;)<= rT   z3Encountered duplicate flow with mismatched channels)
r>   ro   r   	enumerater?   r   rD   rp   r   r   )rG   rA   telemetry_configconfig_flows
sift_flowsry   rI   sift_flow_indices_by_nameflows_to_createsift_channel_identifierconfig_channel_identifierconfig_flowsift_flow_index	sift_flowsift_channel_identifiersconfig_channels                   rL   _update_flow_configsz*_IngestionServiceImpl._update_flow_configs  sE    (--|!/9LM
AJ:AV$WgaTYY\$W!$W,.
 8 	  > 	" ( 	K7;;K<L<LMO &&&{3
 #?3I AJ@R@R(5<'0($ ( #."6"6  1@D\\2M 	#	2 !#)<oN $M %X4(s   C5C;c                   t        ||j                        }|/|j                  s|r|S | j                  ||j                  |       |S |rg }n|j
                  }t        ||j                  ||j                  |j                        }|S )aP  
        Retrieves an existing ingestion config or creates a new one. If an existing ingestion config is fetched,
        then flows may be updated to reflect any changes that may have occured in the telemetry config.
        May raise `ProtobufMaxSizeExceeded` if a large number of flows needing updates or creation are passed
        )	r   ingestion_client_keyr=   r   rA   r>   r   r,   r2   )clsrG   r5   r:   r*   r   s         rL   r<   z5_IngestionServiceImpl._get_or_create_ingestion_config.  s     >gvGbGbc '88J''((2B2V2VX^_''L!<<L2''""
  rT   )NFF)
rG   r   r5   r!   r1   r0   r4   r3   rH   r3   )rR   r   )r>   r$   )r>   r"   )NNNNF)rG   r   rb   r+   rc   r0   r2   r0   rd   zOptional[List[str]]re   z,Optional[Dict[str, Union[str, float, bool]]]rf   r3   )
rV   r+   rw   r#   rW   r   rX   &List[IngestWithConfigDataChannelValue]returnr   )
rV   r+   rw   r#   rW   r   rX   zList[ChannelValue]r   r   )r   r   r   r3   )rV   r+   rW   r   rX   zAUnion[List[ChannelValue], List[IngestWithConfigDataChannelValue]]r   r   )rV   r+   rW   r   rX   r   r   r   )rw   r#   )r   r#   )rG   r   rA   r+   r   r!   )F)rG   r   r5   r!   r:   r3   r   r   )__name__
__module____qualname____annotations__rM   rS   r\   r_   rg   ri   r   r   r   r   r^   rY   r   r   r   r   rO   staticmethodr   classmethodr<   r   rT   rL   r(   r(   ,   s^   ""%%O//""""  ** !%$)).@@  @ 	@
 "@ #'@DLL(L0 &*)-$(AE

 
 #	

 '
 "
 ?
 
>!!  ! 	!
 ?! 
0!F**  * 	*
 +* 
0*X	

&3K3K 3K Z	3K
 
+3Kj

 
 ?	

 
+
.3233(/,@\ 5O5O365OJY5O 5On OT  !  +:  HL  	     rT   r(   )<
__future__r   loggingcollections.abcr   r   typingr   r   r   r	   r
   r   r   r   google.protobuf.timestamp_pb2r   r   sift.ingest.v1.ingest_pb2r   r   sift.ingest.v1.ingest_pb2_grpcr   /sift.ingestion_configs.v2.ingestion_configs_pb2r   ChannelConfigPbr   sift_py.errorr   sift_py.grpc.transportr   !sift_py.ingestion._internal.errorr   ,sift_py.ingestion._internal.ingestion_configr   r   r   r   sift_py.ingestion._internal.runr   r   sift_py.ingestion.channelr   r   r   r    "sift_py.ingestion.config.telemetryr!   sift_py.ingestion.flowr"   r#   r$   sift_py.ingestion.rule.configr%   sift_py.rule.servicer&   	getLoggerr   loggerr(   r   rT   rL   <module>r      s    "  $  > > >  3  = \ K 6 . F  K  ? M M 4 ,			8	$c  c rT   