flink cep sql Event Not triggering
I use CEP Pattern in Flink SQL which is working as expected connecting to Kafka broker. But when i connecting to cluster based cloud kafka setup, the Flink CEP is not triggering. Here is my sql:
create table agent_action_detail
(
agent_id String,
room_id String,
create_time Bigint,
call_type String,
application_id String,
connect_time Bigint,
row_time TIMESTAMP_LTZ(3), WATERMARK for row_time as row_time - INTERVAL '1' MINUTE)
with ('connector'='kafka', 'topic'='agent-action-detail', ...)
then I send messages in json format like
{"agent_id":"agent_221","room_id":"room1","create_time":1635206828877,"call_type":"inbound","application_id":"app1","connect_time":1635206501735,"row_time":"2021-10-25 16:07:09.019Z"}
in flink web ui, watermark works fine flink web ui
I run my cep sql :
select * from agent_action_detail
match_recognize(
partition by agent_id
order by row_time
measures
last(BF.create_time) as create_time,
first(AF.connect_time) as connect_time
one row per match AFTER MATCH SKIP PAST LAST ROW
pattern (BF+ AF) define BF as BF.connect_time > 0 ,AF as AF.connect_time > 0
)
every kafka message, connect_time is > 0, but flink not triggering. Can somebody help to this issue, thanks in advance!
from Recent Questions - Stack Overflow https://ift.tt/3Bdxgtg
https://ift.tt/eA8V8J
Comments
Post a Comment