EPL 语法讲解
EPL
全称 Event Processing Language
,是一种类似SQL的语言,包含了 SELECT
, FROM
, WHERE
, GROUP BY
, HAVING
和 ORDER BY
子句,同时用事件流代替了 table
作为数据源,并且能像SQL那样 join
,filtering
和 aggregation
。所以如果各位有SQL基础的话,简单的EPL很容易掌握。
除了 select
,EPL 也有 insert into
,update
,delete
,不过含义和 SQL 并不是很接近。另外还有 pattern
和 output
子句,这两个是SQL所没有的。
EPL 还定义了一个叫 view
的东西,类似 SQL 的 table
,来决定哪些数据是可用的,Esper 提供了十多个 view
,并且保证这些 view
可以被重复使用。而且用户还可以扩展 view
成为自定义 view
来满足需求。在 view
的基础上,EPL还提供了 named window
的定义,作用和 view
类似,但是更加灵活。。。。
1、基本语法
[annotations] [expression_declarations] [context context_name] [insert into insert_into_def] select select_list from stream_def [as name] [, stream_def [as name]] [,...] [where search_conditions] [group by grouping_expression_list] [having grouping_search_conditions] [output output_specification] [order by order_by_expression_list] [limit num_rows]
以上大部分的 EPL 都是按照这个格式来定义。
2、Time Periods 时间表达式
Esper 时间的表达形式。语法如下:
time-period : [year-part] [month-part] [week-part] [day-part] [hour-part] [minute-part] [seconds-part] [milliseconds-part] year-part : (number|variable_name) ("years" | "year") month-part : (number|variable_name) ("months" | "month") week-part : (number|variable_name) ("weeks" | "week") day-part : (number|variable_name) ("days" | "day") hour-part : (number|variable_name) ("hours" | "hour") minute-part : (number|variable_name) ("minutes" | "minute" | "min") seconds-part : (number|variable_name) ("seconds" | "second" | "sec") milliseconds-part : (number|variable_name) ("milliseconds" | "millisecond" | "msec")
举几个例子说明下:
-- 计算过去的 5分3秒 中进入改语句的 Fruit 事件的平均 price select avg(price) from Fruit.win:time(5 minute 3 sec) -- 每一天输出一次用户的账户总额 select sum(account) from User output every 1 day
用法比较简单。要注意的是,Esper 规定每月的天数都是 30天
,所以对准确性要求高的业务,以月为单位进行计算会出现误差的。
3、Comments 注释
和Java差不多,只不过只有 /* */
和 //
两种。
//
:只能单行注释,/* */
:可以多行注释。
示例如下:
(1)单行注释
// This comment extends to the end of the line. // Two forward slashes with no whitespace between them begin such comments. select * from MyEvent
(2)多行注释
/* this is a very important Event */ select * from OtherEvent
(3)混合注释
select field1 // first comment /* second comment */ field2 from MyEvent
4、Reserved Keywords 保留关键字
EPL 里如果某个事件属性或者事件流的名称和EPL的关键字一样,那么必须要以 ` 括起来才可用,在键盘上esc的下面,1的左边。比如:
-- 无效示例:insert和Order是关键字 select insert from Order -- 有效示例:`insert`和`Order`是属性名称和事件流名称 select `insert` from `Order`
5、Escaping Strings 转义字符
在EPL中,字符串使用单引号
或者双引号
括起来的,那如果字符串里包含有单引号或者双引号怎么办呢。请看例子:
select * from OrderEvent(name='John') -- 等同于 select * from OrderEvent(name="John")
如果 name=John's
,则就需要反斜杠进行转义了。
select * from OrderEvent(name="John\"s") -- 或者 select * from OrderEvent(name='john\'s') -- 或者 select * from OrderEvent(name='john"s')
除了使用反斜杠,还可以使用 unicode
来表示单引号和双引号。
select * from OrderEvent(name="John\u0022s") -- 或者 select * from OrderEvent(name='john\u0027s')
注意在Java编写EPL的时候,反斜杠和无含义的双引号还得转义,不然会和String的双引号冲突。比如
epService.getEPAdministrator().createEPL("select * from OrderEvent(name='John\\'s')"); // ... and for double quotes... epService.getEPAdministrator().createEPL("select * from OrderEvent(name=\"Quote \\\"Hello\\\"\")");
6、Data Types 数据类型
EPL支持Java所有的数值数据类型,包括基本类型及其包装类,同时还支持 java.math.BigInteger
和 java.math.BigDecimal
,并且能自动转换数据类型不丢失精度(比如short转int
,int转short
则不行)。如果想在EPL内进行数据转换,可以使用 cast函数
。完整例子如下。
1、定义事件模型
@Data @AllArgsConstructor public class Banana { private int price; }
2、测试类
public static void main(String[] args) { EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(); EPAdministrator admin = epService.getEPAdministrator(); String banana = Banana.class.getName(); String epl1 = "select cast(avg(price),int) as price from " + banana + ".win:length_batch(2)"; String epl2 = "select avg(price) from " + banana + ".win:length_batch(2)"; EPStatement state1 = admin.createEPL(epl1); state1.addListener((newEvents, oldEvents) -> { if (newEvents != null) { EventBean event = newEvents[0]; System.out.println("平均价格: " + event.get("price") + ",数据类型:" + event.get("price").getClass().getName()); } }); EPStatement state2 = admin.createEPL(epl2); state2.addListener((newEvents, oldEvents) -> { if (newEvents != null) { EventBean event = newEvents[0]; System.out.println("平均价格: " + event.get("avg(price)") + ",数据类型:" + event.get("avg(price)").getClass().getName()); } }); EPRuntime runtime = epService.getEPRuntime(); runtime.sendEvent(new Banana(1)); runtime.sendEvent(new Banana(2)); }
执行结果
平均价格: 1,数据类型:java.lang.Integer 平均价格: 1.5,数据类型:java.lang.Double
要提醒的是,如果某个数除以0
,那么默认会返回正无穷大或者负无穷大,不过可以配置这个结果,比如用null
来代替。
7、Annotation
EPL 也可以写注解,种类不多,大部分简单而有效。不过有些注解内容较多,以后会在具体的使用场景进行详细讲解。首先来了解下注解的语法。
// 不包含参数或者单个参数的注解 @annotation_name [(annotation_parameters)] // 包含多个属性名-值对的注解 @annotation_name (attribute_name = attribute_value, [name=value, ...]) // 多个注解联合使用 @annotation_name [(annotation_parameters)] [@annotation_name [(annotation_parameters)]] [...]
讲解具体注解时,结合语法进行说明
@Name
:指定EPL的名称,参数只有一个。例如:@Name("MyEPL")
@Description
:对EPL进行描述,参数只有一个。例如:@Description("This is MyEPL")
@Tag
:对EPL进行额外的说明,参数有两个,分别为 Tag的名称和Tag的值,用逗号分隔。例如:@Tag(name="author",value="luonanqin")
@Priority
:指定EPL的优先级,参数只有一个,并且整数(可负可正)。例如:@Priority(10)
@Drop
:指定事件经过此EPL后不再参与其他的EPL计算,该注解无参数。@Hint
:为EPL加上某些标记,让引擎对此EPL产生其他的操作,会改变EPL实例的内存占用,但通常不会改变输出。其参数固定,由Esper提供。@Audit
:EPL添加此注解后,可以额外输出EPL运行情况,有点类似日志的感觉。@Hook
:与SQL相关。@EventRepresentation
:这是用来指定EPL产生的计算结果事件包含的数据形式。参数只有一个(数据形式为Map,默认值)
示例代码:
1、定义事件模型
@AllArgsConstructor @Getter public class Apple { private int price; }
2、测试
public static void main(String[] args) { EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(); EPAdministrator admin = epService.getEPAdministrator(); String apple = Apple.class.getName(); String epl1 = "@Name('AppleTest')" + "@Description('获取两个请求的总价')" + "@Tag(name='author',value='程序喵')" + "@Priority(10)" + "@Audit" + "@EventRepresentation(value=objectArray)" + "select sum(price) as price from " + apple + ".win:length_batch(2)"; String epl2 = "select avg(price) as price from " + apple + ".win:length_batch(2)"; String epl3 = "@Drop select sum(price) as price from " + apple + ".win:length_batch(2)"; // 注册监听 UpdateListener listenenr = (newEvents, oldEvents) -> { if (newEvents != null) { EventBean event = newEvents[0]; System.out.println("价格: " + event.get("price") + ", 事件类型:" + event.getEventType().getUnderlyingType()); } }; EPStatement state1 = admin.createEPL(epl1); EPStatement state2 = admin.createEPL(epl2); EPStatement state3 = admin.createEPL(epl3); state1.addListener(listenenr); state2.addListener(listenenr); state3.addListener(listenenr); System.out.println("epl1 名字是 " + state1.getName()); EPRuntime runtime = epService.getEPRuntime(); runtime.sendEvent(new Apple(1)); runtime.sendEvent(new Apple(2)); }
执行结果
epl1 名字是 AppleTest 价格: 3, 事件类型:class [Ljava.lang.Object; 价格: 1.5, 事件类型:interface java.util.Map 价格: 3, 事件类型:interface java.util.Map
可以发现,@Name
和 @EventRepresentation
都起效果了,但是 @Priority
和 @Drop
没用,那是因为这两个是要配置才能生效的。以后讲 Configuration
的时候会说到。
注意点:大多网上资料都是给 @EventRepresentation
设置 array=true(数组) 或 array=false(Map)
。但其实 7.1.0
版本中并不是,会有如下错误。
Caused by: com.espertech.esper.epl.annotation.AnnotationException: Annotation 'EventRepresentation' requires a EventUnderlyingType-typed value for attribute 'value' but received a Boolean-typed value at com.espertech.esper.epl.annotation.AnnotationUtil.getFinalValue(AnnotationUtil.java:221) at com.espertech.esper.epl.annotation.AnnotationUtil.createProxy(AnnotationUtil.java:150) at com.espertech.esper.epl.annotation.AnnotationUtil.compileAnnotations(AnnotationUtil.java:101) at com.espertech.esper.epl.annotation.AnnotationUtil.compileAnnotations(AnnotationUtil.java:78) ... 5 more
打开源码:看得出 EventRepresentation
返回了一个 EventUnderlyingType
枚举类型值,而 EventUnderlyingType
有三个值。
public @interface EventRepresentation { EventUnderlyingType value(); } public enum EventUnderlyingType { /** * Event representation is object-array (Object[]). */ OBJECTARRAY, /** * Event representation is Map (any java.util.Map interface implementation). */ MAP, /** * Event representation is Avro (GenericData.Record). */ AVRO;
如下代码,三种配置结果是相同的,不区分大小写。
@EventRepresentation(map) @EventRepresentation(value=map) @EventRepresentation(value=Map)
8、Expression 表达式
Expression
类似自定义函数,通常用 Lambda
表达式来建立的(也有其他方式)与 Java Lambda 表达式类似,用 “=>
” 符号,表示“gose to”。符号的左边表示输入参数,符号右边表示计算过程,计算结果就是这个表达式的返回值。
语法如下:
expression expression_name { expression_body }
expression:是关键字
expression_name:为表达式名称(唯一)
expression_body:表达式具体内容,空格只是为了看着清晰,不影响结果。
expression_body: (input_param [,input_param [,...]]) => expression
input_param
:必须为事件流的别名,注意不是事件流名称,多个参数用逗号分隔,并用圆括号括起来。
举例如下:
expression middle { x => (x.max+x.min)/2 } select middle(apple) from Apple as apple
执行结果就是表达式 middle
的计算结果,参数 x
表示输入,而 x.max
和 x.min
都是 x
代表的事件流的属性,如果事件流没这个属性,那么表达式定义就是错误的。
对于多个参数的 expression
定义,例子如下:
expression sumage { (x,y) => x.age+y.age } select sumage(me,you) from Me as me, You as you
要是两个age的数据类型不一样是什么结果呢?
expression_body
除了可以用 lambda
表达式之外,还可以用聚合函数,变量,常量,子查询
语句,甚至另一个表达式。子查询语句在没有 in或者exist
关键字的情况下,需要圆括号括起来。示例如下:
expression newsSubq(md) { (select sentiment from NewsEvent.std:unique(symbol) where symbol = md.symbol) } select newsSubq(mdstream) from MarketDataEvent mdstream
针对变量和常量的示例如下:
expression twoPI { Math.PI * 2} select twoPI() from SampleEvent
对于 expression
里用另一个 expression
,EPL不允许在一个句子里建立两个expression,所以就出现了 Global-Expression
。普通的 expression 只作用于定义它的epl,如上面所有的包含 select
子句的epl就是如此。
Global-Expression 的语法如下:
create expression expression_name { expression_body }
和普通的 expression
相比,就是多了个 create
,不过他不能和别的子句放在一起,即他是单独执行的。例如:
epService.getEPAdministrator().createEPL("create expression avgPrice { x => (x.fist+x.last)/2 }");
expression
里用另一个 expression
示例
-- 先定义全局的avgPrice create expression avgPrice { x => (x.fist+x.last)/2 } -- bananaPrice Banana 事件中包含了 first 和 last 属性,否则将报错 expression bananaPrice{ x => avgPrice(x) } select bananaPrice(b) from Banana as b
9、Select Clause 条件选择
(1)查询事件流的所有属性、特定属性
EPL 的 select
和 SQL 相近,SQL用 *
表示查询表的所有字段,而EPL用 *
表示查询事件流的所有属性值。另外 EPL 查询属性也可以设置别名。示例如下:
-- EPL:查询完整的 User 对象 select * from User -- 获取 User 对象 User u = newEvent.getUnderlying(); -- EPL:查询 User 的 name和id,id别名为i select name, id as i from User -- 获取 name和id String name = (String)newEvent.get("name"); int id = (Integer)newEvent.get("i");
这里要注意,如果查询的是一个完整对象,需要调用 getUnderlying()
方法,而 get
方法是针对确定的属性名或者别名。
(2)表达式
EPL还支持属性值的计算,以计算后的值作为结果返回。这个计算的式子就是表达式。除了简单的加减乘除,还可以利用事件流对象的某个方法。
如下示例:
@Data public class Rectangle { private int length; private int width; // 外部传入参数计算面积 public int getArea(int l, int w) { return l*w; } // 计算该对象的面积 public int getArea() { return length * width; } } // 计算长方形的面积(长乘以宽) select length * width as area from Rectangle // 计算长方形的面积(长乘以宽) select r.getArea(r.length,r.width) as area from Rectangle as r select r.getArea() as area from Rectangle as r
如上所示,一个方法需要传参,另一个方法不需要,但是他会利用当前事件的 length
和 width
来计算面积。而且要注意的是事件流需要设置别名才能使用其方法,如:r.getArea()
。
如果 Rectangle
类里没有计算面积的方法,但是提供了一个专门计算面积的静态方法,表达式也可以直接引用。不过要事先加载这个包含方法的类。例如:
// 该类用于计算面积 public class ComputeArea { public static int getArea(int length, int width) { return length * width; } } // 加载 epService.getEPAdministrator().getConfiguration().addImport(ComputeArea.class); // 调用 ComputeArea 的 getArea 方法计算面积 select ComputeArea.getArea(length,width) from Rectangle
注意一定要是静态方法,不然没有实例化是没法引用的。
(3)多事件流的查询
和SQL类似,EPL 也可以同时对多个事件流进行查询,即 join
,但是必须对每个事件流设置别名。例如:
// 当 老师的id 和 学生的id 相同时,查询学生的姓名和老师的姓名 select s.name, t.name from Student.win:time(10) as s, Teacher.win:time(10) as t where s.id=t.id
如果想查询 Student
或者 Teacher
,则EPL改写如下:
select s.* as st, t.* as tr from Student.win:time(10) as s, Teacher.win:time(10) as t where s.id=t.id
关于 join
,下面会介绍到。
(4)insert 和 remove事件流
Esper 对于事件流分输入和移出两种,分别对应监听器的两个参数 newEvents
和 oldEvents
。newEvents
通常对应事件的计算结果,oldEvents
可以理解过上一次计算结果。
默认情况下,Esper 认为你只想让 newEvents
触发监听器,即 istream(insert stream)
。如果想让 oldEvents
触发监听器,那么为 rstream(remove stream)
。如果两个都想,那么为 irstream
。
select rstream * from User
如果使用了该参数,则会将上一次计算结果放入 newEvents
内,而不是 oldEvents
。并且无法获得当前的计算结果。
select irstream * from User
如果使用了该参数,则会将当前的计算结果放入 newEvents
内,上一次的计算结果放入 oldEvents
内。
select istream * from User -- 等同于 select * from User
如果使用了该参数,则会将当前的计算结果放入 newEvents
内,并且无法获得上一次的计算结果。同时该参数也是默认参数,可不写。
(5)Distinct
distinct
的用法和SQL一样,放在需要修饰的属性或者 *
前即可。例如:
select distinct * from User.win:time(3 sec)
(6)查询指定引擎的处理结果
除了上述所说的一些特点外,select还可以针对某个引擎进行查询。因为引擎都有自己的URI,所以可以在select句子中增加URI标识来指定查询哪一个引擎的事件处理情况。例如:
-- 引擎URI为Processor select Processor.MyEvent.myProperty from Processor.MyEvent
10、From Clause 条件选择
(1)语法介绍
From
的语法不难,主要内容是针对事件流的处理。包括事件流过滤,事件流的维持等等。语法如下:
from stream_def [as name] [unidirectional] [retain-union | retain-intersection] [, stream_def [as stream_name]] [, ...] -- 事件流 event_stream_name [(filter_criteria)] [contained_selection] [.view_spec] [.view_spec] [...]
unidirectional
、retain-union
、retain-intersection
、contained_selection
、view_spec
这几个关键字因为涉及到 view
的知识,后面会介绍道。学完 view
之后再来回顾这几个参数会很容易理解的。
下面讲讲怎么过滤事件流
(2)事件流过滤
2.1 过滤属性
事件流过滤通常情况都是对其中某个或多个属性加以限制来达到过滤的目的。注意,过滤表达式是紧跟在事件流名称之后而不是别名之后。例如:
-- 只有 age大于10 的 User 对象才可查询到 name 值 select name from User(age>10) as user -- 当 name=“tingfeng” 时,可获得其 age 值 select age from User(name="tingfeng") -- 错误写法 select name from User as user(age>10)
过滤表达式写法多种多样,可以用符号,又或者使用and,or,between等逻辑语言。例如:
-- 查询年龄大于15小于18的学生的姓名 select name from Student(age between 15 and 18) -- 等同于 select name from Student(age >= 15 and age <= 18) -- 等同于 select name from Student(age >= 15, age <= 18)
看以看到,过滤表达式写法很多,并且多个表达式同时作用于一个事件流,用逗号连接即可。如果说满足其中一个条件即可,则需要用 or
连接。
2.2 过滤范围
过滤表达式使用的符号很多,总结下来基本上有 <
、>
、<=
、>=
、=
、!=
、between
、in
、not in
、and
、or
、[ ]
、( )
。
这里主要说下 between
、in
、( )
、[ ]
。
between……and……
:和SQL语法一样,是一个闭区间。比如说between 10 and 15
,表示10到15之间,并且包含10和15。( )
:表示一个开区间,语法为(low:high)
,如 (10:15),表示10到15之间,并且不包含10和15。[ ]
:表示一个闭区间,语法为[low:high]
。如[10:15],表示10到15之间,并且包含10和15。( )和[ ]
:可以混合用。比如[10:15)或者(10:15]。in
:配合( )
和[ ]
进行使用,表示值在某个范围内。not in
:表示不在此范围内。
in
和 not in
不止可以作用于数字,它还可以作用于字符串。举例
select name from User(age in [10:15)) select age from User(name in ('张三', '李四'))
2.3 静态方法过滤
除了上面说的这些符号以外,类似于select子句中使用的静态方法,过滤表达式中也可以使用,但是返回值必须为布尔值,不然会报错。例如:
// 判断总数是否等于0 public class IsZero { public static boolean isZero(int sum) { return sum==0; } } // 加载 epService.getEPAdministrator().getConfiguration().addImport(IsZero.class); // 查询没有钱的用户的 name 值(User包含name和money属性) select name from User(IsZero.isZero(money))
事件流的过滤并不能弄得很复杂,他有一下几个限制:
要过滤的属性只能是数字和字符串。
过滤表达式中不能使用聚合函数。
11、Aggregation 聚合函数
和SQL一样,EPL也有 Aggregation
,即聚合函数。语法如下:
aggregate_function([all|distinct] expression)
aggregate_function
:聚合函数的名字,比如avg
,sum
等。expression
通常是事件流的某个属性,也可以是不同事件流的多个属性,或者是属性和常量、函数之间的运算。举例如下。
-- 查询最新 5秒 的 Apple 的平均价格 select avg(price) as aPrice from Apple.win:time(5 sec) -- 查询最新 10个 Apple 的价格总和的两倍 select sum(price*2) as sPrice from Apple.win:length(10) -- 查询最新 10个 Apple 的价格,并用函数计算后再算平均值 select avg(Compute.getResult(price)) from Apple.win:length(10)
函数只能是静态方法,普通方法不可用。即使是事件流里包含的静态方法,也必须用“类名.方法名
”的方式进行引用。
可以使用 distinct
关键字对 expression
加以约束,表示去掉表达式重复的值。默认情况下为 all
关键字,即所有的值都参与聚合运算。例如:
-- 查询最新 5秒 的 Apple 的平均价格 select avg(distinct price) as aPrice from Apple.win:time(5 sec)
假如:5秒内进入了三个Apple事件,price分别为2,1,2。则针对该EPL的平均值为(2+1)/2=1.5。因为有distinct的修饰,所以第二个2不参与运算,事件总数即为2,而不是3。
注意点:聚合函数能用于Select和Having,但是不能用于 Where
,而 sum
、avg
、media
、stddev
、avedev
只能计算数值
12、Group by 分组
Group by
通常配合聚合函数使用。语法和SQL基本一样,产生的效果就是以某一个或者多个字段进行分组,然后使聚合函数作用于不同组的数据。简单语法如下:
group by aggregate_free_expression [, aggregate_free_expression] [, ...]
注意点:
Group by
后面的内容不能包含聚合函数。Group by
后面的内容不能是之前select
子句中聚合函数修饰的属性名。通常情况要保证分组数量有限制,以防止内存溢出。但是如果分组分了很多,就需要使用
@Hint
加以控制。
(1)基本用法
-- 根据 color和size 来对 10个Apple 事件进行分组计算平均price select avg(price) as aPrice, color, size from Apple.win:length_batch(10) group by color,size
该句子遵从SQL的标准,如果某个事件的 color和size 和之前进入的事件的一样,则归为一组,否则新建一组,并计算平均 price
-- 根据 size 来对 10个Apple 事件进行分组计算平均 price和color select avg(price) as aPrice, color, size from Apple.win:length_batch(10) group by size
这里group by
的对象只有 size
,而 select
中 color
不聚合,则生成的结果时,聚合函数会根据相同的 size
分组进行平均price的计算,但是color不是分组条件,所以color有多少个就有多少组。
-- 根据 size 来对 10个Apple 事件进行分组计算平均 price和color select avg(price) as aPrice, color from Apple.win:length_batch(10) group by size
这一次 select
子句中没有包含分组的字段 size
,但是效果和上一个句子一样。Esper仍然会根据相同的 size
进行分组计算平均 price
,只不过计算结果中只有平均price和color,并且有十排结果。
-- 根据 size 乘 color 来对 10个Apple 事件进行分组计算平均 price select avg(price) as aPrice, size*color from Apple.win:length_batch(10) group by size*color
(2)@Hint
@Hint
是Esper中注解的其中一个,它是专用于 Group by
的。我们平时使用 Group by
的时候,会遇到分组数量太多的情况。比如以时间单位进行分组,那么内存使用一定是一个大问题。因此 @Hint
为其设计了两个属性,用于限制 Group by
的生存时间,使虚拟机能及时回收内存。这两个属性分别为 reclaim_group_aged
和 reclaim_group_freq
。
2.1 reclaim_group_aged
该属性值是正整数,以秒为单位,表示在 n
秒内,若分组的数据没有进行更新,则分组数据被Esper回收。例如:
-- 根据 color 对 10秒 内进入的 Apple事件 进行分组计算 平均price,并且对 5秒内没有数据更新的分组进行回收 @Hint('reclaim_group_aged=5')select avg(price) as aPrice, color from Apple.win:time(10 sec) group by color
2.2 reclaim_group_freq
该属性值是正整数,以秒为单位,表示每 n
秒清理一次分组,可清理的分组是 reclaim_group_aged
决定的,也就是说要使用该参数,就要配合 reclaim_group_aged
一起使用。可能不是很好理解,先看看例子:
-- 根据 color 对 10秒 内进入的 Apple事件 进行分组计算平均 price。对8秒内没有数据更新的分组进行回收,每2秒回收一次 @Hint('reclaim_group_aged=8,reclaim_group_freq=2')select avg(price) as aPrice, color from Apple.win:time(10 sec) group by color
对上面来说就是回收的条件为8秒内没有数据更新,且每8秒回收一次。这样的话有可能出现这么一种情况,上一个8秒的某个分组在下一个8秒还没到达时就已经持续8秒没有数据更新了,但是必须等到回收的时间点到达时才能回收这个分组。在分组产生很快的情况下,这样的回收不及时很可能会造成内存溢出。reclaim_group_freq
正是为这种情况做准备,回收的频率高一些,在一定程度上能提高内存的使用率。
上面这两个属性的值除了可以使用正整数之外,也可以使用预先定义的变量或者常量
13、Having
Having
的用法和SQL一样,后面跟的是对聚合函数的计算结果进行过滤。Where
子句不能包含聚合函数,所以就由 Having
来完成。示例如下:
-- 根据 size 来对 10个Apple 事件进行分组计算平均 price和color,并且排除平均 price大于5 的分组 select avg(price) as aPrice, color from Apple.win:length_batch(10) group by size having avg(price) > 5
通常 Having
配合 Group by
使用,如果没有使用 Group by
,那么就只有一组。例如:
-- 根据 size 来对 10个Apple 事件计算平均 price和color,如果平均 price大于5,则数据被排除掉 select avg(price) as aPrice, color from Apple.win:length_batch(10) having avg(price) > 5
Having
后面可以跟多个判断式子,并且用 and
,or
或者 not
进行连接。例如:
-- 根据size来对10个Apple事件计算平均price和color,如果平均price大于5并且平均size小于3,则数据被排除掉 select avg(price) as aPrice, color from Apple.win:length_batch(10) having avg(price) > 5 and avg(size) < 3
14、Output 输出
(1)基本语法
Output
是EPL中非常有用的东西,用来控制Esper对事件流计算结果的输出时间和形式,可以以固定频率,也可以是某个时间点输出。简单语法如下:
output [after suppression_def] [[all | first | last | snapshot] every time_period | output_rate events]
after suppression_def
:可选参数,表示先满足一定的条件再输出。all | first | last | snapshot
:表明输出结果的形式,默认值为all
。every output_rate
:表示输出频率,即每达到规定的频率就进行输出。time_period
:表示时间频率,上面文章有介绍到。output_rate events
:表示事件数量。
举例如下:
-- 30分钟内,每进入一个 OrderEvent,统计一次 sum price,并且每 60秒 输出一次统计结果 select sum(price) from OrderEvent.win:time(30 min) output snapshot every 60 seconds
(2)after
after
在 output
里的使用也很简单,语法如下:
output after time_period | number events [...]
time_period
:表示时间段number events
:表示事件数量,表示从EPL可用开始,经过一段时间或者接收到一定数量的事件再进行输出
举例如下:
-- 统计 20个Apple 事件的 sum price,并且在有 5个Apple 事件进入后才开始输出统计结果 select sum(price) from Apple.win:length(20) output after 5 events
上面这个句子从第一个进入的事件进行统计,直到进入了 5
个事件以后才输出统计结果,之后每进入一个事件输出一次(这是win:length
的特性)。
但是要注意的是,after
之后的时间长度和事件数量会影响之后的时间或者事件数量。什么意思?看个完整例子:
1、定义事件模型
@AllArgsConstructor @Getter public class Banana { private int id; private int price; }
2、测试
public static void main(String[] args) { EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(); EPAdministrator admin = epService.getEPAdministrator(); String banana = Banana.class.getName(); // 统计最新 3个Banana 事件的 sum price,并且从EPL可用起,等待第一个事件进入后,以每两个事件进入的频率输出统计结果 String epl = "select sum(price) as sPrice from " + banana + ".win:length(3) output after 1 events snapshot every 2 events"; EPStatement state = admin.createEPL(epl); state.addListener((newEvents, oldEvents) -> { if (newEvents != null) { int price = (Integer) newEvents[0].get("sPrice"); System.out.println("Banana's sum price is " + price); } }); EPRuntime runtime = epService.getEPRuntime(); System.out.println("Send Banana Event 1"); runtime.sendEvent(new Banana(1, 6)); System.out.println("Send Banana Event 2"); runtime.sendEvent(new Banana(2, 3)); System.out.println("Send Banana Event 3"); runtime.sendEvent(new Banana(3, 1)); System.out.println("Send Banana Event 4"); runtime.sendEvent(new Banana(4, 2)); System.out.println("Send Banana Event 5"); runtime.sendEvent(new Banana(5, 4)); System.out.println("Send Banana Event 6"); runtime.sendEvent(new Banana(6, 5)); System.out.println("Send Banana Event 7"); runtime.sendEvent(new Banana(7, 1)); System.out.println("Send Banana Event 8"); runtime.sendEvent(new Banana(8, 8)); }
执行结果
Send Banana Event 1 Send Banana Event 2 Send Banana Event 3 香蕉总价:10 Send Banana Event 4 Send Banana Event 5 香蕉总价:7 Send Banana Event 6 Send Banana Event 7 香蕉总价:10 Send Banana Event 8
第一个总价:6+3+1 = 10
第二个总价:1+2+4 = 7
第三个总价:4+5+1 = 10
由此可见,after
之后的 every
子句要等到 after
后面的表达式满足后才生效。所以第一个事件进入后,every 2 events
生效,即等待两个事件进入后才输出结果。
对于时间,也是要等到 after
的子句满足后才开始计时。例如:
-- 从EPL可用开始计时,经过1分钟后,每5秒 输出一次当前 100秒 内的所有 Banana的 avg price(即:第一次输出在65秒时) select avg(price) from Banana.win:time(100 sec) after 1 min snapshot every 5 sec
(3)first、last、snapshot、all
每当达到输出时间点时,可以用这四个参数来控制输出内容。
3.1 first 首个
表示每一批可输出的内容中的第一个事件计算结果。
-- 每进入两个 Fruit 事件,输出这两个事件的第一个 select * from Fruit output first every 2 events
3.2 last 最后一个
表示每一批可输出的内容中的最后一个事件计算结果。
-- 每进入两个 Fruit 事件,输出这两个事件的第二个,也就是最后一个 select * from Fruit output last every 2 events
3.3 snapshot 快照
表示输出EPL所保持的所有事件计算结果,通常用来查看 view
或者 window
中现存的事件计算结果。比如:
-- 每进入两个事件输出5 sec内的所有事件,且不会将这些事件从5 sec范围内移除 select * from Fruit.win:time(5 sec) output snapshot every 2 events
3.4 all 全部
默认值。和 snapshot
类似,也是输出所有的事件,但是不同的是,snapshot
相当于对计算结果拍了一张照片,把结果复制出来并输出,而 all
是把计算结果直接输出,不会复制。比如:
select * from Fruit.win:time(5 sec) output all every 2 events
上面的句子表示每进入两个事件输出 5 sec
内包含的所有事件,输出的事件不再保留于 5 sec
范围内。
(4)Crontab Output 定时任务输出
output
的另一个语法可以建立定时输出,关键字是 at
。语法如下:
output [after suppression_def] [[all | first | last | snapshot] at (minutes, hours, days of month, months, days of week [, seconds])]
时间单位
minutes
hours
days of month
months
days of week
[, seconds]
举例:
-- 在 8点到17 点这段时间内,每 15分钟 输出一次 select * from Fruit output at (*/15,8:17,*,*,*)
(5)when 在...时
Output
还可以使用 when
来实现达到某个固定条件再输出的效果,一般通过变量,用户自定义的函数以及 output
内置的属性来实现。
基本语法:
output [after suppression_def] [[all | first | last | snapshot] when trigger_expression [then set variable_name = assign_expression [, variable_name = assign_expression [,...]]]
trigger_expression
:返回true
或者false
,表示输出或者不输出。then set variable_name=assign_expression
:表示是当trigger_expression
被触发时,可对变量重新赋值。
完整例子如下:
1、定义事件模型
@Getter @AllArgsConstructor public class Pink { private int id; private int price; @Override public String toString() { return "id: " + id + ", price: " + price; } }
2、测试
public static void main(String[] args) throws InterruptedException { EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(); EPAdministrator admin = epService.getEPAdministrator(); ConfigurationOperations config = admin.getConfiguration(); config.addVariable("exceed", boolean.class, false); String pink = Pink.class.getName(); // 当 exceed 为 true 时,输出所有进入EPL的事件,然后设置 exceed 为 false String epl = "select * from " + pink + " output when exceed then set exceed=false"; EPStatement state = admin.createEPL(epl); state.addListener((newEvents, oldEvents) -> { if (newEvents != null) { for (int i = 0; i < newEvents.length; i++) { Pink p = (Pink) newEvents[i].getUnderlying(); System.out.println("输出 Pink: " + p); } } }); EPRuntime runtime = epService.getEPRuntime(); Random r = new Random(10); for (int i = 1; i <= 10; i++) { int price = r.nextInt(30); Pink p = new Pink(i, price); System.out.println("Send Pink Event: " + p); runtime.sendEvent(p); // 当 price>10 时,exceed 变量为 true if (price > 10) { runtime.setVariableValue("exceed", true); // 因为主线程和输出线程不是同一个,所以这里休息1秒保证输出线程将事件全部输出,方便演示。 Thread.sleep(1000); } } }
执行结果
Send Pink Event: id: 1, price: 3 Send Pink Event: id: 2, price: 0 Send Pink Event: id: 3, price: 3 Send Pink Event: id: 4, price: 0 Send Pink Event: id: 5, price: 16 00:53:21.851 [main] DEBUG com.espertech.esper.epl.view.OutputConditionExpression - .update Ignoring variable callback 输出 Pink: id: 1, price: 3 输出 Pink: id: 2, price: 0 输出 Pink: id: 3, price: 3 输出 Pink: id: 4, price: 0 输出 Pink: id: 5, price: 16 Send Pink Event: id: 6, price: 16 00:53:22.854 [main] DEBUG com.espertech.esper.epl.view.OutputConditionExpression - .update Ignoring variable callback 输出 Pink: id: 6, price: 16 Send Pink Event: id: 7, price: 7 Send Pink Event: id: 8, price: 28 00:53:23.860 [main] DEBUG com.espertech.esper.epl.view.OutputConditionExpression - .update Ignoring variable callback 输出 Pink: id: 7, price: 7 输出 Pink: id: 8, price: 28 Send Pink Event: id: 9, price: 21 00:53:24.862 [main] DEBUG com.espertech.esper.epl.view.OutputConditionExpression - .update Ignoring variable callback 输出 Pink: id: 9, price: 21 Send Pink Event: id: 10, price: 4
从结果可以看出来。当 price>10
的时候,设置 exceed
变量为 true
,即可输出之前进入的所有事件,然后 set
子句将 exceed
设置为 false
,等待下一次 exceed=true
时触发输出。由于输出线程是单独的线程,所以如果不 sleep
,结果可能会和这个不同。
对于 when
关键字,Esper提供了一些内置的属性帮助我们实现更复杂的输出约束。如图所示:
以上5个属性就不多做解释了,使用方式是作为 trigger_expression
跟在 when
关键字的后面。例如:
-- 进入的 Apple 事件总数达到 5 个时才输出,且不清零 count_insert_total 属性,继续累加事件总数 select * from Apple output when count_insert_total=5 -- 移除的 Apple 事件总数达到4个时才输出,并清零 count_remove 属性 select * from Apple output when count_remove=4
另外,在使用 when
的时候,注意如下两点:
当
trigger_expression=true
时,Esper会输出从上一次输出之后到这次输出之间所有的insert stream
和remove stream
。若
trigger_expression
不断被触发并返回true
时,则Esper最短的输出间隔为100毫秒
。
(6)Context Terminated 上下文终止
Output
还针对 Context
专门设计了一个输出条件,即在 Context
终止时输出 Context
中的内容。
语法如下:
output when terminated [and termination_expression] [then set variable_name = assign_expression [, variable_name = assign_expression [,...]]]]
when terminated
:是关键字,之前可以通过and
连接其他的式子一起使用。termination_expression
:是一个返回true
或者false
的表达式,同trigger_expression
一样。
举例如下:
// 在 MyContext 下,查询 context 的 id 并计算 Apple 的 sum price,当 Context 结束且输入的事件总数大于 10 时,输出。然后设置 FinishCompute 变量为 true context MyContext select context.id, sum(price) from Apple output when terminated and count_insert_total > 10 then set FinishCompute = true // 在 MyContext 下,计算 Apple 的 avg size,并每 1分钟 输出第一个进入的事件计算结果,当 context 结束时也输出一次计算结果 context MyContext select avg(size) from Apple output first every 1 min and when terminated
Output
和 Aggregation
、Group by
一起使用时,first
、last
、all
、snapshot
四个关键字产生的效果会比较特别。建议看看Esper的官方文档的 Appendix A,有相当完整的例子做说明。
外针对这四个关键字,只有使用 snapshot
是不会缓存计算结果。其他的关键字会缓存事件直到触发了输出条件才会释放,所以如果输入的数据量比较大,就要注意输出条件被触发前的内存使用量。
未经允许请勿转载:程序喵 » Esper教程 —— EPL 语法讲解(5)