Context 上下文
1、Content
语法
Context
基本语法
create context context_name partition [by] event_property [and event_property [and ...]] from stream_def [, event_property [...] from stream_def] [, ...]
context_name
:上下文名字,唯一。event_property
:事件的属性名,多个属性名之间用and
或逗号
连接。stream_def
:事件流的定义。
举例
create context NewUser partition by id and name from User -- id 和 name 是 User 的属性
如果 context
包含多个流,例子如下:
-- sid 是 Student 的属性,tid 是 Teacher 的属性 create context Person partition by sid from Student, tid from Teacher
注意: 多个流中每个流的中用于 context
的属性的数量要一样,数据类型也要一致。比如下面 错误示例:
-- 错误:sid是int,tname是String,数据类型不一致 create context Person partition by sid from Student, tname from Teacher -- 错误:Student 有一个属性,Teacher 有两个属性,属性数量不一致 create context Person partition by sid from Student, tid,tname from Teacher -- 错误:sid对应tname,sname对应tid,并且sname和tname是String,sid和tid是int,属性数量一样,但是对应的数据类型不一致 create context Person partition by sid,sname from Student, tname,tid from Teacher
过滤
实际上可以对进入 context
的事件增加过滤条件,不符合条件的就被过滤掉,就像下面这样:
-- age 大于 20 的 Student 事件才能建立或者进入 context create context Person partition by sid from Student(age > 20)
partition by
后面的属性,就是作为 context
的一个约束,比如说 id
,如果 id
相等的则进入同一个 context
里,如果 id
不同,那就新建一个 context
。好比根据 id
分组,id
相同的会被分到一个组里,不同的会新建一个组并等待相同的进入。
如果 parition by
后面跟着同一个流的两个属性,那么必须两个属性值一样才能进入 context
。比如说 A事件 id=1,name=a
,那么会以 1和a
两个值建立 context
,有点像数据库里的联合主键。然后 B事件id=1,name=b
,则又会新建一个context
。接着 C事件 id=1,name=a
,那么会进入 A事件建立的context
。
如果 partition by
后面跟着两个流的一个属性,那么两个属性值一样才能进入 context
。比如说 Student 事件 sid=1
,那么会新建一个 context
,然后来了个 Teacher 事件 tid=1
,则会进入 sid=1
的那个context
。多个流也一样,不用关心是什么事件,只用关心事件的属性值一样即可进入同一个context
。
2、Built-In Context Properties
Context
本身自带一些属性,最关键的是可以查看所创建的 context 的标识,并帮助我们理解context的语法。
如上所示,name
表示 上下文名称,这个是不会变的。id
是每个上下文的唯一标识,从 0
开始。key1
和 keyN
表示上下文定义时所选择的属性的值,1
和 N
表示属性的位置。例如:
-- key1为sid,key2为sname EPL: create context Person partition by sid, sname from Student
完整示例
为了说明对这几个属性的应用,举一个比较完整的例子。
1、定义事件模型
@Data @AllArgsConstructor public class Apple { private int id; private int price; }
2、测试
public static void main(String[] args) { EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(); EPAdministrator admin = epService.getEPAdministrator(); EPRuntime runtime = epService.getEPRuntime(); // 指定事件模型 String apple = Apple.class.getName(); // 创建 context String epl1 = "create context appleTest partition by id,price from " + apple; // context.id 针对不同的 apple 的 id,price 建立一个 context // 如果事件的id和price相同,则 context.id 也相同,即表明事件进入了同一个context String epl2 = "context appleTest select context.id,context.name,context.key1,context.key2 from " + apple; // 添加事后处理 admin.createEPL(epl1); EPStatement state = admin.createEPL(epl2); // 注册监听 state.addListener((newEvents, oldEvents) -> { if (newEvents != null) { EventBean event = newEvents[0]; System.out.println("context.name:" + event.get("name") + ",context.id:" + event.get("id") + ",context.key1:" + event.get("key1") + ",context.key2:" + event.get("key2")); } }); // 模拟事件发生 System.out.println("sendEvent: id=1, price=20"); runtime.sendEvent(new Apple(1, 20)); System.out.println("sendEvent: id=2, price=30"); runtime.sendEvent(new Apple(2, 30)); System.out.println("sendEvent: id=1, price=20"); runtime.sendEvent(new Apple(1, 20)); System.out.println("sendEvent: id=4, price=20"); runtime.sendEvent(new Apple(4, 20)); }
执行结果:
sendEvent: id=1, price=20 context.name:appleTest,context.id:0,context.key1:1,context.key2:20 sendEvent: id=2, price=30 context.name:appleTest,context.id:1,context.key1:2,context.key2:30 sendEvent: id=1, price=20 context.name:appleTest,context.id:0,context.key1:1,context.key2:20 sendEvent: id=4, price=20 context.name:appleTest,context.id:2,context.key1:4,context.key2:20
这个例子说得比较明白,针对不同的 id和price
,都会新建一个 context
,并 context.id
会从 0
开始增加作为其标识。如果 id和price
一样,事件就会进入之前已经存在的 context
,所以 e3事件就和 e1事件一样存在于 context.id=0
的 context
里面。
其他示例
-- context 定义 create context test partition by id from Apple -- 每当 5 个 id 相同的 Apple 事件进入时,统计 price 总和 context test select sum(price) from Apple.win:length_batch(5) -- 根据不同的id,统计 3 秒内进入的事件的平均 price,且 price 必须大于10 context test select avg(price) from Apple(price>10).win:time(3 sec)
关于 .win:length
或者 .win:time
,可以理解为一堆事件,后面会介绍。
3、Hash Context
前面介绍的Context语法是以事件属性来定义的,Esper 提供了以 Hash
值为标准定义 Context,通俗一点说就是提供事件属性参与 hash 值的计算,计算的值再对某个值是同余的则进入到同一个 context 中。
语法
create context context_name coalesce [by] hash_func_name(hash_func_param) from stream_def [, hash_func_name(hash_func_param) from stream_def ] [, ...] granularity granularity_value [preallocate] -- create context 上下文名称 coalesce [by] hash函数名称(hash函数参数) from 事件类型 [, hash函数名称(hash函数参数) from 事件类型 ] [, ...] granularity granularity_value [preallocate]
hash_func_name
为 hash 函数的名称,Esper提供了CRC32
或者使用Java的hashcode
函数来计算hash
值,分别为consistent_hash_crc32
和hash_code
。你也可以自己定义hash函数,不过这需要配置。hash_func_param
为参与计算的属性列表,比如之前的sid
或者tname
属性。stream_def
就是事件类型,可以是多个。不同于前面的Context
语法要求,Hash Context
不管有多个少属性作为基础来计算hash值,hash值都只有一个,并且为int
型。所以就不用关心这些属性的个数以及数据类型了。granularity
是必选参数,表示为最多能创建多少个context
。granularity_value
就是那个用于取余的“某个值”,因为Esper为了防止内存溢出,就想出了取余这种办法来限制context
创建的数量。也就是说context.id=hash函数名称(hash函数参数) % granularity_value
。preallocate
是一个可选参数,如果使用它,那么Esper会预分配空间来创建granularity_value
数量的context
。比如说granularity_value 为 1024
,那么Esper会预创建1024个context
。内存不大的话不建议使用这个参数。
过滤条件
Hash Context 同样可以过滤事件,举个完整的例子:
-- 以 Java 的 hashcode 方法 计算 sid 的值(sid必须大于5) -- 以 CRC32 算法计算 tid 的值,然后对 10 取余后的值来建立 context create context HashPerson coalesce by hash_code(sid) from Student(sid>5), consistent_hash_crc32(tid) from Teacher granularity 10
Hash Context
也有 Built-In Context Properties
,只不过只有context.id和context.name了。用法和前面说的一样。
小贴士:
如果用于
hash
计算的属性比较多,那么就不建议使用CRC32
算法了,因为他会把这些属性值先序列化字节数组以后才能计算hash
值。hashcode
方法相对它能快很多。如果使用
preallocate
参数,建议granularity_value
不要超过1000
。如果
granularity_value
超过65536
,引擎查找context会比较费劲,进而影响计算速度。
4、Category Context
Category Context 相对之前的 Context
和 Hash Context
要简单许多,语法说明如下
语法
create context context_name group [by] group_expression as category_label [, group [by] group_expression as category_label] [, ...] from stream_def
group_expression
表示分组策略的表达式。category_label
为策略定义一个名字。
一个 context
可以有多个策略同时存在,但是特殊的是之能有一个 stream_def
。
例如:
create context CategoryByTemp group temp < 5 as cold, group temp between 5 and 85 as normal, group temp > 85 as large from Temperature
Category Context
也有它自带的属性。
label
指明进入的事件所处的 group
是什么。
完整示例
完整例子如下:
1、定义事件模型
@Data @AllArgsConstructor public class Apple { private int id; private int price; }
2、测试
public static void main(String[] args) { EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(); EPAdministrator admin = epService.getEPAdministrator(); EPRuntime runtime = epService.getEPRuntime(); String apple = Apple.class.getName(); String epl1 = "create context appleTest group by id<0 as low, group by id>0 and id<10 as middle, group by id>10 as high from " + apple; String epl2 = "context appleTest select context.id, context.name, context.label, price from " + apple; admin.createEPL(epl1); EPStatement state = admin.createEPL(epl2); // 注册监听 state.addListener((newEvents, oldEvents) -> { if (newEvents != null) { EventBean event = newEvents[0]; System.out.println("context.name:" + event.get("name") + ",context.id:" + event.get("id") + ",context.label:" + event.get("label")); } }); System.out.println("sendEvent: id=1, price=20"); runtime.sendEvent(new Apple(1, 20)); System.out.println("sendEvent: id=0, price=30"); runtime.sendEvent(new Apple(0, 30)); System.out.println("sendEvent: id=11, price=20"); runtime.sendEvent(new Apple(11, 20)); System.out.println("sendEvent: id=-1, price=40"); runtime.sendEvent(new Apple(-1, 40)); }
执行结果
sendEvent: id=1, price=20 context.name:appleTest,context.id:1,context.label:middle sendEvent: id=0, price=30 sendEvent: id=11, price=20 context.name:appleTest,context.id:2,context.label:high sendEvent: id=-1, price=40 context.name:appleTest,context.id:0,context.label:low
可以发现,id=0
的事件,并没有触发监听器,那是因为 context
里的三个 category
没有包含 id=0
的情况,所以这个事件就被排除掉了。
5、Non-Overlapping Context
这类Context有个特点,是由开始和结束两个条件构成context
。语法如下:
语法
create context context_name start start_condition end end_condition
这个 context
有两个条件做限制,形成一个约束范围。当开始条件和结束条件都没被触发时,引擎会观察事件的进入是否会触发开始条件。如果开始条件被触发了,那么就新建一个 context
,并且观察结束条件是否被触发。如果结束条件被触发,那么 context
结束,引擎继续观察开始条件何时被触发。所以说这类 Context
的另一个特点是,要么 context
存在并且只有一个,要么条件都没被触发,也就一个 context
都没有了。
start_condition
和 end_condition
可以是时间,或者是事件类型。比如说:
-- 9点到17点此context才可用(以引擎的时间为准)。如果事件进入的时间不在此范围内,则不受该context影响 create context NineToFive start (0, 9, *, *, *) end (0, 17, *, *, *)
完整示例
以某类事件开始,以某类事件结束。
1、定义事件模型
/** * 开始事件 */ public class StartEvent { } /** * 结束事件 */ public class EndEvent { } /** * 其他事件 */ @Getter @AllArgsConstructor public class OtherEvent { private int id; }
2、测试类
public static void main(String[] args) { EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(); EPAdministrator admin = epService.getEPAdministrator(); EPRuntime runtime = epService.getEPRuntime(); String start = StartEvent.class.getName(); String end = EndEvent.class.getName(); String other = OtherEvent.class.getName(); // 以StartEvent事件作为开始条件,EndEvent事件作为结束条件 String epl1 = "create context NoOverLapping start " + start + " end " + end; String epl2 = "context NoOverLapping select * from " + other; admin.createEPL(epl1); EPStatement state = admin.createEPL(epl2); // 注册监听 state.addListener((newEvents, oldEvents) -> { EventBean event = newEvents[0]; System.out.println("Class:" + event.getUnderlying().getClass().getName() + ", id:" + event.get("id")); }); System.out.println("sendEvent: StartEvent"); runtime.sendEvent(new StartEvent()); System.out.println("sendEvent: OtherEvent"); runtime.sendEvent(new OtherEvent(2)); System.out.println("sendEvent: EndEvent"); runtime.sendEvent(new EndEvent()); System.out.println("sendEvent: OtherEvent"); runtime.sendEvent(new OtherEvent(4)); }
执行结果
sendEvent: StartEvent sendEvent: OtherEvent Class:com.esper.non_overlapping_context.OtherEvent, id:2 sendEvent: EndEvent sendEvent: OtherEvent
由此看出,在 NoOverLapping Context
下监控 OtherEvent
,必须是在 StartEvent
被触发才能监控到,所以在 EndEvent
发送后,再发送一个 OtherEvent
是不会触发 Listener
的。
6、OverLapping
OverLapping
和 NoOverLapping
一样都有两个条件限制,但是区别在于 OverLapping
的初始条件可以被触发多次,并且只要被触发就会新建一个 context
,但是当终结条件被触发时,之前建立的所有 context
都会被销毁。
语法
create context context_name initiated [by] initiating_condition terminated [by] terminating_condition
initiating_condition
和 terminating_condition
可以为事件类型,事件或者别的条件表达式。
完整示例
1、定义事件模型
/** * 初始事件 */ public class InitialEvent { } /** * 终止事件 */ public class TerminateEvent { } /** * 其他事件 */ @Getter @AllArgsConstructor public class SomeEvent { private int id; }
2、测试类
public static void main(String[] args) { EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(); EPAdministrator admin = epService.getEPAdministrator(); EPRuntime runtime = epService.getEPRuntime(); String initial = InitialEvent.class.getName(); String terminate = TerminateEvent.class.getName(); String some = SomeEvent.class.getName(); // 以 InitialEvent 事件作为初始事件,TerminateEvent 事件作为终结事件 String epl1 = "create context OverLapping initiated " + initial + " terminated " + terminate; String epl2 = "context OverLapping select context.id from " + initial; String epl3 = "context OverLapping select * from " + some; admin.createEPL(epl1); EPStatement state = admin.createEPL(epl2); EPStatement state1 = admin.createEPL(epl3); // 注册监听 state.addListener((newEvents, oldEvents) -> { if (newEvents != null) { EventBean event = newEvents[0]; System.out.println("context.id:" + event.get("id") + ", id:" + event.get("id")); } }); state1.addListener((newEvents, oldEvents) -> { if (newEvents != null) { EventBean event = newEvents[0]; System.out.println("Class:" + event.getUnderlying().getClass().getName() + ", id:" + event.get("id")); } }); System.out.println("sendEvent: InitialEvent"); runtime.sendEvent(new InitialEvent()); System.out.println("sendEvent: SomeEvent"); runtime.sendEvent(new SomeEvent(2)); System.out.println("sendEvent: InitialEvent"); runtime.sendEvent(new InitialEvent()); System.out.println("sendEvent: TerminateEvent"); runtime.sendEvent(new TerminateEvent()); System.out.println("sendEvent: SomeEvent"); runtime.sendEvent(new SomeEvent(4)); }
执行结果
sendEvent: InitialEvent context.id:0, id:0 sendEvent: SomeEvent Class:com.esper.context_sample.overlapping.SomeEvent, id:2 sendEvent: InitialEvent context.id:1, id:1 sendEvent: TerminateEvent sendEvent: SomeEvent
从结果可以看得出来,每发送一个 InitialEvent
,都会新建一个 context
,以至于 context.id=0和1
。并且当发送 TerminateEvent
后,再发送 SomeEvent
监听器也不会被触发了。
另外,context.id
是每一种 Context
都会有的自带属性,而且针对 OverLapping
,还增加了 startTime
和 endTime
两种属性,表明 context
的开始时间和结束时间。
7、Context Condition
Context Condition
主要包含 Filter
,Pattern
,Crontab
以及 Time Period
Filter
是对属性值的过滤,比如:create context NewUser partition by id from User(id > 10)
Pattern
是复杂事件流的代表,比如:”A事件到达后跟着B事件到达” 这是一个完整的Pattern
。Crontab
是定时任务,主要用于NoOverLapping
,就像前面提到的(0, 9, *, *, *)
,括号里的五项代表(分,时,天,月,年)
。Time Period
在这里只有一种表达式,就是after time_period_expression
时间周期,举例如下。
-- 以 0秒 为时间初始点,新建一个 context,于 10秒 后开始,1 分钟后结束。下一个 context 从 1分20秒 开始 create context NonOverlap10SecFor1Min start after 10 seconds end after 1 minute
8、Context Nesting
Context
也可以嵌套,意义就是多个 Context
联合在一起组成一个大的 Context
,以满足复杂的限制需求。
语法
create context context_name context nested_context_name [as] nested_context_definition , context nested_context_name [as] nested_context_definition [, ...]
举例
create context NineToFiveSegmented context NineToFive start (0, 9, *, *, *) end (0, 17, *, *, *), context SegmentedByUser partition by userId from User
用法和普通的 Context
没区别。
另外针对嵌套 Context
,其自带的属性使用方式会有些变化。比如针对上面这个,若想查看 NineToFive
的 startTime
和 SegmentedByUser
的第一个属性值,要按照下面这样写:
context NineToFiveSegmented select context.NineToFive.startTime, context.SegmentedByUser.key1 from User
9、Output When Context Partition Ends
当 Context
销毁时,如果你想同时查看此时 Context
里的东西,那么Esper提供了一种办法来输出其内容。例如:
create context OverLapping initiated InitialEvent terminated TerminateEvent context OverLapping select * from User output snapshot when terminated
当终结事件发送到引擎后,会立刻输出 OverLapping
的快照。如果你想以固定的频率查看 Context
的内容,Esper也支持。例如:
-- 每两分钟输出OverLapping的事件 context OverLapping select * from User output snapshot every 2 minute
以上的内容算是包含了Context的所有方面,可能还有些细节需要各位自己去研读他的手册,并且多加练习。
补充一下 Context和
和 Group by
有什么区别?
其实如果只是很简单的用Context,那么确实没太大区别,无非是在Context下select可以不包含group by修饰的属性。但是Group by明显没有Context强大,很多复杂的分组Group by是没法做到的。不过在能达到同样效果的情况下,我还是建议使用Group by,毕竟Context的名字是不能重复的,而且在高并发的情况下Context会短时间锁住。至于原因,这已经是Esper的高级篇了,这里暂且不说。
未经允许请勿转载:程序喵 » Esper教程 —— Context上下文有哪些(4)