Esper教程 —— Context上下文有哪些(4)

Context 上下文

1、Content

语法

Context 基本语法

create context context_name partition [by] event_property [and event_property [and ...]] from stream_def   
[, event_property [...] from stream_def] [, ...]
  • context_name:上下文名字,唯一。

  • event_property:事件的属性名,多个属性名之间用 and逗号 连接。

  • stream_def:事件流的定义。

举例

create context NewUser partition by id and name from User
-- id 和 name 是 User 的属性

如果 context 包含多个流,例子如下:

-- sid 是 Student 的属性,tid 是 Teacher 的属性
create context Person partition by sid from Student, tid from Teacher

注意: 多个流中每个流的中用于 context 的属性的数量要一样,数据类型也要一致。比如下面 错误示例:

-- 错误:sid是int,tname是String,数据类型不一致
create context Person partition by sid from Student, tname from Teacher  

-- 错误:Student 有一个属性,Teacher 有两个属性,属性数量不一致  
create context Person partition by sid from Student, tid,tname from Teacher  

-- 错误:sid对应tname,sname对应tid,并且sname和tname是String,sid和tid是int,属性数量一样,但是对应的数据类型不一致  
create context Person partition by sid,sname from Student, tname,tid from Teacher

过滤

实际上可以对进入 context 的事件增加过滤条件,不符合条件的就被过滤掉,就像下面这样:

-- age 大于 20 的 Student 事件才能建立或者进入 context  
create context Person partition by sid from Student(age > 20)

partition by 后面的属性,就是作为 context 的一个约束,比如说 id,如果 id 相等的则进入同一个 context 里,如果 id 不同,那就新建一个 context。好比根据 id 分组,id 相同的会被分到一个组里,不同的会新建一个组并等待相同的进入。

如果 parition by 后面跟着同一个流的两个属性,那么必须两个属性值一样才能进入 context。比如说 A事件 id=1,name=a,那么会以 1和a 两个值建立 context,有点像数据库里的联合主键。然后 B事件id=1,name=b,则又会新建一个context。接着 C事件 id=1,name=a,那么会进入 A事件建立的context

如果 partition by 后面跟着两个流的一个属性,那么两个属性值一样才能进入 context。比如说 Student 事件 sid=1,那么会新建一个 context,然后来了个 Teacher 事件 tid=1,则会进入 sid=1 的那个context。多个流也一样,不用关心是什么事件,只用关心事件的属性值一样即可进入同一个context

2、Built-In Context Properties

Context 本身自带一些属性,最关键的是可以查看所创建的 context 的标识,并帮助我们理解context的语法。

UpdateListener-5.png

如上所示,name 表示 上下文名称,这个是不会变的。id 是每个上下文的唯一标识,从 0 开始。key1 和 keyN 表示上下文定义时所选择的属性的值,1 和 N 表示属性的位置。例如:

-- key1为sid,key2为sname
EPL: create context Person partition by sid, sname from Student

完整示例

为了说明对这几个属性的应用,举一个比较完整的例子。

1、定义事件模型

@Data
@AllArgsConstructor
public class Apple {
    private int id;
    private int price;
}

2、测试

public static void main(String[] args) {
    EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
    EPAdministrator admin = epService.getEPAdministrator();
    EPRuntime runtime = epService.getEPRuntime();

    // 指定事件模型
    String apple = Apple.class.getName();
    // 创建 context
    String epl1 = "create context appleTest partition by id,price from " + apple;

    // context.id 针对不同的 apple 的 id,price 建立一个 context
    // 如果事件的id和price相同,则 context.id 也相同,即表明事件进入了同一个context
    String epl2 = "context appleTest select context.id,context.name,context.key1,context.key2 from " + apple;

    // 添加事后处理
    admin.createEPL(epl1);
    EPStatement state = admin.createEPL(epl2);

     // 注册监听
    state.addListener((newEvents, oldEvents) -> {
        if (newEvents != null) {
            EventBean event = newEvents[0];
            System.out.println("context.name:" + event.get("name") + ",context.id:" + event.get("id") + ",context.key1:" + event.get("key1") + ",context.key2:" + event.get("key2"));
        }
    });

    // 模拟事件发生
    System.out.println("sendEvent: id=1, price=20");
    runtime.sendEvent(new Apple(1, 20));

    System.out.println("sendEvent: id=2, price=30");
    runtime.sendEvent(new Apple(2, 30));

    System.out.println("sendEvent: id=1, price=20");
    runtime.sendEvent(new Apple(1, 20));

    System.out.println("sendEvent: id=4, price=20");
    runtime.sendEvent(new Apple(4, 20));
}

执行结果:

sendEvent: id=1, price=20
context.name:appleTest,context.id:0,context.key1:1,context.key2:20
sendEvent: id=2, price=30
context.name:appleTest,context.id:1,context.key1:2,context.key2:30
sendEvent: id=1, price=20
context.name:appleTest,context.id:0,context.key1:1,context.key2:20
sendEvent: id=4, price=20
context.name:appleTest,context.id:2,context.key1:4,context.key2:20

这个例子说得比较明白,针对不同的 id和price,都会新建一个 context,并 context.id 会从 0 开始增加作为其标识。如果 id和price 一样,事件就会进入之前已经存在的 context,所以 e3事件就和 e1事件一样存在于 context.id=0 的 context 里面。

其他示例

-- context 定义  
create context test partition by id from Apple  
  
-- 每当 5 个 id 相同的 Apple 事件进入时,统计 price 总和  
context test select sum(price) from Apple.win:length_batch(5)  
  
-- 根据不同的id,统计 3 秒内进入的事件的平均 price,且 price 必须大于10  
context test select avg(price) from Apple(price>10).win:time(3 sec)

关于 .win:length 或者 .win:time,可以理解为一堆事件,后面会介绍。

3、Hash Context

前面介绍的Context语法是以事件属性来定义的,Esper 提供了以 Hash 值为标准定义 Context,通俗一点说就是提供事件属性参与 hash 值的计算,计算的值再对某个值是同余的则进入到同一个 context 中。

语法

create context context_name coalesce [by]  
hash_func_name(hash_func_param) from stream_def  
[, hash_func_name(hash_func_param) from stream_def ]  
[, ...]  
granularity granularity_value  
[preallocate]

--

create context 上下文名称 coalesce [by] 
hash函数名称(hash函数参数) from 事件类型  
[, hash函数名称(hash函数参数) from 事件类型 ] 
[, ...]  
granularity granularity_value
[preallocate]
  • hash_func_name 为 hash 函数的名称,Esper提供了 CRC32 或者使用Java的 hashcode 函数来计算 hash 值,分别为 consistent_hash_crc32 和 hash_code。你也可以自己定义hash函数,不过这需要配置。

  • hash_func_param 为参与计算的属性列表,比如之前的 sid 或者 tname 属性。

  • stream_def 就是事件类型,可以是多个。不同于前面的 Context 语法要求,Hash Context 不管有多个少属性作为基础来计算hash值,hash值都只有一个,并且为 int 型。所以就不用关心这些属性的个数以及数据类型了。

  • granularity 是必选参数,表示为最多能创建多少个 context

  • granularity_value 就是那个用于取余的“某个值”,因为Esper为了防止内存溢出,就想出了取余这种办法来限制 context 创建的数量。也就是说 context.id=hash函数名称(hash函数参数) % granularity_value

  • preallocate 是一个可选参数,如果使用它,那么Esper会预分配空间来创建 granularity_value 数量的 context。比如说 granularity_value 为 1024,那么Esper会预创建 1024个context。内存不大的话不建议使用这个参数。

过滤条件

Hash Context 同样可以过滤事件,举个完整的例子:

-- 以 Java 的 hashcode 方法 计算 sid 的值(sid必须大于5)
-- 以 CRC32 算法计算 tid 的值,然后对 10 取余后的值来建立 context  
create context HashPerson coalesce by hash_code(sid) from Student(sid>5), consistent_hash_crc32(tid) from Teacher granularity 10

Hash Context 也有 Built-In Context Properties,只不过只有context.id和context.name了。用法和前面说的一样。

小贴士:

  • 如果用于 hash 计算的属性比较多,那么就不建议使用 CRC32 算法了,因为他会把这些属性值先序列化字节数组以后才能计算 hash 值。hashcode 方法相对它能快很多。

  • 如果使用 preallocate 参数,建议 granularity_value 不要超过 1000

  • 如果 granularity_value 超过 65536,引擎查找context会比较费劲,进而影响计算速度。

4、Category Context

Category Context 相对之前的 Context 和 Hash Context 要简单许多,语法说明如下

语法

create context context_name  
group [by] group_expression as category_label  
[, group [by] group_expression as category_label]  
[, ...]  
from stream_def
  • group_expression 表示分组策略的表达式。

  • category_label 为策略定义一个名字。

一个 context 可以有多个策略同时存在,但是特殊的是之能有一个 stream_def

例如:

create context CategoryByTemp  
group temp < 5 as cold, group temp between 5 and 85 as normal, group temp > 85 as large  
from Temperature

Category Context 也有它自带的属性。

CategoryContext.png

label 指明进入的事件所处的 group 是什么。

完整示例

完整例子如下:

1、定义事件模型

@Data
@AllArgsConstructor
public class Apple {
    private int id;
    private int price;
}

2、测试

public static void main(String[] args) {
    EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
    EPAdministrator admin = epService.getEPAdministrator();
    EPRuntime runtime = epService.getEPRuntime();

    String apple = Apple.class.getName();
    String epl1 = "create context appleTest group by id<0 as low, group by id>0 and id<10 as middle, group by id>10 as high from " + apple;
    String epl2 = "context appleTest select context.id, context.name, context.label, price from " + apple;

    admin.createEPL(epl1);
    EPStatement state = admin.createEPL(epl2);
    
    // 注册监听
    state.addListener((newEvents, oldEvents) -> {
        if (newEvents != null) {
            EventBean event = newEvents[0];
            System.out.println("context.name:" + event.get("name") + ",context.id:" + event.get("id") + ",context.label:" + event.get("label"));
        }
    });

    System.out.println("sendEvent: id=1, price=20");
    runtime.sendEvent(new Apple(1, 20));

    System.out.println("sendEvent: id=0, price=30");
    runtime.sendEvent(new Apple(0, 30));

    System.out.println("sendEvent: id=11, price=20");
    runtime.sendEvent(new Apple(11, 20));

    System.out.println("sendEvent: id=-1, price=40");
    runtime.sendEvent(new Apple(-1, 40));
}

执行结果

sendEvent: id=1, price=20
context.name:appleTest,context.id:1,context.label:middle
sendEvent: id=0, price=30
sendEvent: id=11, price=20
context.name:appleTest,context.id:2,context.label:high
sendEvent: id=-1, price=40
context.name:appleTest,context.id:0,context.label:low

可以发现,id=0 的事件,并没有触发监听器,那是因为 context 里的三个 category 没有包含 id=0 的情况,所以这个事件就被排除掉了。

5、Non-Overlapping Context

这类Context有个特点,是由开始和结束两个条件构成context。语法如下:

语法

create context context_name 
start start_condition 
end end_condition

这个 context 有两个条件做限制,形成一个约束范围。当开始条件和结束条件都没被触发时,引擎会观察事件的进入是否会触发开始条件。如果开始条件被触发了,那么就新建一个 context,并且观察结束条件是否被触发。如果结束条件被触发,那么 context 结束,引擎继续观察开始条件何时被触发。所以说这类 Context 的另一个特点是,要么 context 存在并且只有一个,要么条件都没被触发,也就一个 context 都没有了。

start_condition 和 end_condition 可以是时间,或者是事件类型。比如说:

-- 9点到17点此context才可用(以引擎的时间为准)。如果事件进入的时间不在此范围内,则不受该context影响
create context NineToFive start (0, 9, *, *, *) end (0, 17, *, *, *)

完整示例

以某类事件开始,以某类事件结束。

1、定义事件模型

/**
 * 开始事件
 */
public class StartEvent {
}

/**
 * 结束事件
 */
public class EndEvent {
}

/**
 * 其他事件
 */
@Getter
@AllArgsConstructor
public class OtherEvent {
    private int id;
}

2、测试类

public static void main(String[] args) {
    EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
    EPAdministrator admin = epService.getEPAdministrator();
    EPRuntime runtime = epService.getEPRuntime();

    String start = StartEvent.class.getName();
    String end = EndEvent.class.getName();
    String other = OtherEvent.class.getName();

    // 以StartEvent事件作为开始条件,EndEvent事件作为结束条件
    String epl1 = "create context NoOverLapping start " + start + " end " + end;
    String epl2 = "context NoOverLapping select * from " + other;

    admin.createEPL(epl1);
    EPStatement state = admin.createEPL(epl2);
    
    // 注册监听
    state.addListener((newEvents, oldEvents) -> {
        EventBean event = newEvents[0];
        System.out.println("Class:" + event.getUnderlying().getClass().getName() + ", id:" + event.get("id"));
    });

    System.out.println("sendEvent: StartEvent");
    runtime.sendEvent(new StartEvent());

    System.out.println("sendEvent: OtherEvent");
    runtime.sendEvent(new OtherEvent(2));

    System.out.println("sendEvent: EndEvent");
    runtime.sendEvent(new EndEvent());

    System.out.println("sendEvent: OtherEvent");
    runtime.sendEvent(new OtherEvent(4));
}

执行结果

sendEvent: StartEvent
sendEvent: OtherEvent
Class:com.esper.non_overlapping_context.OtherEvent, id:2
sendEvent: EndEvent
sendEvent: OtherEvent

由此看出,在 NoOverLapping Context 下监控 OtherEvent,必须是在 StartEvent 被触发才能监控到,所以在 EndEvent 发送后,再发送一个 OtherEvent 是不会触发 Listener 的。

6、OverLapping

OverLapping 和 NoOverLapping 一样都有两个条件限制,但是区别在于 OverLapping 的初始条件可以被触发多次,并且只要被触发就会新建一个 context,但是当终结条件被触发时,之前建立的所有 context 都会被销毁。

语法

create context context_name 
initiated [by] initiating_condition 
terminated [by] terminating_condition

initiating_condition 和 terminating_condition 可以为事件类型,事件或者别的条件表达式。

完整示例

1、定义事件模型

/**
 * 初始事件
 */
public class InitialEvent {
}

/**
 * 终止事件
 */
public class TerminateEvent {
}

/**
 * 其他事件
 */
@Getter
@AllArgsConstructor
public class SomeEvent {
    private int id;
}

2、测试类

public static void main(String[] args) {
    EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
    EPAdministrator admin = epService.getEPAdministrator();
    EPRuntime runtime = epService.getEPRuntime();

    String initial = InitialEvent.class.getName();
    String terminate = TerminateEvent.class.getName();
    String some = SomeEvent.class.getName();
    // 以 InitialEvent 事件作为初始事件,TerminateEvent 事件作为终结事件
    String epl1 = "create context OverLapping initiated " + initial + " terminated " + terminate;
    String epl2 = "context OverLapping select context.id from " + initial;
    String epl3 = "context OverLapping select * from " + some;

    admin.createEPL(epl1);
    EPStatement state = admin.createEPL(epl2);
    EPStatement state1 = admin.createEPL(epl3);

    // 注册监听
    state.addListener((newEvents, oldEvents) -> {
        if (newEvents != null) {
            EventBean event = newEvents[0];
            System.out.println("context.id:" + event.get("id") + ", id:" + event.get("id"));
        }
    });
    state1.addListener((newEvents, oldEvents) -> {
        if (newEvents != null) {
            EventBean event = newEvents[0];
            System.out.println("Class:" + event.getUnderlying().getClass().getName() + ", id:" + event.get("id"));
        }
    });

    System.out.println("sendEvent: InitialEvent");
    runtime.sendEvent(new InitialEvent());

    System.out.println("sendEvent: SomeEvent");
    runtime.sendEvent(new SomeEvent(2));

    System.out.println("sendEvent: InitialEvent");
    runtime.sendEvent(new InitialEvent());

    System.out.println("sendEvent: TerminateEvent");
    runtime.sendEvent(new TerminateEvent());

    System.out.println("sendEvent: SomeEvent");
    runtime.sendEvent(new SomeEvent(4));
}

执行结果

sendEvent: InitialEvent
context.id:0, id:0
sendEvent: SomeEvent
Class:com.esper.context_sample.overlapping.SomeEvent, id:2
sendEvent: InitialEvent
context.id:1, id:1
sendEvent: TerminateEvent
sendEvent: SomeEvent

从结果可以看得出来,每发送一个 InitialEvent,都会新建一个 context,以至于 context.id=0和1。并且当发送 TerminateEvent 后,再发送 SomeEvent 监听器也不会被触发了。

另外,context.id 是每一种 Context 都会有的自带属性,而且针对 OverLapping,还增加了 startTime 和 endTime 两种属性,表明 context 的开始时间和结束时间。

7、Context Condition

Context Condition 主要包含 FilterPatternCrontab 以及 Time Period

  • Filter 是对属性值的过滤,比如:create context NewUser partition by id from User(id > 10)

  • Pattern 是复杂事件流的代表,比如:”A事件到达后跟着B事件到达” 这是一个完整的 Pattern

  • Crontab 是定时任务,主要用于 NoOverLapping,就像前面提到的(0, 9, *, *, *),括号里的五项代表(分,时,天,月,年)

  • Time Period 在这里只有一种表达式,就是 after time_period_expression 时间周期,举例如下。

-- 以 0秒 为时间初始点,新建一个 context,于 10秒 后开始,1 分钟后结束。下一个 context 从 1分20秒 开始  
create context NonOverlap10SecFor1Min start after 10 seconds end after 1 minute

8、Context Nesting

Context 也可以嵌套,意义就是多个 Context 联合在一起组成一个大的 Context,以满足复杂的限制需求。

语法

create context context_name
context nested_context_name [as] nested_context_definition ,
context nested_context_name [as] nested_context_definition [, ...]

举例

create context NineToFiveSegmented
context NineToFive start (0, 9, *, *, *) end (0, 17, *, *, *),
context SegmentedByUser partition by userId from User

用法和普通的 Context 没区别。

另外针对嵌套 Context,其自带的属性使用方式会有些变化。比如针对上面这个,若想查看 NineToFive 的 startTime 和 SegmentedByUser 的第一个属性值,要按照下面这样写:

context NineToFiveSegmented select
context.NineToFive.startTime,
context.SegmentedByUser.key1
from User

9、Output When Context Partition Ends

当 Context 销毁时,如果你想同时查看此时 Context 里的东西,那么Esper提供了一种办法来输出其内容。例如:

create context OverLapping initiated InitialEvent terminated TerminateEvent  
context OverLapping select * from User output snapshot when terminated

当终结事件发送到引擎后,会立刻输出 OverLapping 的快照。如果你想以固定的频率查看 Context 的内容,Esper也支持。例如:

-- 每两分钟输出OverLapping的事件  
context OverLapping select * from User output snapshot every 2 minute

以上的内容算是包含了Context的所有方面,可能还有些细节需要各位自己去研读他的手册,并且多加练习。

补充一下 Context和 和 Group by 有什么区别?

其实如果只是很简单的用Context,那么确实没太大区别,无非是在Context下select可以不包含group by修饰的属性。但是Group by明显没有Context强大,很多复杂的分组Group by是没法做到的。不过在能达到同样效果的情况下,我还是建议使用Group by,毕竟Context的名字是不能重复的,而且在高并发的情况下Context会短时间锁住。至于原因,这已经是Esper的高级篇了,这里暂且不说。


未经允许请勿转载:程序喵 » Esper教程 —— Context上下文有哪些(4)

点  赞 (0) 打  赏
分享到: