flink cep sql Event Not triggering

I use CEP Pattern in Flink SQL which is working as expected connecting to Kafka broker. But when i connecting to cluster based cloud kafka setup, the Flink CEP is not triggering. Here is my sql:

create table agent_action_detail 
(
    agent_id String, 
    room_id String, 
    create_time Bigint, 
    call_type String, 
    application_id String, 
    connect_time Bigint, 
    row_time TIMESTAMP_LTZ(3), WATERMARK for row_time as row_time  - INTERVAL '1' MINUTE) 
with ('connector'='kafka', 'topic'='agent-action-detail', ...)

then I send messages in json format like

{"agent_id":"agent_221","room_id":"room1","create_time":1635206828877,"call_type":"inbound","application_id":"app1","connect_time":1635206501735,"row_time":"2021-10-25 16:07:09.019Z"}

in flink web ui, watermark works fine flink web ui

I run my cep sql :

select * from agent_action_detail
 match_recognize(
    partition by agent_id 
    order by row_time 
    measures 
        last(BF.create_time) as create_time, 
        first(AF.connect_time) as connect_time 
    one row per match AFTER MATCH SKIP PAST LAST ROW 
    pattern (BF+ AF) define BF as BF.connect_time > 0 ,AF as AF.connect_time > 0
 )

every kafka message, connect_time is > 0, but flink not triggering. Can somebody help to this issue, thanks in advance!



from Recent Questions - Stack Overflow https://ift.tt/3Bdxgtg
https://ift.tt/eA8V8J

Comments

Popular posts from this blog

Spring Elasticsearch Operations

Network Error and Timeout on Authorize.net JS

Object oriented programming concepts (OOPs)