Apache FlinkCEP 实现超时状态监控的步骤详解

CEP - Complex Event Processing复杂事件处理。

订单下单后超过一定时间还未进行支付确认。

打车订单生成后超过一定时间没有确认上车。

外卖超过预定送达时间一定时限还没有确认送达。

Apache FlinkCEP API

CEPTimeoutEventJob

FlinkCEP源码简析

DataStream和PatternStream

DataStream 一般由相同类型事件或元素组成,一个DataStream可以通过一系列的转换操作如Filter、Map等转换为另一个DataStream。

PatternStream 是对CEP模式匹配的流的抽象,把DataStream和Pattern组合在一块,然后对外提供select和flatSelect等方法。PatternStream并不是DataStream,它提供方法把匹配的模式序列和与其相关联的事件组成的映射(就是Map<模式名称,List<事件>>)发出去,发到SingleOutputStreamOperator里面,SingleOutputStreamOperator是DataStream。

CEPOperatorUtils工具类里的方法和变量使用了「PatternStream」来命名,比如:

public

static
 <IN, OUT>
SingleOutputStreamOperator
<OUT> createPatternStream(...){...}
public

static
 <IN, OUT1, OUT2>
SingleOutputStreamOperator
<OUT1> createTimeoutPatternStream(...){...}

final

SingleOutputStreamOperator
<OUT> patternStream;

SingleOutputStreamOperator

@Public

public

class

SingleOutputStreamOperator
<T>
extends

DataStream
<T> {...}

PatternStream的构造方法:

PatternStream
(
final

DataStream
<T> inputStream,
final

Pattern
<T, ?> pattern) {

this
.inputStream = inputStream;

this
.pattern = pattern;

this
.comparator =
null
;

}

PatternStream
(
final

DataStream
<T> inputStream,
final

Pattern
<T, ?> pattern,
final

EventComparator
<T> comparator) {

this
.inputStream = inputStream;

this
.pattern = pattern;

this
.comparator = comparator;

}

Pattern、Quantifier和EventComparator

Pattern是模式定义的Base Class,Builder模式,定义好的模式会被NFACompiler用来生成NFA。

如果想要自己实现类似next和followedBy这种方法,比如timeEnd,对Pattern进行扩展重写应该是可行的。

public
class
Pattern
<T, F
extends
 T> {
/** 模式名称 */
private
final
String
 name;
/** 前面一个模式 */
private
final
Pattern
<T, ?
extends
 T> previous;
/** 一个事件如果要被当前模式匹配到,必须满足的约束条件 */
private
IterativeCondition
<F> condition;
/** 时间窗口长度,在时间长度内进行模式匹配 */
private
Time
 windowTime;
/** 模式量词,意思是一个模式匹配几个事件等 默认是匹配到一个 */
private
Quantifier
 quantifier =
Quantifier
.one(
ConsumingStrategy
.STRICT);
/** 停止将事件收集到循环状态时,事件必须满足的条件 */
private
IterativeCondition
<F> untilCondition;
/**
   * 适用于{@code times}模式,用来维护模式里事件可以连续发生的次数
   */
private
Times
 times;
// 匹配到事件之后的跳过策略
private
final
AfterMatchSkipStrategy
 afterMatchSkipStrategy;
  ...
}

Quantifier是用来描述具体模式行为的,主要有三大类:

Single-单一匹配、Looping-循环匹配、Times-一定次数或者次数范围内都能匹配到。

每一个模式Pattern可以是optional可选的(单一匹配或循环匹配),并可以设置ConsumingStrategy。

循环和次数也有一个额外的内部ConsumingStrategy,用在模式中接收的事件之间。

public
class
Quantifier
 {
  ...
/**
   * 5个属性,可以组合,但并非所有的组合都是有效的
   */
public
enum
QuantifierProperty
 {
    SINGLE,
    LOOPING,
    TIMES,
    OPTIONAL,
    GREEDY
  }
/**
   * 描述在此模式中匹配哪些事件的策略
   */
public
enum
ConsumingStrategy
 {
    STRICT,
    SKIP_TILL_NEXT,
    SKIP_TILL_ANY,
    NOT_FOLLOW,
    NOT_NEXT
  }
/**
   * 描述当前模式里事件可以连续发生的次数;举个例子,模式条件无非就是boolean,满足true条件的事件连续出现times次,或者一个次数范围,比如2~4次,2次,3次,4次都会被当前模式匹配出来,因此同一个事件会被重复匹配到
   */
public
static
class
Times
 {
private
final
int
 from;
private
final
int
 to;
private
Times
(
int
 from,
int
 to) {
Preconditions
.checkArgument(from >
0
,
"The from should be a positive number greater than 0."
);
Preconditions
.checkArgument(to >= from,
"The to should be a number greater than or equal to from: "
 + from +
"."
);
this
.from = from;
this
.to = to;
    }
public
int
 getFrom() {
return
 from;
    }
public
int
 getTo() {
return
 to;
    }
// 次数范围
public
static
Times
 of(
int
 from,
int
 to) {
return
new
Times
(from, to);
    }
// 指定具体次数
public
static
Times
 of(
int
 times) {
return
new
Times
(times, times);
    }
@Override
public
boolean
 equals(
Object
 o) {
if
 (
this
 == o) {
return
true
;
      }
if
 (o ==
null
 || getClass() != o.getClass()) {
return
false
;
      }
Times
 times = (
Times
) o;
return
 from == times.from &&
        to == times.to;
    }
@Override
public
int
 hashCode() {
return
Objects
.hash(from, to);
    }
  }
  ...
}

EventComparator,自定义事件比较器,实现EventComparator接口。

public

interface

EventComparator
<T>
extends

Comparator
<T>,
Serializable
 {
long
 serialVersionUID =
1L
;
}

NFACompiler和NFA

NFACompiler提供将Pattern编译成NFA或者NFAFactory的方法,使用NFAFactory可以创建多个NFA。

public
class
NFACompiler
 {
  ...
/**
   * NFAFactory 创建NFA的接口
   *
   * @param <T> Type of the input events which are processed by the NFA
   */
public
interface
NFAFactory
<T>
extends
Serializable
 {
    NFA<T> createNFA();
  }

/**
   * NFAFactory的具体实现NFAFactoryImpl
   *
   * <p>The implementation takes the input type serializer, the window time and the set of
   * states and their transitions to be able to create an NFA from them.
   *
   * @param <T> Type of the input events which are processed by the NFA
   */
private
static
class
NFAFactoryImpl
<T>
implements
NFAFactory
<T> {

private
static
final
long
 serialVersionUID =
8939783698296714379L
;

private
final
long
 windowTime;
private
final
Collection
<
State
<T>> states;
private
final
boolean
 timeoutHandling;

private
NFAFactoryImpl
(
long
 windowTime,
Collection
<
State
<T>> states,
boolean
 timeoutHandling) {

this
.windowTime = windowTime;
this
.states = states;
this
.timeoutHandling = timeoutHandling;
    }

@Override
public
 NFA<T> createNFA() {
// 一个NFA由状态集合、时间窗口的长度和是否处理超时组成
return
new
 NFA<>(states, windowTime, timeoutHandling);
    }
  }
}

NFA:Non-deterministic finite automaton - 非确定的有限(状态)自动机。

更多内容参见

https://zh.wikipedia.org/wiki/非确定有限状态自动机

public
class
 NFA<T> {
/**
   * NFACompiler返回的所有有效的NFA状态集合
   * These are directly derived from the user-specified pattern.
   */
private
final
Map
<
String
,
State
<T>> states;

/**
   * Pattern.within(Time)指定的时间窗口长度
   */
private
final
long
 windowTime;

/**
   * 一个超时匹配的标记
   */
private
final
boolean
 handleTimeout;
  ...
}

PatternSelectFunction和PatternFlatSelectFunction

当一个包含被匹配到的事件的映射能够通过模式名称访问到的时候,PatternSelectFunction的select()方法会被调用。模式名称是由Pattern定义的时候指定的。select()方法恰好返回一个结果,如果需要返回多个结果,则可以实现PatternFlatSelectFunction。

public

interface

PatternSelectFunction
<IN, OUT>
extends

Function
,
Serializable
 {

/**

   * 从给到的事件映射中生成一个结果。这些事件使用他们关联的模式名称作为唯一标识

   */

  OUT select(
Map
<
String
,
List
<IN>> pattern)
throws

Exception
;

}

PatternFlatSelectFunction,不是返回一个OUT,而是使用Collector 把匹配到的事件收集起来。

public
interface
PatternFlatSelectFunction
<IN, OUT>
extends
Function
,
Serializable
 {

/**
   * 生成一个或多个结果
   */
void
 flatSelect(
Map
<
String
,
List
<IN>> pattern,
Collector
<OUT> out)
throws
Exception
;
}

SelectTimeoutCepOperator、PatternTimeoutFunction

SelectTimeoutCepOperator是在CEPOperatorUtils中调用createTimeoutPatternStream()方法时创建出来。

SelectTimeoutCepOperator中会被算子迭代调用的方法是processMatchedSequences()和processTimedOutSequences()。

模板方法...对应到抽象类AbstractKeyedCEPPatternOperator中processEvent()方法和advanceTime()方法。

还有FlatSelectTimeoutCepOperator和对应的PatternFlatTimeoutFunction。

public
class
SelectTimeoutCepOperator
<IN, OUT1, OUT2, KEY>
extends
AbstractKeyedCEPPatternOperator
<IN, KEY, OUT1,
SelectTimeoutCepOperator
.
SelectWrapper
<IN, OUT1, OUT2>> {
private
OutputTag
<OUT2> timedOutOutputTag;
public
SelectTimeoutCepOperator
(
TypeSerializer
<IN> inputSerializer,
boolean
 isProcessingTime,
NFACompiler
.
NFAFactory
<IN> nfaFactory,
final
EventComparator
<IN> comparator,
AfterMatchSkipStrategy
 skipStrategy,
// 参数命名混淆了flat...包括SelectWrapper类中的成员命名...
PatternSelectFunction
<IN, OUT1> flatSelectFunction,
PatternTimeoutFunction
<IN, OUT2> flatTimeoutFunction,
OutputTag
<OUT2> outputTag,
OutputTag
<IN> lateDataOutputTag) {
super
(
      inputSerializer,
      isProcessingTime,
      nfaFactory,
      comparator,
      skipStrategy,
new
SelectWrapper
<>(flatSelectFunction, flatTimeoutFunction),
      lateDataOutputTag);
this
.timedOutOutputTag = outputTag;
  }
  ...
}
public
interface
PatternTimeoutFunction
<IN, OUT>
extends
Function
,
Serializable
 {
  OUT timeout(
Map
<
String
,
List
<IN>> pattern,
long
 timeoutTimestamp)
throws
Exception
;
}
public
interface
PatternFlatTimeoutFunction
<IN, OUT>
extends
Function
,
Serializable
 {
void
 timeout(
Map
<
String
,
List
<IN>> pattern,
long
 timeoutTimestamp,
Collector
<OUT> out)
throws
Exception
;
}

CEP和CEPOperatorUtils

CEP是创建PatternStream的工具类,PatternStream只是DataStream和Pattern的组合。

public
class
 CEP {

public
static
 <T>
PatternStream
<T> pattern(
DataStream
<T> input,
Pattern
<T, ?> pattern) {
return
new
PatternStream
<>(input, pattern);
  }

public
static
 <T>
PatternStream
<T> pattern(
DataStream
<T> input,
Pattern
<T, ?> pattern,
EventComparator
<T> comparator) {
return
new
PatternStream
<>(input, pattern, comparator);
  }
}

CEPOperatorUtils是在PatternStream的select()方法和flatSelect()方法被调用的时候,去创建SingleOutputStreamOperator(DataStream)。

public
class
CEPOperatorUtils
 {
  ...
private
static
 <IN, OUT, K>
SingleOutputStreamOperator
<OUT> createPatternStream(
final
DataStream
<IN> inputStream,
final
Pattern
<IN, ?> pattern,
final
TypeInformation
<OUT> outTypeInfo,
final
boolean
 timeoutHandling,
final
EventComparator
<IN> comparator,
final
OperatorBuilder
<IN, OUT> operatorBuilder) {
final
TypeSerializer
<IN> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());

// check whether we use processing time
final
boolean
 isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() ==
TimeCharacteristic
.
ProcessingTime
;

// compile our pattern into a NFAFactory to instantiate NFAs later on
final
NFACompiler
.
NFAFactory
<IN> nfaFactory =
NFACompiler
.compileFactory(pattern, timeoutHandling);

final
SingleOutputStreamOperator
<OUT> patternStream;

if
 (inputStream
instanceof
KeyedStream
) {
KeyedStream
<IN, K> keyedStream = (
KeyedStream
<IN, K>) inputStream;
      patternStream = keyedStream.transform(
        operatorBuilder.getKeyedOperatorName(),
        outTypeInfo,
        operatorBuilder.build(
          inputSerializer,
          isProcessingTime,
          nfaFactory,
          comparator,
          pattern.getAfterMatchSkipStrategy()));
    }
else
 {
KeySelector
<IN,
Byte
> keySelector =
new
NullByteKeySelector
<>();
      patternStream = inputStream.keyBy(keySelector).transform(
        operatorBuilder.getOperatorName(),
        outTypeInfo,
        operatorBuilder.build(
          inputSerializer,
          isProcessingTime,
          nfaFactory,
          comparator,
          pattern.getAfterMatchSkipStrategy()
        )).forceNonParallel();
    }

return
 patternStream;
  }
  ...
}

FlinkCEP实现步骤

  1. IN: DataSource -> DataStream -> Transformations -> DataStream
  2. Pattern: Pattern.begin.where.next.where...times...
  3. PatternStream: CEP.pattern(DataStream, Pattern)
  4. DataStream: PatternStream.select(PatternSelectFunction) PatternStream.flatSelect(PatternSelectFunction)
  5. OUT: DataStream -> Transformations -> DataStream -> DataSink

FlinkCEP匹配超时实现步骤

TimeoutCEP的流需要keyBy,即KeyedStream,如果inputStream不是KeyedStream,会new一个0字节的Key(上面CEPOperatorUtils源码里有提到)。

KeySelector
<IN,
Byte
> keySelector =
new

NullByteKeySelector
<>();

Pattern最后调用within设置窗口时间。 如果是对主键进行分组,一个时间窗口内最多只会匹配出一个超时事件,使用PatternStream.select(...)就可以了。

  1. IN: DataSource -> DataStream -> Transformations -> DataStream -> keyBy -> KeyedStream
  2. Pattern: Pattern.begin.where.next.where...within(Time windowTime)
  3. PatternStream: CEP.pattern(KeyedStream, Pattern)
  4. OutputTag: new OutputTag(...)
  5. SingleOutputStreamOperator: PatternStream.flatSelect(OutputTag, PatternFlatTimeoutFunction, PatternFlatSelectFunction)
  6. DataStream: SingleOutputStreamOperator.getSideOutput(OutputTag)
  7. OUT: DataStream -> Transformations -> DataStream -> DataSink

FlinkCEP超时不足

和Flink窗口聚合类似,如果使用事件时间和依赖事件生成的水印向前推进,需要后续的事件到达,才会触发窗口进行计算和输出结果。

FlinkCEP超时完整demo

public
class
CEPTimeoutEventJob
 {
private
static
final
String
 LOCAL_KAFKA_BROKER =
"localhost:9092"
;
private
static
final
String
 GROUP_ID =
CEPTimeoutEventJob
.
class
.getSimpleName();
private
static
final
String
 GROUP_TOPIC = GROUP_ID;

public
static
void
 main(
String
[] args)
throws
Exception
 {
// 参数
ParameterTool
 params =
ParameterTool
.fromArgs(args);

StreamExecutionEnvironment
 env =
StreamExecutionEnvironment
.getExecutionEnvironment();
// 使用事件时间
    env.setStreamTimeCharacteristic(
TimeCharacteristic
.
EventTime
);
    env.enableCheckpointing(
5000
);
    env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig
.
ExternalizedCheckpointCleanup
.RETAIN_ON_CANCELLATION);
    env.getConfig().disableSysoutLogging();
    env.getConfig().setRestartStrategy(
RestartStrategies
.fixedDelayRestart(
5
,
10000
));

// 不使用POJO的时间
final
AssignerWithPeriodicWatermarks
 extractor =
new
IngestionTimeExtractor
<POJO>();

// 与Kafka Topic的Partition保持一致
    env.setParallelism(
3
);

Properties
 kafkaProps =
new
Properties
();
    kafkaProps.setProperty(
"bootstrap.servers"
, LOCAL_KAFKA_BROKER);
    kafkaProps.setProperty(
"group.id"
, GROUP_ID);

// 接入Kafka的消息
FlinkKafkaConsumer011
<POJO> consumer =
new
FlinkKafkaConsumer011
<>(GROUP_TOPIC,
new
POJOSchema
(), kafkaProps);
DataStream
<POJO> pojoDataStream = env.addSource(consumer)
        .assignTimestampsAndWatermarks(extractor);
    pojoDataStream.print();

// 根据主键aid分组 即对每一个POJO事件进行匹配检测【不同类型的POJO,可以采用不同的within时间】
// 1.
DataStream
<POJO> keyedPojos = pojoDataStream
        .keyBy(
"aid"
);

// 从初始化到终态-一个完整的POJO事件序列
// 2.
Pattern
<POJO, POJO> completedPojo =
Pattern
.<POJO>begin(
"init"
)
            .where(
new
SimpleCondition
<POJO>() {
private
static
final
long
 serialVersionUID = -
6847788055093903603L
;

@Override
public
boolean
 filter(POJO pojo)
throws
Exception
 {
return
"02"
.equals(pojo.getAstatus());
              }
            })
            .followedBy(
"end"
)
//            .next("end")
            .where(
new
SimpleCondition
<POJO>() {
private
static
final
long
 serialVersionUID = -
2655089736460847552L
;

@Override
public
boolean
 filter(POJO pojo)
throws
Exception
 {
return
"00"
.equals(pojo.getAstatus()) ||
"01"
.equals(pojo.getAstatus());
              }
            });

// 找出1分钟内【便于测试】都没有到终态的事件aid
// 如果针对不同类型有不同within时间,比如有的是超时1分钟,有的可能是超时1个小时 则生成多个PatternStream
// 3.
PatternStream
<POJO> patternStream = CEP.pattern(keyedPojos, completedPojo.within(
Time
.minutes(
1
)));

// 定义侧面输出timedout
// 4.
OutputTag
<POJO> timedout =
new
OutputTag
<POJO>(
"timedout"
) {
private
static
final
long
 serialVersionUID =
773503794597666247L
;
    };

// OutputTag<L> timeoutOutputTag, PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction, PatternFlatSelectFunction<T, R> patternFlatSelectFunction
// 5.
SingleOutputStreamOperator
<POJO> timeoutPojos = patternStream.flatSelect(
        timedout,
new
POJOTimedOut
(),
new
FlatSelectNothing
()
    );

// 打印输出超时的POJO
// 6.7.
    timeoutPojos.getSideOutput(timedout).print();
    timeoutPojos.print();
    env.execute(
CEPTimeoutEventJob
.
class
.getSimpleName());
  }

/**
   * 把超时的事件收集起来
   */
public
static
class
POJOTimedOut
implements
PatternFlatTimeoutFunction
<POJO, POJO> {
private
static
final
long
 serialVersionUID = -
4214641891396057732L
;

@Override
public
void
 timeout(
Map
<
String
,
List
<POJO>> map,
long
 l,
Collector
<POJO> collector)
throws
Exception
 {
if
 (
null
 != map.get(
"init"
)) {
for
 (POJO pojoInit : map.get(
"init"
)) {
System
.out.println(
"timeout init:"
 + pojoInit.getAid());
          collector.collect(pojoInit);
        }
      }
// 因为end超时了,还没收到end,所以这里是拿不到end的
System
.out.println(
"timeout end: "
 + map.get(
"end"
));
    }
  }

/**
   * 通常什么都不做,但也可以把所有匹配到的事件发往下游;如果是宽松临近,被忽略或穿透的事件就没办法选中发往下游了
   * 一分钟时间内走完init和end的数据
   *
   * @param <T>
   */
public
static
class
FlatSelectNothing
<T>
implements
PatternFlatSelectFunction
<T, T> {
private
static
final
long
 serialVersionUID = -
3029589950677623844L
;

@Override
public
void
 flatSelect(
Map
<
String
,
List
<T>> pattern,
Collector
<T> collector) {
System
.out.println(
"flatSelect: "
 + pattern);
    }
  }
}

测试结果(followedBy):

3
> POJO{aid=
'ID000-0'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563419728242
, energy=
529.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'02'
, createTime=
null
, updateTime=
null
}
3
> POJO{aid=
'ID000-1'
, astyle=
'STYLE000-2'
, aname=
'NAME-1'
, logTime=
1563419728783
, energy=
348.00
, age=
26
, tt=
2019
-
07
-
18
, astatus=
'02'
, createTime=
null
, updateTime=
null
}
3
> POJO{aid=
'ID000-0'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563419749259
, energy=
492.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'00'
, createTime=
null
, updateTime=
null
}
flatSelect: {init=[POJO{aid=
'ID000-0'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563419728242
, energy=
529.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'02'
, createTime=
null
, updateTime=
null
}],
end
=[POJO{aid=
'ID000-0'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563419749259
, energy=
492.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'00'
, createTime=
null
, updateTime=
null
}]}
timeout init:ID000-
1
3
> POJO{aid=
'ID000-1'
, astyle=
'STYLE000-2'
, aname=
'NAME-1'
, logTime=
1563419728783
, energy=
348.00
, age=
26
, tt=
2019
-
07
-
18
, astatus=
'02'
, createTime=
null
, updateTime=
null
}
timeout
end
:
null
3
> POJO{aid=
'ID000-2'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563419829639
, energy=
467.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'03'
, createTime=
null
, updateTime=
null
}
3
> POJO{aid=
'ID000-2'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563419841394
, energy=
107.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'00'
, createTime=
null
, updateTime=
null
}
3
> POJO{aid=
'ID000-3'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563419967721
, energy=
431.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'02'
, createTime=
null
, updateTime=
null
}
3
> POJO{aid=
'ID000-3'
, astyle=
'STYLE000-2'
, aname=
'NAME-0'
, logTime=
1563419979567
, energy=
32.00
, age=
26
, tt=
2019
-
07
-
18
, astatus=
'03'
, createTime=
null
, updateTime=
null
}
3
> POJO{aid=
'ID000-3'
, astyle=
'STYLE000-2'
, aname=
'NAME-0'
, logTime=
1563419993612
, energy=
542.00
, age=
26
, tt=
2019
-
07
-
18
, astatus=
'01'
, createTime=
null
, updateTime=
null
}
flatSelect: {init=[POJO{aid=
'ID000-3'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563419967721
, energy=
431.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'02'
, createTime=
null
, updateTime=
null
}],
end
=[POJO{aid=
'ID000-3'
, astyle=
'STYLE000-2'
, aname=
'NAME-0'
, logTime=
1563419993612
, energy=
542.00
, age=
26
, tt=
2019
-
07
-
18
, astatus=
'01'
, createTime=
null
, updateTime=
null
}]}
3
> POJO{aid=
'ID000-4'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563420063760
, energy=
122.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'02'
, createTime=
null
, updateTime=
null
}
3
> POJO{aid=
'ID000-4'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563420078008
, energy=
275.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'03'
, createTime=
null
, updateTime=
null
}
timeout init:ID000-
4
3
> POJO{aid=
'ID000-4'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563420063760
, energy=
122.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'02'
, createTime=
null
, updateTime=
null
}
timeout
end
:
null

总结

以上所述是小编给大家介绍的Apache FlinkCEP 实现超时状态监控的步骤,希望对大家有所帮助,如果大家有任何疑问欢迎给我留言,小编会及时回复大家的!

(0)

相关推荐

  • Apache FlinkCEP 实现超时状态监控的步骤详解

    CEP - Complex Event Processing复杂事件处理. 订单下单后超过一定时间还未进行支付确认. 打车订单生成后超过一定时间没有确认上车. 外卖超过预定送达时间一定时限还没有确认送达. Apache FlinkCEP API CEPTimeoutEventJob FlinkCEP源码简析 DataStream和PatternStream DataStream 一般由相同类型事件或元素组成,一个DataStream可以通过一系列的转换操作如Filter.Map等转换为另一个Da

  • 在Spring Boot应用程序中使用Apache Kafka的方法步骤详解

    第1步:生成我们的项目: Spring Initializr来生成我们的项目.我们的项目将提供Spring MVC / Web支持和Apache Kafka支持. 第2步:发布/读取Kafka主题中的消息: <b>public</b> <b>class</b> User { <b>private</b> String name; <b>private</b> <b>int</b> age

  • Java 用Prometheus搭建实时监控系统过程详解

    上帝之火 本系列讲述的是开源实时监控告警解决方案Prometheus,这个单词很牛逼.每次我都能联想到带来上帝之火的希腊之神,普罗米修斯.而这个开源的logo也是火,个人挺喜欢这个logo的设计. 本系列着重介绍Prometheus以及如何用它和其周边的生态来搭建一套属于自己的实时监控告警平台. 本系列受众对象为初次接触Prometheus的用户,大神勿喷,偏重于操作和实战,但是重要的概念也会精炼出提及下.系列主要分为以下几块 Prometheus各个概念介绍和搭建,如何抓取数据(本次分享内容)

  • IntelliJ IDEA基于SpringBoot如何搭建SSM开发环境的步骤详解

    之前给大家在博文中讲过如何通过eclipse快速搭建SSM开发环境,但相对而言还是有些麻烦的,今天玄武老师给大家介绍下如何使用IntelliJ IDEA基于SpringBoot来更快速地搭建SSM开发环境,相比于传统搭建方式,极少的配置文件和配置信息会让你彻底爱上它. 环境搭建步骤详解 第1步:创建Spring Initializr项目 在IntelliJ IDEA中新建项目,选择Spring Initializr,JDK版本选择自己安装的版本(首次使用可能显示没有,那么就点击New去按照步骤创

  • jboss( WildFly)上运行 springboot程序的步骤详解

    WildFly,原名 JBoss AS(JBoss Application Server) 或者 JBoss,是一套应用程序服务器,属于开源的企业级 Java 中间件软件,用于实现基于 SOA 架构的 Web 应用和服务. WildFly 包含一组可独立运行的软件. WildFly采用积极的方法进行内存管理.开发基本运行时服务是为了最大程度地减少堆分配.这些服务在重复的完整解析中使用公共的缓存索引元数据,从而减少了堆和对象的流失.模块化类加载的使用可防止重复类和加载超出系统配置要求的类.这不仅减

  • Spring Cloud Eureka 注册与发现操作步骤详解

    在搭建Spring Cloud Eureka环境前先要了解整个架构的组成,常用的基础模式如下图: 服务提供者:将springboot服务编写好以后,通过配置注册中心地址方式注册,提供给消费者使用. 注册中心:服务的中间桥梁,服务提供者将服务注册.服务消费者可以通过注册信息调用需要使用的服务. 服务消费者:通过规定的调用方式,读取注册中心的注册信息,调用相应的服务. 根据后续的服务复杂度进化以后,可以看到服务提供者也可以是服务消费者,服务消费者也可以是服务提供者.根据不同的业务情况是可以互相调用的

  • IDEA SSM框架整合配置及步骤详解

    参考 狂神说SpringMVC05:整合SSM框架 https://mp.weixin.qq.com/s?__biz=Mzg2NTAzMTExNg==&mid=2247484004&idx=1&sn=cef9d881d0a8d7db7e8ddc6a380a9a76&scene=19#wechat_redirect 前言 根据自己的环境参考狂神的视频进行了SSM框架整合,用于备忘 SSM框架整合步骤 1. 创建数据库 2. IDEA创建maven项目.在pom.xml中设设置

  • Rancher通过界面管理K8s平台的图文步骤详解

    目录 一.Rancher 简介 1.Rancher API Server 的功能 2.Rancher 主要组件和功能图示 二.Rancher 安装 1.通过 Docker 来进行安装 2.在 Rancher 的界面上绑定 K8s 3.在 Rancher 上部署应用 一.Rancher 简介 Rancher 是为使用容器的公司打造的容器管理平台,通过 Rancher,企业不再需要使用一系列开源软件从零开始构建一个容器服务平台.同时 Rancher 还提供了一个全栈容器部署和管理平台,用于管理 Do

  • mysql 5.7.20解压版安装方法步骤详解(两种方法)

    我来讲解下window64位下MySQL的安装,MySQL是在5.7开始安装版就只有32位下载服务了,这里我讲解解压版的MySQL如何安装,在安装MySQL解压版时对于新手的小编来说也是头疼得很,各种问题各种来没有安装版的一键轻松搞定的方便,安装时需要注意三点:1.路径配置,2.安装时MySQL端口被占用这时需要关闭被占用端口,3.cmd必须是在管理员环境下设置MySQL信息. MySQL官网: https://www.mysql.com/downloads/ http://www.jb51.n

  • Docker新手实践及部署NGINX的步骤详解

    docker是什么 官方文档入口 在知乎上看到关于docker的思想讲的挺好的:如何通俗解释Docker是什么?.这里直接引用了. Docker的思想来自于集装箱,集装箱解决了什么问题?在一艘大船上,可以把货物规整的摆放起来.并且各种各样的货物被集装箱标准化了,集装箱和集装箱之间不会互相影响.那么我就不需要专门运送水果的船和专门运送化学品的船了.只要这些货物在集装箱里封装的好好的,那我就可以用一艘大船把他们都运走. docker就是类似的理念.现在都流行云计算了,云计算就好比大货轮.docker

随机推荐