Elasticsearch (4): Using the Java API

As covered earlier, we can talk to Elasticsearch over port 9200 (HTTP) or port 9300 (TCP). The official recommendation is HTTP, which corresponds to RestClient in the Java API; the TCP transport corresponds to TransportClient. Both wrap the underlying ES operations and are easy to use, but the TransportClient source is already annotated: "Note that {@link TransportClient} will be deprecated in Elasticsearch 7.0 and removed in Elasticsearch 8.0". HTTP offers better compatibility across versions, and TransportClient has performance problems under high concurrency, so it is being phased out. This article therefore uses the HTTP approach, i.e. RestClient.

RestClient itself comes in two flavors: the plain low-level client and the high-level client (high-level-client). With the low-level client you assemble the HTTP request URL and body yourself; the high-level client provides ready-made API methods. The high-level API is not yet complete (new methods are still being added), but it already covers most day-to-day development needs, so this article uses the high-level client.
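To make the contrast concrete, here is a minimal sketch of what the low-level style amounts to: you assemble the URL and JSON body by hand. For illustration this uses the JDK's built-in HttpClient rather than the ES low-level RestClient (which wraps the same kind of request); the host, index name, and query body below are placeholders, not values from this article's project.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class RawSearchRequest {

    // Assemble the raw HTTP search request by hand, as the low-level style requires:
    // the caller supplies the host, index, and the JSON query body verbatim.
    public static HttpRequest build(String host, String index, String queryJson) {
        return HttpRequest.newBuilder()
                .uri(URI.create("http://" + host + ":9200/" + index + "/_search"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(queryJson))
                .build();
    }
}
```

Sending the request with `HttpClient.newHttpClient().send(...)` and parsing the JSON response is then entirely up to you; that boilerplate is exactly what the high-level client removes.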

RestClient is the API shipped in the native ES jars. On top of that, Spring wraps the native ES API in the spring-data-elasticsearch dependency. spring-data-elasticsearch adds a layer over both TransportClient and RestClient, corresponding to ElasticsearchTemplate and ElasticsearchRestTemplate respectively. Since TransportClient is about to be deprecated, this article uses ElasticsearchRestTemplate for its examples.

This article uses ES jar version 6.8.0 and spring-data-elasticsearch version 3.2.3.RELEASE, on a Spring Boot 2 project.

high-level-client

In this part we first look at how to use Elasticsearch's native client.

Preparation

1. Add the ES dependencies

<properties>
    <elasticsearch.version>6.8.0</elasticsearch.version>
</properties>

<!-- elasticsearch -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>

2. Define a JavaBean

package com.lzumetal.springboot.elasticsearch.entity;

import com.lzumetal.springboot.elasticsearch.config.Constants;
import lombok.Getter;
import lombok.Setter;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

import java.io.Serializable;

@Getter
@Setter
public class Account implements Serializable {

    /**
     * account_number : 0
     * firstname : Bradshaw
     * address : 244 Columbus Place
     * balance : 16623
     * gender : F
     * city : Hobucken
     * employer : Euron
     * state : CO
     * age : 29
     * email : bradshawmckenzie@euron.com
     * lastname : Mckenzie
     */
    @Id
    private Long account_number;

    @Field(type = FieldType.Keyword)
    private String firstname;

    @Field(type = FieldType.Keyword)
    private String lastname;

    @Field(type = FieldType.Text)
    private String address;

    @Field(type = FieldType.Long)
    private Long balance;

    @Field(type = FieldType.Keyword)
    private String gender;

    @Field(type = FieldType.Keyword)
    private String city;

    @Field(type = FieldType.Keyword)
    private String employer;

    @Field(type = FieldType.Keyword)
    private String state;

    @Field(type = FieldType.Integer)
    private Integer age;

    @Field(type = FieldType.Text)
    private String email;

}

The different FieldType options explained:

  • @Field(type=FieldType.Text, analyzer="ik_max_word") marks a text field analyzed with maximum-granularity word splitting; indexed by default
  • @Field(type=FieldType.Text, index=false) marks a text field that is not indexed
  • @Field(type=FieldType.Date) marks a date field; indexed by default
  • @Field(type=FieldType.Long) marks a long integer field; indexed by default
  • @Field(type=FieldType.Keyword) marks a text value treated as one indivisible token; indexed by default
  • @Field(type=FieldType.Float) marks a floating-point field; indexed by default

Note: date, float, and long values are never analyzed (split into tokens).
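As a rough illustration, these annotations correspond to an ES 6.x mapping along the following lines. This is a hedged sketch: the type name "account" is a placeholder (the real index and type names come from the constants used later), and it assumes the default index settings.

```json
{
  "mappings": {
    "account": {
      "properties": {
        "account_number": { "type": "long" },
        "firstname":      { "type": "keyword" },
        "lastname":       { "type": "keyword" },
        "address":        { "type": "text" },
        "balance":        { "type": "long" },
        "gender":         { "type": "keyword" },
        "city":           { "type": "keyword" },
        "employer":       { "type": "keyword" },
        "state":          { "type": "keyword" },
        "age":            { "type": "integer" },
        "email":          { "type": "text" }
      }
    }
  }
}
```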

3. Configure the RestHighLevelClient

@Configuration
public class EsClientConfig {

    // Comma-separated host:port list; the port is required because it is parsed below
    private String httpHosts = "192.168.10.1:9200";


    @Bean // high-level client
    public RestHighLevelClient restHighLevelClient() {
        // Parse the host list configuration
        String[] split = httpHosts.split(",");
        HttpHost[] httpHostArray = new HttpHost[split.length];
        for (int i = 0; i < split.length; i++) {
            String item = split[i];
            httpHostArray[i] = new HttpHost(item.split(":")[0], Integer.parseInt(item.split(":")[1]), "http");
        }
        // Create the RestHighLevelClient
        return new RestHighLevelClient(RestClient.builder(httpHostArray).setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
            @Override
            public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
                requestConfigBuilder.setConnectTimeout(2000);
                requestConfigBuilder.setSocketTimeout(8000);
                requestConfigBuilder.setConnectionRequestTimeout(5000);
                return requestConfigBuilder;
            }
        }));
    }

}

Working with indices

/**
 * Check whether the index exists
 */
@Test
public void existsIndexTest() throws IOException {
    GetIndexRequest getIndexRequest = new GetIndexRequest();
    getIndexRequest.indices(Constants.INDEX_NAME);
    boolean exists = highLevelClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
    log.info("index exists|exists={}", exists);
}


/**
 * Create the index
 *
 * @throws IOException
 */
@Test
public void createIndexTest() throws IOException {
    CreateIndexRequest createIndexRequest = new CreateIndexRequest(Constants.INDEX_NAME);
    createIndexRequest.settings(Settings.builder()
            .put("index.number_of_shards", 3)   // number of primary shards
            .put("index.number_of_replicas", 2) // number of replicas
    );
    CreateIndexResponse response = highLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
    log.info("create index|Acknowledged={},ShardsAcknowledged={}", response.isAcknowledged(), response.isShardsAcknowledged());

}


/**
 * Delete the index
 *
 * @throws IOException
 */
@Test
public void deleteIndexTest() throws IOException {
    DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest();
    deleteIndexRequest.indices(Constants.INDEX_NAME);
    AcknowledgedResponse delete = highLevelClient.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
    log.info("delete index|Acknowledged={}", delete.isAcknowledged());
}

Working with documents

/**
 * Add a document to the index
 */
@Test
public void addDocTest() throws IOException {
    String str = "{\"account_number\":1,\"balance\":39225,\"firstname\":\"Amber\",\"lastname\":\"Duke\",\"age\":32,\"gender\":\"M\",\"address\":\"880 Holmes Lane\",\"employer\":\"Pyrami\",\"email\":\"amberduke@pyrami.com\",\"city\":\"Brogan\",\"state\":\"IL\"}";
    Account account = JsonUtils.fromJSON(str, Account.class);
    IndexRequest indexRequest = new IndexRequest()
            .index(Constants.INDEX_NAME)
            .type(Constants.TYPE)
            .id(String.valueOf(account.getAccount_number()))
            .source(BeanMapUtil.objectToStringMap(account));
    IndexResponse indexResponse = highLevelClient.index(indexRequest, RequestOptions.DEFAULT);
    // response details
    String index = indexResponse.getIndex();
    String type = indexResponse.getType();
    String id = indexResponse.getId();
    long version = indexResponse.getVersion();
    String result = indexResponse.getResult().getLowercase();
    ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
    log.info("add doc|index={}, type={}, id={}, version={}, result={}, shardInfo={}",
            index, type, id, version, result, shardInfo);
}


/**
 * Get a document by index name, type, and id
 */
@Test
public void getDocTest() throws IOException {
    GetRequest getRequest = new GetRequest()
            .index(Constants.INDEX_NAME)
            // .index(Constants.INDEX_NAME + "*")
            .type(Constants.TYPE)
            .id("1");
    GetResponse response = highLevelClient.get(getRequest, RequestOptions.DEFAULT);
    log.info("get doc|{}", response.getSource());
}


/**
 * Update a document
 *
 * @throws IOException
 */
@Test
public void updateDocTest() throws IOException {
    String str = "{\"account_number\":1,\"balance\":38000,\"firstname\":\"Amber\",\"lastname\":\"Duke\",\"age\":32,\"gender\":\"M\",\"address\":\"880 Holmes Lane\",\"employer\":\"Pyrami\",\"email\":\"amberduke@pyrami.com\",\"city\":\"Brogan\",\"state\":\"IL\"}";
    Account account = JsonUtils.fromJSON(str, Account.class);
    UpdateRequest updateRequest = new UpdateRequest()
            .index(Constants.INDEX_NAME)
            .type(Constants.TYPE)
            .id("1")
            .doc(BeanMapUtil.objectToStringMap(account));
    UpdateResponse response = highLevelClient.update(updateRequest, RequestOptions.DEFAULT);
    log.info("update doc|{}", response);
}


/**
 * Bulk operations
 */
@Test
public void bulkTest() throws IOException {
    String str = "{\"account_number\":1,\"balance\":38000,\"firstname\":\"Amber\",\"lastname\":\"Duke\",\"age\":32,\"gender\":\"M\",\"address\":\"880 Holmes Lane\",\"employer\":\"Pyrami\",\"email\":\"amberduke@pyrami.com\",\"city\":\"Brogan\",\"state\":\"IL\"}";
    UpdateRequest updateRequest = new UpdateRequest()
            .index(Constants.INDEX_NAME)
            .type(Constants.TYPE)
            .id("1")
            .doc(BeanMapUtil.objectToStringMap(JsonUtils.fromJSON(str, Account.class)));

    String str2 = "{\"account_number\":6,\"balance\":5686,\"firstname\":\"Hattie\",\"lastname\":\"Bond\",\"age\":36,\"gender\":\"M\",\"address\":\"671 Bristol Street\",\"employer\":\"Netagy\",\"email\":\"hattiebond@netagy.com\",\"city\":\"Dante\",\"state\":\"TN\"}";
    Account account = JsonUtils.fromJSON(str2, Account.class);
    IndexRequest indexRequest = new IndexRequest()
            .index(Constants.INDEX_NAME)
            .type(Constants.TYPE)
            .id(String.valueOf(account.getAccount_number()))
            .source(BeanMapUtil.objectToStringMap(account));

    BulkRequest bulkRequest = new BulkRequest();
    bulkRequest.add(updateRequest)
            .add(indexRequest);
    BulkResponse response = highLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
    for (BulkItemResponse bulkItemResponse : response) {
        DocWriteResponse itemResponse = bulkItemResponse.getResponse();
        if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
                || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
            IndexResponse indexResponse = (IndexResponse) itemResponse;
            //TODO handle successful index/create

        } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
            UpdateResponse updateResponse = (UpdateResponse) itemResponse;
            //TODO handle successful update

        } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
            DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
            //TODO handle successful delete
        }
    }
}


/**
 * Search documents
 */
@Test
public void searchTest() {
    AccountRequest request = new AccountRequest();
    request.setFrom(0);
    request.setSize(10);
    BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();

    /* equality (=) query */
    String gender = request.getGender();
    if (gender != null) {
        boolQueryBuilder.must(QueryBuilders.termQuery("gender", gender));
    }

    /* in query */
    List<Integer> ages = request.getAges();
    if (!CollectionUtils.isEmpty(ages)) {
        boolQueryBuilder.must(QueryBuilders.termsQuery("age", ages));
    }

    /* range query */
    Long startBalance = request.getStartBalance();
    if (startBalance != null) {
        boolQueryBuilder.must(QueryBuilders.rangeQuery("balance").gte(startBalance));
    }
    Long endBalance = request.getEndBalance();
    if (endBalance != null) {
        boolQueryBuilder.must(QueryBuilders.rangeQuery("balance").lte(endBalance));
    }

    String addressKey = request.getAddressKey();
    if (addressKey != null) {
        // a match query analyzes the keywords you pass in before matching
        boolQueryBuilder.must(QueryBuilders.matchQuery("address", addressKey));
    }

    /* like (prefix) query */
    String preAddress = request.getPreAddress();
    if (preAddress != null) {
        boolQueryBuilder.must(QueryBuilders.prefixQuery("address", preAddress));
    }

    /* or query */
    String nameKey = request.getNameKey();
    if (nameKey != null) {
        BoolQueryBuilder shouldQuery = QueryBuilders.boolQuery()
                .should(QueryBuilders.termQuery("firstname", nameKey))
                .should(QueryBuilders.termQuery("lastname", nameKey));
        boolQueryBuilder.must(shouldQuery);
    }

    /* not-equal (!=) query */
    String notState = request.getNotState();
    if (notState != null) {
        boolQueryBuilder.mustNot(QueryBuilders.termsQuery("state", notState));
    }

    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    // String[] fields = request.getFields();
    // if (fields != null) {
    //     // specify which fields to include or exclude in the result
    //     searchSourceBuilder.fetchSource(fields, null);
    // }
    searchSourceBuilder.query(boolQueryBuilder);

    // pagination
    searchSourceBuilder.from(request.getFrom());
    searchSourceBuilder.size(request.getSize());

    // sorting
    //searchSourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC));
    /* the sort field must be an integer or date type, otherwise you get: Fielddata is disabled on text fields by default.... */
    // searchSourceBuilder.sort(new FieldSortBuilder("account_number").order(SortOrder.ASC));

    SearchRequest searchRequest = new SearchRequest()
            .indices(Constants.INDEX_NAME)
            .source(searchSourceBuilder);
    try {
        SearchResponse searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        RestStatus restStatus = searchResponse.status();
        if (restStatus != RestStatus.OK) {
            log.error("search returned non-OK status|" + restStatus);
            return;
        }

        SearchHits searchHits = searchResponse.getHits();
        long totalHits = searchHits.totalHits;
        log.info("search result|totalHits={}", totalHits);
        if (totalHits > 0) {
            for (SearchHit hit : searchHits.getHits()) {
                String source = hit.getSourceAsString();
                log.info("search result|{}", source);
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    }

}
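A note on the sort comments in searchTest: sorting on an analyzed text field fails with "Fielddata is disabled on text fields by default". The sort field should be numeric, date, or keyword. As an illustration (the index name "bank" here is a placeholder), the REST equivalent of sorting by account_number would be:

```json
GET /bank/_search
{
  "query": { "match_all": {} },
  "sort": [
    { "account_number": { "order": "asc" } }
  ]
}
```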

Aggregation queries

1. Metric aggregations

1. RESTful examples
Employee count and the maximum, minimum, average, and total salary:

GET /mydlq-user/_search
{
  "size": 0,
  "aggs": {
    "salary_stats": {
      "stats": {
        "field": "salary"
      }
    }
  }
}

Minimum salary:

GET /mydlq-user/_search
{
  "size": 0,
  "aggs": {
    "salary_min": {
      "min": {
        "field": "salary"
      }
    }
  }
}

Maximum salary:

GET /mydlq-user/_search
{
  "size": 0,
  "aggs": {
    "salary_max": {
      "max": {
        "field": "salary"
      }
    }
  }
}

Average salary:

GET /mydlq-user/_search
{
  "size": 0,
  "aggs": {
    "salary_avg": {
      "avg": {
        "field": "salary"
      }
    }
  }
}

Total salary:

GET /mydlq-user/_search
{
  "size": 0,
  "aggs": {
    "salary_sum": {
      "sum": {
        "field": "salary"
      }
    }
  }
}

Employee count:

GET /mydlq-user/_search
{
  "size": 0,
  "aggs": {
    "employee_count": {
      "value_count": {
        "field": "salary"
      }
    }
  }
}

Salary percentiles:

GET /mydlq-user/_search
{
  "size": 0,
  "aggs": {
    "salary_percentiles": {
      "percentiles": {
        "field": "salary"
      }
    }
  }
}

2. Java examples


/**
 * stats: employee count plus the maximum, minimum, average, and total salary
 */
public Object aggregationStats() {
    String responseResult = "";
    try {
        // Build the aggregation
        AggregationBuilder aggr = AggregationBuilders.stats("salary_stats").field("salary");
        // Search source builder
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.aggregation(aggr);
        // Return only the aggregation result, no search hits
        searchSourceBuilder.size(0);
        // Create the search request and attach the source
        SearchRequest request = new SearchRequest("mydlq-user");
        request.source(searchSourceBuilder);
        // Execute the request
        SearchResponse response = highLevelClient.search(request, RequestOptions.DEFAULT);
        // Extract the aggregations from the response
        Aggregations aggregations = response.getAggregations();
        // Print the results
        if (RestStatus.OK.equals(response.status()) && aggregations != null) {
            // Cast to the Stats aggregation
            ParsedStats aggregation = aggregations.get("salary_stats");
            log.info("-------------------------------------------");
            log.info("aggregation info:");
            log.info("count:{}", aggregation.getCount());
            log.info("avg:{}", aggregation.getAvg());
            log.info("max:{}", aggregation.getMax());
            log.info("min:{}", aggregation.getMin());
            log.info("sum:{}", aggregation.getSum());
            log.info("-------------------------------------------");
        }
        // Real code would map this to a business result; for simplicity, return the response as a JSON string
        responseResult = response.toString();
    } catch (IOException e) {
        log.error("", e);
    }
    return responseResult;
}

/**
 * min: minimum salary
 */
public Object aggregationMin() {
    String responseResult = "";
    try {
        // Build the aggregation
        AggregationBuilder aggr = AggregationBuilders.min("salary_min").field("salary");
        // Search source builder
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.aggregation(aggr);
        searchSourceBuilder.size(0);
        // Create the search request and attach the source
        SearchRequest request = new SearchRequest("mydlq-user");
        request.source(searchSourceBuilder);
        // Execute the request
        SearchResponse response = highLevelClient.search(request, RequestOptions.DEFAULT);
        // Extract the aggregations from the response
        Aggregations aggregations = response.getAggregations();
        // Print the results
        if (RestStatus.OK.equals(response.status()) && aggregations != null) {
            // Cast to the Min aggregation
            ParsedMin aggregation = aggregations.get("salary_min");
            log.info("-------------------------------------------");
            log.info("aggregation info:");
            log.info("min:{}", aggregation.getValue());
            log.info("-------------------------------------------");
        }
        // Real code would map this to a business result; for simplicity, return the response as a JSON string
        responseResult = response.toString();
    } catch (IOException e) {
        log.error("", e);
    }
    return responseResult;
}

/**
 * max: maximum salary
 */
public Object aggregationMax() {
    String responseResult = "";
    try {
        // Build the aggregation
        AggregationBuilder aggr = AggregationBuilders.max("salary_max").field("salary");
        // Search source builder
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.aggregation(aggr);
        searchSourceBuilder.size(0);
        // Create the search request and attach the source
        SearchRequest request = new SearchRequest("mydlq-user");
        request.source(searchSourceBuilder);
        // Execute the request
        SearchResponse response = highLevelClient.search(request, RequestOptions.DEFAULT);
        // Extract the aggregations from the response
        Aggregations aggregations = response.getAggregations();
        // Print the results
        if (RestStatus.OK.equals(response.status()) && aggregations != null) {
            // Cast to the Max aggregation
            ParsedMax aggregation = aggregations.get("salary_max");
            log.info("-------------------------------------------");
            log.info("aggregation info:");
            log.info("max:{}", aggregation.getValue());
            log.info("-------------------------------------------");
        }
        // Real code would map this to a business result; for simplicity, return the response as a JSON string
        responseResult = response.toString();
    } catch (IOException e) {
        log.error("", e);
    }
    return responseResult;
}

/**
 * avg: average salary
 */
public Object aggregationAvg() {
    String responseResult = "";
    try {
        // Build the aggregation
        AggregationBuilder aggr = AggregationBuilders.avg("salary_avg").field("salary");
        // Search source builder
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.aggregation(aggr);
        searchSourceBuilder.size(0);
        // Create the search request and attach the source
        SearchRequest request = new SearchRequest("mydlq-user");
        request.source(searchSourceBuilder);
        // Execute the request
        SearchResponse response = highLevelClient.search(request, RequestOptions.DEFAULT);
        // Extract the aggregations from the response
        Aggregations aggregations = response.getAggregations();
        // Print the results
        if (RestStatus.OK.equals(response.status()) && aggregations != null) {
            // Cast to the Avg aggregation
            ParsedAvg aggregation = aggregations.get("salary_avg");
            log.info("-------------------------------------------");
            log.info("aggregation info:");
            log.info("avg:{}", aggregation.getValue());
            log.info("-------------------------------------------");
        }
        // Real code would map this to a business result; for simplicity, return the response as a JSON string
        responseResult = response.toString();
    } catch (IOException e) {
        log.error("", e);
    }
    return responseResult;
}

/**
 * sum: total salary
 */
public Object aggregationSum() {
    String responseResult = "";
    try {
        // Build the aggregation
        SumAggregationBuilder aggr = AggregationBuilders.sum("salary_sum").field("salary");
        // Search source builder
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.aggregation(aggr);
        searchSourceBuilder.size(0);
        // Create the search request and attach the source
        SearchRequest request = new SearchRequest("mydlq-user");
        request.source(searchSourceBuilder);
        // Execute the request
        SearchResponse response = highLevelClient.search(request, RequestOptions.DEFAULT);
        // Extract the aggregations from the response
        Aggregations aggregations = response.getAggregations();
        // Print the results
        if (RestStatus.OK.equals(response.status()) && aggregations != null) {
            // Cast to the Sum aggregation
            ParsedSum aggregation = aggregations.get("salary_sum");
            log.info("-------------------------------------------");
            log.info("aggregation info:");
            log.info("sum:{}", aggregation.getValue());
            log.info("-------------------------------------------");
        }
        // Real code would map this to a business result; for simplicity, return the response as a JSON string
        responseResult = response.toString();
    } catch (IOException e) {
        log.error("", e);
    }
    return responseResult;
}

/**
 * count: employee count
 */
public Object aggregationCount() {
    String responseResult = "";
    try {
        // Build the aggregation
        AggregationBuilder aggr = AggregationBuilders.count("employee_count").field("salary");
        // Search source builder
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.aggregation(aggr);
        searchSourceBuilder.size(0);
        // Create the search request and attach the source
        SearchRequest request = new SearchRequest("mydlq-user");
        request.source(searchSourceBuilder);
        // Execute the request
        SearchResponse response = highLevelClient.search(request, RequestOptions.DEFAULT);
        // Extract the aggregations from the response
        Aggregations aggregations = response.getAggregations();
        // Print the results
        if (RestStatus.OK.equals(response.status()) && aggregations != null) {
            // Cast to the ValueCount aggregation
            ParsedValueCount aggregation = aggregations.get("employee_count");
            log.info("-------------------------------------------");
            log.info("aggregation info:");
            log.info("count:{}", aggregation.getValue());
            log.info("-------------------------------------------");
        }
        // Real code would map this to a business result; for simplicity, return the response as a JSON string
        responseResult = response.toString();
    } catch (IOException e) {
        log.error("", e);
    }
    return responseResult;
}

/**
 * percentiles: salary percentiles
 */
public Object aggregationPercentiles() {
    String responseResult = "";
    try {
        // Build the aggregation
        AggregationBuilder aggr = AggregationBuilders.percentiles("salary_percentiles").field("salary");
        // Search source builder
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.aggregation(aggr);
        searchSourceBuilder.size(0);
        // Create the search request and attach the source
        SearchRequest request = new SearchRequest("mydlq-user");
        request.source(searchSourceBuilder);
        // Execute the request
        SearchResponse response = highLevelClient.search(request, RequestOptions.DEFAULT);
        // Extract the aggregations from the response
        Aggregations aggregations = response.getAggregations();
        // Print the results
        if (RestStatus.OK.equals(response.status()) && aggregations != null) {
            // Cast to the Percentiles aggregation
            ParsedPercentiles aggregation = aggregations.get("salary_percentiles");
            log.info("-------------------------------------------");
            log.info("aggregation info:");
            for (Percentile percentile : aggregation) {
                log.info("percentile {}: {}", percentile.getPercent(), percentile.getValue());
            }
            log.info("-------------------------------------------");
        }
        // Real code would map this to a business result; for simplicity, return the response as a JSON string
        responseResult = response.toString();
    } catch (IOException e) {
        log.error("", e);
    }
    return responseResult;
}

2. Bucket aggregations

1. RESTful examples
Bucket employees by age and count the employees at each age:

GET mydlq-user/_search
{
  "size": 0,
  "aggs": {
    "age_bucket": {
      "terms": {
        "field": "age",
        "size": "10"
      }
    }
  }
}

Bucket employees by salary range, counting those below 3000, between 5000 and 9000, and above 9000:

GET mydlq-user/_search
{
  "aggs": {
    "salary_range_bucket": {
      "range": {
        "field": "salary",
        "ranges": [
          {
            "key": "junior employees",
            "to": 3000
          },{
            "key": "mid-level employees",
            "from": 5000,
            "to": 9000
          },{
            "key": "senior employees",
            "from": 9000
          }
        ]
      }
    }
  }
}

Bucket by date range, counting employees born 1985-1990 and 1990-1995:

GET mydlq-user/_search
{
  "size": 10,
  "aggs": {
    "date_range_bucket": {
      "date_range": {
        "field": "birthDate",
        "format": "yyyy",
        "ranges": [
          {
            "key": "employees born 1985-1990",
            "from": "1985",
            "to": "1990"
          },{
            "key": "employees born 1990-1995",
            "from": "1990",
            "to": "1995"
          }
        ]
      }
    }
  }
}

Histogram of salaries, with bounds 0 to 12000 and an interval of 3000:

GET mydlq-user/_search
{
  "size": 0,
  "aggs": {
    "salary_histogram": {
      "histogram": {
        "field": "salary",
        "extended_bounds": {
          "min": 0,
          "max": 12000
        },
        "interval": 3000
      }
    }
  }
}

Bucket by birth date:

GET mydlq-user/_search
{
  "size": 0,
  "aggs": {
    "birthday_histogram": {
      "date_histogram": {
        "format": "yyyy",
        "field": "birthDate",
        "interval": "year"
      }
    }
  }
}

2. Java examples

/**
 * Bucket by age (terms)
 */
public Object aggrBucketTerms() {
    try {
        AggregationBuilder aggr = AggregationBuilders.terms("age_bucket").field("age");
        // Search source builder
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.size(10);
        searchSourceBuilder.aggregation(aggr);
        // Create the search request and attach the source
        SearchRequest request = new SearchRequest("mydlq-user");
        request.source(searchSourceBuilder);
        // Execute the request
        SearchResponse response = highLevelClient.search(request, RequestOptions.DEFAULT);
        // Extract the aggregations from the response
        Aggregations aggregations = response.getAggregations();
        // Print the results
        if (RestStatus.OK.equals(response.status())) {
            // buckets
            Terms byCompanyAggregation = aggregations.get("age_bucket");
            List<? extends Terms.Bucket> buckets = byCompanyAggregation.getBuckets();
            // print each bucket
            log.info("-------------------------------------------");
            log.info("aggregation info:");
            for (Terms.Bucket bucket : buckets) {
                log.info("bucket:{} | doc count:{}", bucket.getKeyAsString(), bucket.getDocCount());
            }
            log.info("-------------------------------------------");
        }
    } catch (IOException e) {
        log.error("", e);
    }
    return null;
}

/**
 * Bucket by salary range
 */
public Object aggrBucketRange() {
    try {
        AggregationBuilder aggr = AggregationBuilders.range("salary_range_bucket")
                .field("salary")
                .addUnboundedTo("junior employees", 3000)
                .addRange("mid-level employees", 5000, 9000)
                .addUnboundedFrom("senior employees", 9000);
        // Search source builder
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.size(0);
        searchSourceBuilder.aggregation(aggr);
        // Create the search request and attach the source
        SearchRequest request = new SearchRequest("mydlq-user");
        request.source(searchSourceBuilder);
        // Execute the request
        SearchResponse response = highLevelClient.search(request, RequestOptions.DEFAULT);
        // Extract the aggregations from the response
        Aggregations aggregations = response.getAggregations();
        // Print the results
        if (RestStatus.OK.equals(response.status())) {
            // buckets
            Range byCompanyAggregation = aggregations.get("salary_range_bucket");
            List<? extends Range.Bucket> buckets = byCompanyAggregation.getBuckets();
            // print each bucket
            log.info("-------------------------------------------");
            log.info("aggregation info:");
            for (Range.Bucket bucket : buckets) {
                log.info("bucket:{} | doc count:{}", bucket.getKeyAsString(), bucket.getDocCount());
            }
            log.info("-------------------------------------------");
        }
    } catch (IOException e) {
        log.error("", e);
    }
    return null;
}

/**
 * Bucket by date range
 */
public Object aggrBucketDateRange() {
    try {
        AggregationBuilder aggr = AggregationBuilders.dateRange("date_range_bucket")
                .field("birthDate")
                .format("yyyy")
                .addRange("1985-1990", "1985", "1990")
                .addRange("1990-1995", "1990", "1995");
        // Search source builder
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.size(0);
        searchSourceBuilder.aggregation(aggr);
        // Create the search request and attach the source
        SearchRequest request = new SearchRequest("mydlq-user");
        request.source(searchSourceBuilder);
        // Execute the request
        SearchResponse response = highLevelClient.search(request, RequestOptions.DEFAULT);
        // Extract the aggregations from the response
        Aggregations aggregations = response.getAggregations();
        // Print the results
        if (RestStatus.OK.equals(response.status())) {
            // buckets
            Range byCompanyAggregation = aggregations.get("date_range_bucket");
            List<? extends Range.Bucket> buckets = byCompanyAggregation.getBuckets();
            // print each bucket
            log.info("-------------------------------------------");
            log.info("aggregation info:");
            for (Range.Bucket bucket : buckets) {
                log.info("bucket:{} | doc count:{}", bucket.getKeyAsString(), bucket.getDocCount());
            }
            log.info("-------------------------------------------");
        }
    } catch (IOException e) {
        log.error("", e);
    }
    return null;
}

/**
 * Salary histogram
 */
public Object aggrBucketHistogram() {
    try {
        AggregationBuilder aggr = AggregationBuilders.histogram("salary_histogram")
                .field("salary")
                .extendedBounds(0, 12000)
                .interval(3000);
        // Search source builder
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.size(0);
        searchSourceBuilder.aggregation(aggr);
        // Create the search request and attach the source
        SearchRequest request = new SearchRequest("mydlq-user");
        request.source(searchSourceBuilder);
        // Execute the request
        SearchResponse response = highLevelClient.search(request, RequestOptions.DEFAULT);
        // Extract the aggregations from the response
        Aggregations aggregations = response.getAggregations();
        // Print the results
        if (RestStatus.OK.equals(response.status())) {
            // buckets
            Histogram byCompanyAggregation = aggregations.get("salary_histogram");
            List<? extends Histogram.Bucket> buckets = byCompanyAggregation.getBuckets();
            // print each bucket
            log.info("-------------------------------------------");
            log.info("aggregation info:");
            for (Histogram.Bucket bucket : buckets) {
                log.info("bucket:{} | doc count:{}", bucket.getKeyAsString(), bucket.getDocCount());
            }
            log.info("-------------------------------------------");
        }
    } catch (IOException e) {
        log.error("", e);
    }
    return null;
}

/**
 * Bucket by birth date (date histogram)
 */
public Object aggrBucketDateHistogram() {
    try {
        AggregationBuilder aggr = AggregationBuilders.dateHistogram("birthday_histogram")
                .field("birthDate")
                .dateHistogramInterval(DateHistogramInterval.YEAR) // one bucket per year
                .format("yyyy");
        // Search source builder
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.size(0);
        searchSourceBuilder.aggregation(aggr);
        // Create the search request and attach the source
        SearchRequest request = new SearchRequest("mydlq-user");
        request.source(searchSourceBuilder);
        // Execute the request
        SearchResponse response = highLevelClient.search(request, RequestOptions.DEFAULT);
        // Extract the aggregations from the response
        Aggregations aggregations = response.getAggregations();
        // Print the results
        if (RestStatus.OK.equals(response.status())) {
            // buckets
            Histogram byCompanyAggregation = aggregations.get("birthday_histogram");
            List<? extends Histogram.Bucket> buckets = byCompanyAggregation.getBuckets();
            // print each bucket
            log.info("-------------------------------------------");
            log.info("aggregation info:");
            for (Histogram.Bucket bucket : buckets) {
                log.info("bucket:{} | doc count:{}", bucket.getKeyAsString(), bucket.getDocCount());
            }
            log.info("-------------------------------------------");
        }
    } catch (IOException e) {
        log.error("", e);
    }
    return null;
}

3. Combined Metric and Bucket aggregation

1. RESTful example
Bucket employees by age, then find the highest-paid employee in each bucket:

GET mydlq-user/_search
{
  "size": 0,
  "aggs": {
    "salary_bucket": {
      "terms": {
        "field": "age",
        "size": "10"
      },
      "aggs": {
        "salary_max_user": {
          "top_hits": {
            "size": 1,
            "sort": [
              {
                "salary": {
                  "order": "desc"
                }
              }
            ]
          }
        }
      }
    }
  }
}

2. Java example

/**
 * topHits: bucket by age, then the highest-paid employee in each bucket
 */
public Object aggregationTopHits() {
    try {
        AggregationBuilder testTop = AggregationBuilders.topHits("salary_max_user")
                .size(1)
                .sort("salary", SortOrder.DESC);
        AggregationBuilder salaryBucket = AggregationBuilders.terms("salary_bucket")
                .field("age")
                .size(10);
        salaryBucket.subAggregation(testTop);
        // Search source builder
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.size(0);
        searchSourceBuilder.aggregation(salaryBucket);
        // Create the search request and attach the source
        SearchRequest request = new SearchRequest("mydlq-user");
        request.source(searchSourceBuilder);
        // Execute the request
        SearchResponse response = highLevelClient.search(request, RequestOptions.DEFAULT);
        // Extract the aggregations from the response
        Aggregations aggregations = response.getAggregations();
        // Print the results
        if (RestStatus.OK.equals(response.status())) {
            // buckets
            Terms byCompanyAggregation = aggregations.get("salary_bucket");
            List<? extends Terms.Bucket> buckets = byCompanyAggregation.getBuckets();
            // print each bucket and its top hit
            log.info("-------------------------------------------");
            log.info("aggregation info:");
            for (Terms.Bucket bucket : buckets) {
                log.info("bucket:{}", bucket.getKeyAsString());
                ParsedTopHits topHits = bucket.getAggregations().get("salary_max_user");
                for (SearchHit hit : topHits.getHits()) {
                    log.info(hit.getSourceAsString());
                }
            }
            log.info("-------------------------------------------");
        }
    } catch (IOException e) {
        log.error("", e);
    }
    return null;
}

ElasticsearchRestTemplate

To use ElasticsearchRestTemplate, we additionally need the spring-data-elasticsearch dependency on top of the ones above:

<properties>
<spring.data.elasticsearch.version>3.2.3.RELEASE</spring.data.elasticsearch.version>
</properties>

<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-elasticsearch</artifactId>
<version>${spring.data.elasticsearch.version}</version>
</dependency>

Configuring an ElasticsearchRestTemplate bean requires passing in a RestHighLevelClient instance:

@Bean
public ElasticsearchRestTemplate elasticsearchRestTemplate() {
return new ElasticsearchRestTemplate(restHighLevelClient);
}
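The restHighLevelClient injected above can itself be declared as a bean. A minimal sketch, assuming Elasticsearch is reachable at localhost:9200 (adjust the host and port for your environment):

```java
@Bean
public RestHighLevelClient restHighLevelClient() {
    // Talk to the HTTP (REST) port 9200, not the TCP transport port 9300
    return new RestHighLevelClient(
            RestClient.builder(new HttpHost("localhost", 9200, "http")));
}
```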

Code examples of the API calls:

/**
* Check whether the index exists
*/
@Test
public void existsIndexTest() throws IOException {
boolean exists = elasticsearchRestTemplate.indexExists(Constants.INDEX_NAME);
log.info("index exists|exists={}", exists);
}


/**
* Create an index
*
* @throws IOException
*/
@Test
public void createIndexTest() throws IOException {
Map<String, Object> map = new HashMap<>();
map.put("index.number_of_shards", 3); // number of primary shards
map.put("index.number_of_replicas", 2); // number of replicas per shard
boolean create = elasticsearchRestTemplate.createIndex(Constants.INDEX_NAME, map);
log.info("create index|create={}", create);
}


/**
* Delete the index
*
* @throws IOException
*/
@Test
public void deleteIndexTest() throws IOException {
boolean delete = elasticsearchRestTemplate.deleteIndex(Constants.INDEX_NAME);
log.info("delete index|delete={}", delete);
}


/**
* Add a document to the index
*/
@Test
public void addDocTest() throws IOException {
String str = "{\"account_number\":1,\"balance\":39225,\"firstname\":\"Amber\",\"lastname\":\"Duke\",\"age\":32,\"gender\":\"M\",\"address\":\"880 Holmes Lane\",\"employer\":\"Pyrami\",\"email\":\"amberduke@pyrami.com\",\"city\":\"Brogan\",\"state\":\"IL\"}\n";
Account account = JsonUtils.fromJSON(str, Account.class);
IndexQuery indexQuery = new IndexQueryBuilder()
.withIndexName(Constants.INDEX_NAME)
.withType(Constants.TYPE)
.withId(String.valueOf(account.getAccount_number()))
.withObject(account)
.build();
String index = elasticsearchRestTemplate.index(indexQuery);
// the response: the id of the indexed document
log.info("add doc|index={}", index);
}


/**
* Get a document by id (the index name and type are resolved from the @Document annotation on Account)
*/
@Test
public void getDocTest() throws IOException {
GetQuery getQuery = GetQuery.getById(String.valueOf(1));
Account account = elasticsearchRestTemplate.queryForObject(getQuery, Account.class);
log.info("get doc|{}", account);
}


/**
* Update a document
*
* @throws IOException
*/
@Test
public void updateDocTest() throws IOException {
String str = "{\"account_number\":1,\"balance\":38000,\"firstname\":\"Amber\",\"lastname\":\"Duke\",\"age\":32,\"gender\":\"M\",\"address\":\"880 Holmes Lane\",\"employer\":\"Pyrami\",\"email\":\"amberduke@pyrami.com\",\"city\":\"Brogan\",\"state\":\"IL\"}\n";
Account account = JsonUtils.fromJSON(str, Account.class);
UpdateRequest updateRequest = new UpdateRequest()
.index(Constants.INDEX_NAME)
.type(Constants.TYPE)
.id("1")
.doc(BeanMapUtil.objectToStringMap(account));
UpdateQuery updateQuery = new UpdateQueryBuilder()
.withIndexName(Constants.INDEX_NAME)
.withType(Constants.TYPE)
.withId("1")
.withUpdateRequest(updateRequest)
.build();
UpdateResponse response = elasticsearchRestTemplate.update(updateQuery);
log.info("update doc|{}", response);
}


/**
* Bulk index multiple documents
*/
@Test
public void bulkTest() throws IOException {
List<Account> accounts = new ArrayList<>();
String str = "{\"account_number\":1,\"balance\":38700,\"firstname\":\"Amber\",\"lastname\":\"Duke\",\"age\":32,\"gender\":\"M\",\"address\":\"880 Holmes Lane\",\"employer\":\"Pyrami\",\"email\":\"amberduke@pyrami.com\",\"city\":\"Brogan\",\"state\":\"IL\"}\n";
Account account1 = JsonUtils.fromJSON(str, Account.class);
accounts.add(account1);

String str2 = "{\"account_number\":6,\"balance\":5686,\"firstname\":\"Hattie\",\"lastname\":\"Bond\",\"age\":36,\"gender\":\"M\",\"address\":\"671 Bristol Street\",\"employer\":\"Netagy\",\"email\":\"hattiebond@netagy.com\",\"city\":\"Dante\",\"state\":\"TN\"}\n";
Account account2 = JsonUtils.fromJSON(str2, Account.class);
accounts.add(account2);

List<IndexQuery> indexQueryList = new ArrayList<>();
for (Account account : accounts) {
IndexQuery indexQuery = new IndexQueryBuilder()
.withIndexName(Constants.INDEX_NAME)
.withType(Constants.TYPE)
.withId(String.valueOf(account.getAccount_number()))
.withObject(account)
.build();
indexQueryList.add(indexQuery);
}
elasticsearchRestTemplate.bulkIndex(indexQueryList);
}



/**
* Search documents
*/
@Test
public void searchTest() {
AccountRequest request = new AccountRequest();
request.setFrom(0);
request.setSize(10);
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();

/* equality (term) query */
String gender = request.getGender();
if (gender != null) {
boolQueryBuilder.must(QueryBuilders.termQuery("gender", gender));
}

/* IN (terms) query */
List<Integer> ages = request.getAges();
if (!CollectionUtils.isEmpty(ages)) {
boolQueryBuilder.must(QueryBuilders.termsQuery("age", ages));
}

/* range query */
Long startBalance = request.getStartBalance();
if (startBalance != null) {
boolQueryBuilder.must(QueryBuilders.rangeQuery("balance").gte(startBalance));
}
Long endBalance = request.getEndBalance();
if (endBalance != null) {
boolQueryBuilder.must(QueryBuilders.rangeQuery("balance").lte(endBalance));
}

String addressKey = request.getAddressKey();
if (addressKey != null) {
// A match query analyzes (tokenizes) the input keyword before matching.
boolQueryBuilder.must(QueryBuilders.matchQuery("address", addressKey));
}

/* prefix (LIKE-style) query */
String preAddress = request.getPreAddress();
if (preAddress != null) {
boolQueryBuilder.must(QueryBuilders.prefixQuery("address", preAddress));
}

/* OR (should) query */
String nameKey = request.getNameKey();
if (nameKey != null) {
BoolQueryBuilder shouldQuery = QueryBuilders.boolQuery()
.should(QueryBuilders.termQuery("firstname", nameKey))
.should(QueryBuilders.termQuery("lastname", nameKey));
boolQueryBuilder.must(shouldQuery);
}

/* not-equal (mustNot) query */
String notState = request.getNotState();
if (notState != null) {
boolQueryBuilder.mustNot(QueryBuilders.termQuery("state", notState)); // single value, so termQuery rather than termsQuery
}

Integer offset = request.getFrom();
Integer limit = request.getSize();
SearchQuery searchQuery = new NativeSearchQueryBuilder()
.withIndices(Constants.INDEX_NAME)
.withQuery(boolQueryBuilder)
.withPageable(PageRequest.of(offset / limit, limit))
.withSort(SortBuilders.fieldSort("age").order(SortOrder.ASC))
.build();

AggregatedPage<Account> page = elasticsearchRestTemplate.queryForPage(searchQuery, Account.class);
log.info("search results|totalElements={}|data={}", page.getTotalElements(), page.getContent());

}
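Assuming every field of AccountRequest is populated, the bool query assembled in searchTest roughly serializes to the request body below. This is a hand-written sketch with placeholder values, not output captured from an actual run; the two separate balance range clauses in the code are shown merged into one here, which is equivalent:

```json
{
  "from": 0,
  "size": 10,
  "sort": [ { "age": { "order": "asc" } } ],
  "query": {
    "bool": {
      "must": [
        { "term":   { "gender": "M" } },
        { "terms":  { "age": [32, 36] } },
        { "range":  { "balance": { "gte": 10000, "lte": 50000 } } },
        { "match":  { "address": "lane" } },
        { "prefix": { "address": "880" } },
        { "bool": {
            "should": [
              { "term": { "firstname": "amber" } },
              { "term": { "lastname": "amber" } }
            ]
        } }
      ],
      "must_not": [
        { "term": { "state": "IL" } }
      ]
    }
  }
}
```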

------ End of article ------