Esper教程 —— EPL 语法讲解(5)

EPL 语法讲解

EPL 全称 Event Processing Language,是一种类似SQL的语言,包含了 SELECTFROMWHEREGROUP BYHAVING 和 ORDER BY 子句,同时用事件流代替了 table 作为数据源,并且能像SQL那样 joinfiltering 和 aggregation。所以如果各位有SQL基础的话,简单的EPL很容易掌握。

除了 select,EPL 也有 insert intoupdatedelete,不过含义和 SQL 并不是很接近。另外还有 pattern 和 output 子句,这两个是SQL所没有的。

EPL 还定义了一个叫 view 的东西,类似 SQL 的 table,来决定哪些数据是可用的,Esper 提供了十多个 view,并且保证这些 view 可以被重复使用。而且用户还可以扩展 view 成为自定义 view 来满足需求。在 view 的基础上,EPL还提供了 named window 的定义,作用和 view 类似,但是更加灵活。。。。

1、基本语法

[annotations]
[expression_declarations]
[context context_name]
[insert into insert_into_def]
select select_list
from stream_def [as name] [, stream_def [as name]] [,...]
[where search_conditions]
[group by grouping_expression_list]
[having grouping_search_conditions]
[output output_specification]
[order by order_by_expression_list]
[limit num_rows]

以上大部分的 EPL 都是按照这个格式来定义。

2、Time Periods 时间表达式

Esper 时间的表达形式。语法如下:

time-period : [year-part] [month-part] [week-part] [day-part] [hour-part] [minute-part] [seconds-part] [milliseconds-part]  
year-part : (number|variable_name) ("years" | "year")  
month-part : (number|variable_name) ("months" | "month")  
week-part : (number|variable_name) ("weeks" | "week")  
day-part : (number|variable_name) ("days" | "day")  
hour-part : (number|variable_name) ("hours" | "hour")  
minute-part : (number|variable_name) ("minutes" | "minute" | "min")  
seconds-part : (number|variable_name) ("seconds" | "second" | "sec")  
milliseconds-part : (number|variable_name) ("milliseconds" | "millisecond" | "msec")

举几个例子说明下:

-- 计算过去的 5分3秒 中进入改语句的 Fruit 事件的平均 price
select avg(price) from Fruit.win:time(5 minute 3 sec)
  
-- 每一天输出一次用户的账户总额
select sum(account) from User output every 1 day

用法比较简单。要注意的是,Esper 规定每月的天数都是 30天,所以对准确性要求高的业务,以月为单位进行计算会出现误差的。

3、Comments 注释

和Java差不多,只不过只有 /* */ 和 // 两种。

  • //:只能单行注释,

  • /* */:可以多行注释。

示例如下:

(1)单行注释

// This comment extends to the end of the line.  
// Two forward slashes with no whitespace between them begin such comments.  
select * from MyEvent

(2)多行注释

/* this is a very important Event */
select * from OtherEvent

(3)混合注释

select field1 // first comment
/* second comment */ field2 from MyEvent

4、Reserved Keywords 保留关键字

EPL 里如果某个事件属性或者事件流的名称和EPL的关键字一样,那么必须要以 ` 括起来才可用,在键盘上esc的下面,1的左边。比如:

-- 无效示例:insert和Order是关键字
select insert from Order  

-- 有效示例:`insert`和`Order`是属性名称和事件流名称
select `insert` from `Order`

5、Escaping Strings 转义字符

在EPL中,字符串使用单引号或者双引号括起来的,那如果字符串里包含有单引号或者双引号怎么办呢。请看例子:

select * from OrderEvent(name='John')
-- 等同于
select * from OrderEvent(name="John")

如果 name=John's,则就需要反斜杠进行转义了。

select * from OrderEvent(name="John\"s")
-- 或者
select * from OrderEvent(name='john\'s')
-- 或者
select * from OrderEvent(name='john"s')

除了使用反斜杠,还可以使用 unicode 来表示单引号和双引号。

select * from OrderEvent(name="John\u0022s")  
-- 或者  
select * from OrderEvent(name='john\u0027s')

注意在Java编写EPL的时候,反斜杠和无含义的双引号还得转义,不然会和String的双引号冲突。比如

epService.getEPAdministrator().createEPL("select * from OrderEvent(name='John\\'s')");  
// ... and for double quotes...  
epService.getEPAdministrator().createEPL("select * from OrderEvent(name=\"Quote \\\"Hello\\\"\")");

6、Data Types 数据类型

EPL支持Java所有的数值数据类型,包括基本类型及其包装类,同时还支持 java.math.BigInteger 和 java.math.BigDecimal,并且能自动转换数据类型不丢失精度(比如short转intint转short则不行)。如果想在EPL内进行数据转换,可以使用 cast函数。完整例子如下。

1、定义事件模型

@Data
@AllArgsConstructor
public class Banana {
    private int price;
}

2、测试类

public static void main(String[] args) {
    EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
    EPAdministrator admin = epService.getEPAdministrator();

    String banana = Banana.class.getName();
    String epl1 = "select cast(avg(price),int) as price from " + banana + ".win:length_batch(2)";
    String epl2 = "select avg(price) from " + banana + ".win:length_batch(2)";

    EPStatement state1 = admin.createEPL(epl1);
    state1.addListener((newEvents, oldEvents) -> {
        if (newEvents != null) {
            EventBean event = newEvents[0];
            System.out.println("平均价格: " + event.get("price") + ",数据类型:" + event.get("price").getClass().getName());
        }
    });

    EPStatement state2 = admin.createEPL(epl2);
    state2.addListener((newEvents, oldEvents) -> {
        if (newEvents != null) {
            EventBean event = newEvents[0];
            System.out.println("平均价格: " + event.get("avg(price)") + ",数据类型:" + event.get("avg(price)").getClass().getName());
        }
    });

    EPRuntime runtime = epService.getEPRuntime();
    runtime.sendEvent(new Banana(1));
    runtime.sendEvent(new Banana(2));
}

执行结果

平均价格: 1,数据类型:java.lang.Integer
平均价格: 1.5,数据类型:java.lang.Double

要提醒的是,如果某个数除以0,那么默认会返回正无穷大或者负无穷大,不过可以配置这个结果,比如用null来代替。

7、Annotation

EPL 也可以写注解,种类不多,大部分简单而有效。不过有些注解内容较多,以后会在具体的使用场景进行详细讲解。首先来了解下注解的语法。

// 不包含参数或者单个参数的注解
@annotation_name [(annotation_parameters)]

// 包含多个属性名-值对的注解
@annotation_name (attribute_name = attribute_value, [name=value, ...])

// 多个注解联合使用
@annotation_name [(annotation_parameters)] [@annotation_name [(annotation_parameters)]] [...]

讲解具体注解时,结合语法进行说明

  • @Name:指定EPL的名称,参数只有一个。例如:@Name("MyEPL")

  • @Description:对EPL进行描述,参数只有一个。例如:@Description("This is MyEPL")

  • @Tag:对EPL进行额外的说明,参数有两个,分别为 Tag的名称和Tag的值,用逗号分隔。例如:@Tag(name="author",value="luonanqin")

  • @Priority:指定EPL的优先级,参数只有一个,并且整数(可负可正)。例如:@Priority(10)

  • @Drop:指定事件经过此EPL后不再参与其他的EPL计算,该注解无参数。

  • @Hint:为EPL加上某些标记,让引擎对此EPL产生其他的操作,会改变EPL实例的内存占用,但通常不会改变输出。其参数固定,由Esper提供。

  • @Audit:EPL添加此注解后,可以额外输出EPL运行情况,有点类似日志的感觉。

  • @Hook:与SQL相关。

  • @EventRepresentation:这是用来指定EPL产生的计算结果事件包含的数据形式。参数只有一个(数据形式为Map,默认值)

示例代码:

1、定义事件模型

@AllArgsConstructor
@Getter
public class Apple {
    private int price;
}

2、测试

public static void main(String[] args) {
    EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
    EPAdministrator admin = epService.getEPAdministrator();

    String apple = Apple.class.getName();
    String epl1 = "@Name('AppleTest')" +
            "@Description('获取两个请求的总价')" +
            "@Tag(name='author',value='程序喵')" +
            "@Priority(10)" +
            "@Audit" +
            "@EventRepresentation(value=objectArray)" +
            "select sum(price) as price from " + apple + ".win:length_batch(2)";
    String epl2 = "select avg(price) as price from " + apple + ".win:length_batch(2)";
    String epl3 = "@Drop select sum(price) as price from " + apple + ".win:length_batch(2)";

    // 注册监听
    UpdateListener listenenr = (newEvents, oldEvents) -> {
        if (newEvents != null) {
            EventBean event = newEvents[0];
            System.out.println("价格: " + event.get("price") + ", 事件类型:" + event.getEventType().getUnderlyingType());
        }
    };

    EPStatement state1 = admin.createEPL(epl1);
    EPStatement state2 = admin.createEPL(epl2);
    EPStatement state3 = admin.createEPL(epl3);
    state1.addListener(listenenr);
    state2.addListener(listenenr);
    state3.addListener(listenenr);
    System.out.println("epl1 名字是 " + state1.getName());

    EPRuntime runtime = epService.getEPRuntime();
    runtime.sendEvent(new Apple(1));
    runtime.sendEvent(new Apple(2));
}

执行结果

epl1 名字是 AppleTest
价格: 3, 事件类型:class [Ljava.lang.Object;
价格: 1.5, 事件类型:interface java.util.Map
价格: 3, 事件类型:interface java.util.Map

可以发现,@Name 和 @EventRepresentation 都起效果了,但是 @Priority 和 @Drop 没用,那是因为这两个是要配置才能生效的。以后讲 Configuration 的时候会说到。

注意点:大多网上资料都是给 @EventRepresentation 设置 array=true(数组) 或 array=false(Map)。但其实 7.1.0 版本中并不是,会有如下错误。

Caused by: com.espertech.esper.epl.annotation.AnnotationException: Annotation 'EventRepresentation' requires a EventUnderlyingType-typed value for attribute 'value' but received a Boolean-typed value
	at com.espertech.esper.epl.annotation.AnnotationUtil.getFinalValue(AnnotationUtil.java:221)
	at com.espertech.esper.epl.annotation.AnnotationUtil.createProxy(AnnotationUtil.java:150)
	at com.espertech.esper.epl.annotation.AnnotationUtil.compileAnnotations(AnnotationUtil.java:101)
	at com.espertech.esper.epl.annotation.AnnotationUtil.compileAnnotations(AnnotationUtil.java:78)
	... 5 more

打开源码:看得出 EventRepresentation 返回了一个 EventUnderlyingType 枚举类型值,而 EventUnderlyingType 有三个值。

public @interface EventRepresentation {
    EventUnderlyingType value();
}

public enum EventUnderlyingType {
    /**
     * Event representation is object-array (Object[]).
     */
    OBJECTARRAY,

    /**
     * Event representation is Map (any java.util.Map interface implementation).
     */
    MAP,

    /**
     * Event representation is Avro (GenericData.Record).
     */
    AVRO;

如下代码,三种配置结果是相同的,不区分大小写。

@EventRepresentation(map)
@EventRepresentation(value=map)
@EventRepresentation(value=Map)

8、Expression 表达式

Expression 类似自定义函数,通常用 Lambda 表达式来建立的(也有其他方式)与 Java Lambda 表达式类似,用 “=>” 符号,表示“gose to”。符号的左边表示输入参数,符号右边表示计算过程,计算结果就是这个表达式的返回值。

语法如下:

expression expression_name { expression_body }
  • expression:是关键字

  • expression_name:为表达式名称(唯一)

  • expression_body:表达式具体内容,空格只是为了看着清晰,不影响结果。

expression_body: (input_param [,input_param [,...]]) => expression
  • input_param:必须为事件流的别名,注意不是事件流名称,多个参数用逗号分隔,并用圆括号括起来。

举例如下:

expression middle { x => (x.max+x.min)/2 } select middle(apple) from Apple as apple

执行结果就是表达式 middle的计算结果,参数 x 表示输入,而 x.max 和 x.min 都是 x 代表的事件流的属性,如果事件流没这个属性,那么表达式定义就是错误的。

对于多个参数的 expression定义,例子如下:

expression sumage { (x,y) => x.age+y.age } select sumage(me,you) from Me as me, You as you

要是两个age的数据类型不一样是什么结果呢?

expression_body 除了可以用 lambda 表达式之外,还可以用聚合函数,变量,常量,子查询语句,甚至另一个表达式。子查询语句在没有 in或者exist 关键字的情况下,需要圆括号括起来。示例如下:

expression newsSubq(md) {  
    (select sentiment from NewsEvent.std:unique(symbol) where symbol = md.symbol)
}
select newsSubq(mdstream) from MarketDataEvent mdstream

针对变量和常量的示例如下:

expression twoPI { Math.PI * 2} select twoPI() from SampleEvent

对于 expression 里用另一个 expression,EPL不允许在一个句子里建立两个expression,所以就出现了 Global-Expression。普通的 expression 只作用于定义它的epl,如上面所有的包含 select 子句的epl就是如此。

Global-Expression 的语法如下:

create expression expression_name { expression_body }

和普通的 expression 相比,就是多了个 create,不过他不能和别的子句放在一起,即他是单独执行的。例如:

epService.getEPAdministrator().createEPL("create expression avgPrice { x => (x.fist+x.last)/2 }");

expression 里用另一个 expression示例

-- 先定义全局的avgPrice
create expression avgPrice { x => (x.fist+x.last)/2 }
  
-- bananaPrice Banana 事件中包含了 first 和 last 属性,否则将报错  
expression bananaPrice{ x => avgPrice(x) } select bananaPrice(b) from Banana as b

9、Select Clause 条件选择

(1)查询事件流的所有属性、特定属性

EPL 的 select 和 SQL 相近,SQL用 * 表示查询表的所有字段,而EPL用 * 表示查询事件流的所有属性值。另外 EPL 查询属性也可以设置别名。示例如下:

-- EPL:查询完整的 User 对象
select * from User

-- 获取 User 对象
User u = newEvent.getUnderlying();
  
-- EPL:查询 User 的 name和id,id别名为i
select name, id as i from User

-- 获取 name和id
String name = (String)newEvent.get("name");  
int id = (Integer)newEvent.get("i");

这里要注意,如果查询的是一个完整对象,需要调用 getUnderlying() 方法,而 get 方法是针对确定的属性名或者别名。

(2)表达式

EPL还支持属性值的计算,以计算后的值作为结果返回。这个计算的式子就是表达式。除了简单的加减乘除,还可以利用事件流对象的某个方法。

如下示例:

@Data
public class Rectangle {
    private int length;
    private int width;
  
    // 外部传入参数计算面积
    public int getArea(int l, int w) {
        return l*w;
    }

    // 计算该对象的面积
    public int getArea() {
        return length * width;
    }
}

// 计算长方形的面积(长乘以宽)
select length * width as area from Rectangle

// 计算长方形的面积(长乘以宽)
select r.getArea(r.length,r.width) as area from Rectangle as r
select r.getArea() as area from Rectangle as r

如上所示,一个方法需要传参,另一个方法不需要,但是他会利用当前事件的 length 和 width 来计算面积。而且要注意的是事件流需要设置别名才能使用其方法,如:r.getArea()

如果 Rectangle 类里没有计算面积的方法,但是提供了一个专门计算面积的静态方法,表达式也可以直接引用。不过要事先加载这个包含方法的类。例如:

// 该类用于计算面积  
public class ComputeArea {

    public static int getArea(int length, int width) {
        return length * width;
    }
}

// 加载
epService.getEPAdministrator().getConfiguration().addImport(ComputeArea.class);

// 调用 ComputeArea 的 getArea 方法计算面积
select ComputeArea.getArea(length,width) from Rectangle

注意一定要是静态方法,不然没有实例化是没法引用的。

(3)多事件流的查询

和SQL类似,EPL 也可以同时对多个事件流进行查询,即 join,但是必须对每个事件流设置别名。例如:

// 当 老师的id 和 学生的id 相同时,查询学生的姓名和老师的姓名
select s.name, t.name from Student.win:time(10) as s, Teacher.win:time(10) as t where s.id=t.id

如果想查询 Student 或者 Teacher,则EPL改写如下:

select s.* as st, t.* as tr from Student.win:time(10) as s, Teacher.win:time(10) as t where s.id=t.id

关于 join,下面会介绍到。

(4)insert 和 remove事件流

Esper 对于事件流分输入和移出两种,分别对应监听器的两个参数 newEvents 和 oldEventsnewEvents 通常对应事件的计算结果,oldEvents 可以理解过上一次计算结果。

默认情况下,Esper 认为你只想让 newEvents 触发监听器,即 istream(insert stream)。如果想让 oldEvents 触发监听器,那么为 rstream(remove stream)。如果两个都想,那么为 irstream

select rstream * from User

如果使用了该参数,则会将上一次计算结果放入 newEvents 内,而不是 oldEvents。并且无法获得当前的计算结果。

select irstream * from User

如果使用了该参数,则会将当前的计算结果放入 newEvents 内,上一次的计算结果放入 oldEvents 内。

select istream * from User
-- 等同于
select * from User

如果使用了该参数,则会将当前的计算结果放入 newEvents 内,并且无法获得上一次的计算结果。同时该参数也是默认参数,可不写。

(5)Distinct

distinct 的用法和SQL一样,放在需要修饰的属性或者 * 前即可。例如:

select distinct * from User.win:time(3 sec)

(6)查询指定引擎的处理结果

除了上述所说的一些特点外,select还可以针对某个引擎进行查询。因为引擎都有自己的URI,所以可以在select句子中增加URI标识来指定查询哪一个引擎的事件处理情况。例如:

-- 引擎URI为Processor  
select Processor.MyEvent.myProperty from Processor.MyEvent

10、From Clause 条件选择

(1)语法介绍

From 的语法不难,主要内容是针对事件流的处理。包括事件流过滤,事件流的维持等等。语法如下:

from stream_def [as name] [unidirectional] [retain-union | retain-intersection] [, stream_def [as stream_name]] [, ...]

-- 事件流
event_stream_name [(filter_criteria)] [contained_selection] [.view_spec] [.view_spec] [...]

unidirectionalretain-unionretain-intersectioncontained_selectionview_spec 这几个关键字因为涉及到 view 的知识,后面会介绍道。学完 view 之后再来回顾这几个参数会很容易理解的。

下面讲讲怎么过滤事件流

(2)事件流过滤

2.1 过滤属性

事件流过滤通常情况都是对其中某个或多个属性加以限制来达到过滤的目的。注意,过滤表达式是紧跟在事件流名称之后而不是别名之后。例如:

--  只有 age大于10 的 User 对象才可查询到 name 值
select name from User(age>10) as user

-- 当 name=“tingfeng” 时,可获得其 age 值
select age from User(name="tingfeng")
  
-- 错误写法
select name from User as user(age>10)

过滤表达式写法多种多样,可以用符号,又或者使用and,or,between等逻辑语言。例如:

-- 查询年龄大于15小于18的学生的姓名
select name from Student(age between 15 and 18)

-- 等同于
select name from Student(age >= 15 and age <= 18)

--  等同于
select name from Student(age >= 15, age <= 18)

看以看到,过滤表达式写法很多,并且多个表达式同时作用于一个事件流,用逗号连接即可。如果说满足其中一个条件即可,则需要用 or 连接。

2.2 过滤范围

过滤表达式使用的符号很多,总结下来基本上有 <><=>==!=betweeninnot inandor[ ]( )

这里主要说下 betweenin( )[ ]

  • between……and……:和SQL语法一样,是一个闭区间。比如说between 10 and 15,表示10到15之间,并且包含10和15。

  • ( ):表示一个开区间,语法为 (low:high),如 (10:15),表示10到15之间,并且不包含10和15。

  • [ ]:表示一个闭区间,语法为[low:high]。如[10:15],表示10到15之间,并且包含10和15。

  • ( )和[ ]:可以混合用。比如[10:15)或者(10:15]。

  • in:配合 ( ) 和 [ ] 进行使用,表示值在某个范围内。

  • not in:表示不在此范围内。

in 和 not in 不止可以作用于数字,它还可以作用于字符串。举例

select name from User(age in [10:15))
select age from User(name in ('张三', '李四'))
2.3 静态方法过滤

除了上面说的这些符号以外,类似于select子句中使用的静态方法,过滤表达式中也可以使用,但是返回值必须为布尔值,不然会报错。例如:

// 判断总数是否等于0  
public class IsZero {
    public static boolean isZero(int sum) {
        return sum==0;
    }
}

// 加载
epService.getEPAdministrator().getConfiguration().addImport(IsZero.class);

// 查询没有钱的用户的 name 值(User包含name和money属性)
select name from User(IsZero.isZero(money))

事件流的过滤并不能弄得很复杂,他有一下几个限制:

  • 要过滤的属性只能是数字和字符串。

  • 过滤表达式中不能使用聚合函数。

11、Aggregation 聚合函数

和SQL一样,EPL也有 Aggregation,即聚合函数。语法如下:

aggregate_function([all|distinct] expression)
  • aggregate_function:聚合函数的名字,比如 avgsum 等。

  • expression 通常是事件流的某个属性,也可以是不同事件流的多个属性,或者是属性和常量、函数之间的运算。举例如下。

-- 查询最新 5秒 的 Apple 的平均价格
select avg(price) as aPrice from Apple.win:time(5 sec)

-- 查询最新 10个 Apple 的价格总和的两倍
select sum(price*2) as sPrice from Apple.win:length(10)

-- 查询最新 10个 Apple 的价格,并用函数计算后再算平均值
select avg(Compute.getResult(price)) from Apple.win:length(10)

函数只能是静态方法,普通方法不可用。即使是事件流里包含的静态方法,也必须用“类名.方法名”的方式进行引用。

可以使用 distinct 关键字对 expression 加以约束,表示去掉表达式重复的值。默认情况下为 all 关键字,即所有的值都参与聚合运算。例如:

-- 查询最新 5秒 的 Apple 的平均价格  
select avg(distinct price) as aPrice from Apple.win:time(5 sec)

假如:5秒内进入了三个Apple事件,price分别为2,1,2。则针对该EPL的平均值为(2+1)/2=1.5。因为有distinct的修饰,所以第二个2不参与运算,事件总数即为2,而不是3。

注意点:聚合函数能用于Select和Having,但是不能用于 Where,而 sumavgmediastddevavedev 只能计算数值

12、Group by 分组

Group by 通常配合聚合函数使用。语法和SQL基本一样,产生的效果就是以某一个或者多个字段进行分组,然后使聚合函数作用于不同组的数据。简单语法如下:

group by aggregate_free_expression [, aggregate_free_expression] [, ...]

注意点:

  • Group by 后面的内容不能包含聚合函数。

  • Group by 后面的内容不能是之前 select 子句中聚合函数修饰的属性名。

  • 通常情况要保证分组数量有限制,以防止内存溢出。但是如果分组分了很多,就需要使用 @Hint 加以控制。

(1)基本用法

-- 根据 color和size 来对 10个Apple 事件进行分组计算平均price
select avg(price) as aPrice, color, size from Apple.win:length_batch(10) group by color,size

该句子遵从SQL的标准,如果某个事件的 color和size 和之前进入的事件的一样,则归为一组,否则新建一组,并计算平均 price

-- 根据 size 来对 10个Apple 事件进行分组计算平均 price和color
select avg(price) as aPrice, color, size from Apple.win:length_batch(10) group by size

这里group by 的对象只有 size,而 select 中 color 不聚合,则生成的结果时,聚合函数会根据相同的 size 分组进行平均price的计算,但是color不是分组条件,所以color有多少个就有多少组。

-- 根据 size 来对 10个Apple 事件进行分组计算平均 price和color
select avg(price) as aPrice, color from Apple.win:length_batch(10) group by size

这一次 select 子句中没有包含分组的字段 size,但是效果和上一个句子一样。Esper仍然会根据相同的 size 进行分组计算平均 price,只不过计算结果中只有平均price和color,并且有十排结果。

-- 根据 size 乘 color 来对 10个Apple 事件进行分组计算平均 price
select avg(price) as aPrice, size*color from Apple.win:length_batch(10) group by size*color

(2)@Hint

@Hint 是Esper中注解的其中一个,它是专用于 Group by 的。我们平时使用 Group by 的时候,会遇到分组数量太多的情况。比如以时间单位进行分组,那么内存使用一定是一个大问题。因此 @Hint 为其设计了两个属性,用于限制 Group by 的生存时间,使虚拟机能及时回收内存。这两个属性分别为 reclaim_group_aged 和 reclaim_group_freq

2.1 reclaim_group_aged

该属性值是正整数,以秒为单位,表示在 n 秒内,若分组的数据没有进行更新,则分组数据被Esper回收。例如:

-- 根据 color 对 10秒 内进入的 Apple事件 进行分组计算 平均price,并且对 5秒内没有数据更新的分组进行回收  
@Hint('reclaim_group_aged=5')select avg(price) as aPrice, color from Apple.win:time(10 sec) group by color
2.2 reclaim_group_freq

该属性值是正整数,以秒为单位,表示每 n 秒清理一次分组,可清理的分组是 reclaim_group_aged 决定的,也就是说要使用该参数,就要配合 reclaim_group_aged 一起使用。可能不是很好理解,先看看例子:

-- 根据 color 对 10秒 内进入的 Apple事件 进行分组计算平均 price。对8秒内没有数据更新的分组进行回收,每2秒回收一次  
@Hint('reclaim_group_aged=8,reclaim_group_freq=2')select avg(price) as aPrice, color from Apple.win:time(10 sec) group by color

对上面来说就是回收的条件为8秒内没有数据更新,且每8秒回收一次。这样的话有可能出现这么一种情况,上一个8秒的某个分组在下一个8秒还没到达时就已经持续8秒没有数据更新了,但是必须等到回收的时间点到达时才能回收这个分组。在分组产生很快的情况下,这样的回收不及时很可能会造成内存溢出。reclaim_group_freq 正是为这种情况做准备,回收的频率高一些,在一定程度上能提高内存的使用率。

上面这两个属性的值除了可以使用正整数之外,也可以使用预先定义的变量或者常量

13、Having

Having 的用法和SQL一样,后面跟的是对聚合函数的计算结果进行过滤。Where 子句不能包含聚合函数,所以就由 Having 来完成。示例如下:

-- 根据 size 来对 10个Apple 事件进行分组计算平均 price和color,并且排除平均 price大于5 的分组
select avg(price) as aPrice, color from Apple.win:length_batch(10) group by size having avg(price) > 5

通常 Having 配合 Group by 使用,如果没有使用 Group by,那么就只有一组。例如:

-- 根据 size 来对 10个Apple 事件计算平均 price和color,如果平均 price大于5,则数据被排除掉
select avg(price) as aPrice, color from Apple.win:length_batch(10) having avg(price) > 5

Having 后面可以跟多个判断式子,并且用 andor 或者 not 进行连接。例如:

-- 根据size来对10个Apple事件计算平均price和color,如果平均price大于5并且平均size小于3,则数据被排除掉
select avg(price) as aPrice, color from Apple.win:length_batch(10) having avg(price) > 5 and avg(size) < 3

14、Output 输出

(1)基本语法

Output 是EPL中非常有用的东西,用来控制Esper对事件流计算结果的输出时间和形式,可以以固定频率,也可以是某个时间点输出。简单语法如下:

output [after suppression_def]  
[[all | first | last | snapshot] every time_period | output_rate events]
  • after suppression_def:可选参数,表示先满足一定的条件再输出。

  • all | first | last | snapshot:表明输出结果的形式,默认值为 all

  • every output_rate:表示输出频率,即每达到规定的频率就进行输出。

  • time_period:表示时间频率,上面文章有介绍到。

  • output_rate events:表示事件数量。

举例如下:

-- 30分钟内,每进入一个 OrderEvent,统计一次 sum price,并且每 60秒 输出一次统计结果 
select sum(price) from OrderEvent.win:time(30 min) output snapshot every 60 seconds

(2)after

after 在 output 里的使用也很简单,语法如下:

output after time_period | number events [...]
  • time_period:表示时间段

  • number events:表示事件数量,表示从EPL可用开始,经过一段时间或者接收到一定数量的事件再进行输出

举例如下:

-- 统计 20个Apple 事件的 sum price,并且在有 5个Apple 事件进入后才开始输出统计结果
select sum(price) from Apple.win:length(20) output after 5 events

上面这个句子从第一个进入的事件进行统计,直到进入了 5 个事件以后才输出统计结果,之后每进入一个事件输出一次(这是win:length的特性)。

但是要注意的是,after 之后的时间长度和事件数量会影响之后的时间或者事件数量。什么意思?看个完整例子:

1、定义事件模型

@AllArgsConstructor
@Getter
public class Banana {
    private int id;
    private int price;
}

2、测试

public static void main(String[] args) {
    EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();

    EPAdministrator admin = epService.getEPAdministrator();

    String banana = Banana.class.getName();
    // 统计最新 3个Banana 事件的 sum price,并且从EPL可用起,等待第一个事件进入后,以每两个事件进入的频率输出统计结果
    String epl = "select sum(price) as sPrice from " + banana + ".win:length(3) output after 1 events snapshot every 2 events";

    EPStatement state = admin.createEPL(epl);
    state.addListener((newEvents, oldEvents) -> {
        if (newEvents != null) {
            int price = (Integer) newEvents[0].get("sPrice");
            System.out.println("Banana's sum price is " + price);
        }
    });

    EPRuntime runtime = epService.getEPRuntime();

    System.out.println("Send Banana Event 1");
    runtime.sendEvent(new Banana(1, 6));

    System.out.println("Send Banana Event 2");
    runtime.sendEvent(new Banana(2, 3));

    System.out.println("Send Banana Event 3");
    runtime.sendEvent(new Banana(3, 1));

    System.out.println("Send Banana Event 4");
    runtime.sendEvent(new Banana(4, 2));

    System.out.println("Send Banana Event 5");
    runtime.sendEvent(new Banana(5, 4));

    System.out.println("Send Banana Event 6");
    runtime.sendEvent(new Banana(6, 5));

    System.out.println("Send Banana Event 7");
    runtime.sendEvent(new Banana(7, 1));

    System.out.println("Send Banana Event 8");
    runtime.sendEvent(new Banana(8, 8));
}

执行结果

Send Banana Event 1
Send Banana Event 2
Send Banana Event 3
香蕉总价:10
Send Banana Event 4
Send Banana Event 5
香蕉总价:7
Send Banana Event 6
Send Banana Event 7
香蕉总价:10
Send Banana Event 8
  • 第一个总价:6+3+1 = 10

  • 第二个总价:1+2+4 = 7

  • 第三个总价:4+5+1 = 10

由此可见,after 之后的 every 子句要等到 after 后面的表达式满足后才生效。所以第一个事件进入后,every 2 events 生效,即等待两个事件进入后才输出结果。

对于时间,也是要等到 after 的子句满足后才开始计时。例如:

-- 从EPL可用开始计时,经过1分钟后,每5秒 输出一次当前 100秒 内的所有 Banana的 avg price(即:第一次输出在65秒时)  
select avg(price) from Banana.win:time(100 sec) after 1 min snapshot every 5 sec

(3)first、last、snapshot、all

每当达到输出时间点时,可以用这四个参数来控制输出内容。

3.1 first 首个

表示每一批可输出的内容中的第一个事件计算结果。

-- 每进入两个 Fruit 事件,输出这两个事件的第一个
select * from Fruit output first every 2 events
3.2 last 最后一个

表示每一批可输出的内容中的最后一个事件计算结果。

-- 每进入两个 Fruit 事件,输出这两个事件的第二个,也就是最后一个
select * from Fruit output last every 2 events
3.3 snapshot 快照

表示输出EPL所保持的所有事件计算结果,通常用来查看 view 或者 window 中现存的事件计算结果。比如:

-- 每进入两个事件输出5 sec内的所有事件,且不会将这些事件从5 sec范围内移除
select * from Fruit.win:time(5 sec) output snapshot every 2 events
3.4 all 全部

默认值。和 snapshot 类似,也是输出所有的事件,但是不同的是,snapshot 相当于对计算结果拍了一张照片,把结果复制出来并输出,而 all 是把计算结果直接输出,不会复制。比如:

select * from Fruit.win:time(5 sec) output all every 2 events

上面的句子表示每进入两个事件输出 5 sec 内包含的所有事件,输出的事件不再保留于 5 sec 范围内。

(4)Crontab Output 定时任务输出

output 的另一个语法可以建立定时输出,关键字是 at。语法如下:

output [after suppression_def]  
[[all | first | last | snapshot] at  
(minutes, hours, days of month, months, days of week [, seconds])]

时间单位

  • minutes

  • hours

  • days of month

  • months

  • days of week

  • [, seconds]

举例:

-- 在 8点到17 点这段时间内,每 15分钟 输出一次  
select * from Fruit output at (*/15,8:17,*,*,*)

(5)when 在...时

Output 还可以使用 when 来实现达到某个固定条件再输出的效果,一般通过变量,用户自定义的函数以及 output 内置的属性来实现。

基本语法:

output [after suppression_def]
[[all | first | last | snapshot] when trigger_expression
[then set variable_name = assign_expression [, variable_name = assign_expression [,...]]]
  • trigger_expression:返回 true 或者 false,表示输出或者不输出。

  • then set variable_name=assign_expression:表示是当 trigger_expression 被触发时,可对变量重新赋值。

完整例子如下:

1、定义事件模型

@Getter
@AllArgsConstructor
public class Pink {
    private int id;
    private int price;

    @Override
    public String toString() {
        return "id: " + id + ", price: " + price;
    }
}

2、测试

public static void main(String[] args) throws InterruptedException {
    EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
    EPAdministrator admin = epService.getEPAdministrator();
    ConfigurationOperations config = admin.getConfiguration();
    config.addVariable("exceed", boolean.class, false);

    String pink = Pink.class.getName();
    // 当 exceed 为 true 时,输出所有进入EPL的事件,然后设置 exceed 为 false
    String epl = "select * from " + pink + " output when exceed then set exceed=false";

    EPStatement state = admin.createEPL(epl);
    state.addListener((newEvents, oldEvents) -> {
        if (newEvents != null) {
            for (int i = 0; i < newEvents.length; i++) {
                Pink p = (Pink) newEvents[i].getUnderlying();
                System.out.println("输出 Pink: " + p);
            }
        }
    });

    EPRuntime runtime = epService.getEPRuntime();

    Random r = new Random(10);
    for (int i = 1; i <= 10; i++) {
        int price = r.nextInt(30);
        Pink p = new Pink(i, price);
        System.out.println("Send Pink Event: " + p);
        runtime.sendEvent(p);
        // 当 price>10 时,exceed 变量为 true
        if (price > 10) {
            runtime.setVariableValue("exceed", true);
            // 因为主线程和输出线程不是同一个,所以这里休息1秒保证输出线程将事件全部输出,方便演示。
            Thread.sleep(1000);
        }
    }
}

执行结果

Send Pink Event: id: 1, price: 3
Send Pink Event: id: 2, price: 0
Send Pink Event: id: 3, price: 3
Send Pink Event: id: 4, price: 0
Send Pink Event: id: 5, price: 16
00:53:21.851 [main] DEBUG com.espertech.esper.epl.view.OutputConditionExpression - .update Ignoring variable callback
输出 Pink: id: 1, price: 3
输出 Pink: id: 2, price: 0
输出 Pink: id: 3, price: 3
输出 Pink: id: 4, price: 0
输出 Pink: id: 5, price: 16
Send Pink Event: id: 6, price: 16
00:53:22.854 [main] DEBUG com.espertech.esper.epl.view.OutputConditionExpression - .update Ignoring variable callback
输出 Pink: id: 6, price: 16
Send Pink Event: id: 7, price: 7
Send Pink Event: id: 8, price: 28
00:53:23.860 [main] DEBUG com.espertech.esper.epl.view.OutputConditionExpression - .update Ignoring variable callback
输出 Pink: id: 7, price: 7
输出 Pink: id: 8, price: 28
Send Pink Event: id: 9, price: 21
00:53:24.862 [main] DEBUG com.espertech.esper.epl.view.OutputConditionExpression - .update Ignoring variable callback
输出 Pink: id: 9, price: 21
Send Pink Event: id: 10, price: 4

从结果可以看出来。当 price>10 的时候,设置 exceed 变量为 true,即可输出之前进入的所有事件,然后 set 子句将 exceed 设置为 false,等待下一次 exceed=true 时触发输出。由于输出线程是单独的线程,所以如果不 sleep,结果可能会和这个不同。

对于 when 关键字,Esper提供了一些内置的属性帮助我们实现更复杂的输出约束。如图所示:

epl-when.png

以上5个属性就不多做解释了,使用方式是作为 trigger_expression 跟在 when 关键字的后面。例如:

-- 进入的 Apple 事件总数达到 5 个时才输出,且不清零 count_insert_total 属性,继续累加事件总数
select * from Apple output when count_insert_total=5
  
-- 移除的 Apple 事件总数达到4个时才输出,并清零 count_remove 属性
select * from Apple output when count_remove=4

另外,在使用 when 的时候,注意如下两点:

  • 当 trigger_expression=true 时,Esper会输出从上一次输出之后到这次输出之间所有的 insert stream 和 remove stream

  • 若 trigger_expression 不断被触发并返回 true 时,则Esper最短的输出间隔为 100毫秒

(6)Context Terminated 上下文终止

Output 还针对 Context 专门设计了一个输出条件,即在 Context 终止时输出 Context 中的内容。

语法如下:

output when terminated [and termination_expression]  
[then set variable_name = assign_expression [, variable_name = assign_expression [,...]]]]
  • when terminated:是关键字,之前可以通过 and 连接其他的式子一起使用。

  • termination_expression:是一个返回 true 或者 false 的表达式,同trigger_expression一样。

举例如下:

// 在 MyContext 下,查询 context 的 id 并计算 Apple 的 sum price,当 Context 结束且输入的事件总数大于 10 时,输出。然后设置 FinishCompute 变量为 true  
context MyContext select context.id, sum(price) from Apple output when terminated and count_insert_total > 10 then set FinishCompute = true  
  
// 在 MyContext 下,计算 Apple 的 avg size,并每 1分钟 输出第一个进入的事件计算结果,当 context 结束时也输出一次计算结果  
context MyContext select avg(size) from Apple output first every 1 min and when terminated

Output 和 AggregationGroup by 一起使用时,firstlastallsnapshot 四个关键字产生的效果会比较特别。建议看看Esper的官方文档的 Appendix A,有相当完整的例子做说明。

外针对这四个关键字,只有使用 snapshot 是不会缓存计算结果。其他的关键字会缓存事件直到触发了输出条件才会释放,所以如果输入的数据量比较大,就要注意输出条件被触发前的内存使用量。


未经允许请勿转载:程序喵 » Esper教程 —— EPL 语法讲解(5)

点  赞 (3) 打  赏
分享到: