来聊一聊 ElasticSearch 最新版的 Java 客户端

可能不少小伙伴都注意到了,从 ElasticSearch7.17 这个版本开始,原先的 Java 高级客户端
Java High Level REST Client 废弃了,不支持了。老实说,ElasticSearch 算是我用过的所有 Java 工具中,更新最为激进的一个了,在 Es7 中废弃了 TransportClient,7.17 又废弃了 TransportClient,那么现在用啥呢?现在的客户端叫做 Elasticsearch Java API Client。

一直偷懒选择无视 Elasticsearch Java API Client,不过最近工作中用到了,所以还是整篇文章和小伙伴们简单梳理一下 Elasticsearch Java API Client 的玩法。

下面的介绍我主要从索引操作和文档操作两个方面来给大家介绍。

不过需要跟大家强调的是,ElasticSearch 的 Java 客户端想要用的 6,必须要熟悉 ElasticSearch 的查询脚本,大家平时在工作中遇到 Es 相关的问题,我也都是建议先在 Kibana 中把操作脚本写好,然后再翻译成 Java 代码,或者直接拷贝到 Java 代码中,非常不建议上来就整 Java 代码,那样很容易出错。

如果你对 Es 的操作不熟悉,松哥录了免费的视频教程,大家可以参考:

不想看视频,也可以在微信公众号后台回复 es,有文档教程。

1. Elasticsearch Java API Client

Elasticsearch Java API Client 是 Elasticsearch 的官方 Java API,这个客户端为所有 Elasticsearch APIs 提供强类型的请求和响应。

这里跟大家解释下什么是强类型的请求和响应:因为所有的 Elasticsearch APIs 本质上都是一个 RESTful 风格的 HTTP 请求,所以当我们调用这些 Elasticsearch APIs 的时候,可以就当成普通的 HTTP 接口来对待,例如使用 HttpUrlConnection 或者 RestTemplate 等工具来直接调用,如果使用这些工具直接调用,就需要我们自己组装 JSON 参数,然后自己解析服务端返回的 JSON。而强类型的请求和响应则是系统把请求参数封装成一个对象了,我们调用对象中的方法去设置就可以了,不需要自己手动拼接 JSON 参数了,请求的结果系统也会封装成一个对象,不需要自己手动去解析 JSON 参数了。

小伙伴们看一下下面这个例子,我想查询 books 索引中,书名中包含 Java 关键字的图书:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class EsDemo02 {
public static void main(String[] args) throws IOException {
URL url = new URL("http://localhost:9200/books/_search?pretty");
HttpURLConnection con = (HttpURLConnection) url.openConnection();
con.setRequestMethod("GET");
con.setRequestProperty("content-type","application/json;charset=utf-8");
//允许输出流/允许参数
con.setDoOutput(true);
//获取输出流
OutputStream out = con.getOutputStream();
String params = "{\n" +
" \"query\": {\n" +
" \"term\": {\n" +
" \"name\": {\n" +
" \"value\": \"java\"\n" +
" }\n" +
" }\n" +
" }\n" +
"}";
out.write(params.getBytes());
if (con.getResponseCode() == 200) {
BufferedReader br = new BufferedReader(new InputStreamReader(con.getInputStream()));
String str = null;
while ((str = br.readLine()) != null) {
System.out.println(str);
}
br.close();
}
}
}

小伙伴们看到,这就是一个普通的 HTTP 请求,请求参数就是查询的条件,这个条件是一个 JSON 字符串,需要我们自己组装,请求的返回值也是一个 JSON 字符串,这个 JSON 字符串也需要我们自己手动去解析,这种可以算是弱类型的请求和响应。

Elasticsearch Java API Client 具有如下特性:

  • 为所有 Elasticsearch APIs 提供强类型的请求和响应。
  • 所有 API 都有阻塞和异步版本。
  • 使用构建器模式,在创建复杂的嵌套结构时,可以编写简洁而可读的代码。
  • 通过使用对象映射器(如 Jackson 或任何实现了 JSON-B 的解析器),实现应用程序类的无缝集成。
  • 将协议处理委托给一个 http 客户端,如 Java Low Level REST Client,它负责所有传输级的问题。HTTP 连接池、重试、节点发现等等由它去完成。

关于第三点,松哥吐槽一句,确实简洁,但是可读性一般般吧。

另外还有两点需要注意:

  • Elasticsearch Java 客户端是向前兼容的,即该客户端支持与 Elasticsearch 的更大或相等的次要版本进行通信。
  • Elasticsearch Java 客户端只向后兼容默认的发行版本,并且没有做出保证。

好了,那就不废话了,开整吧。

2. 引入 Elasticsearch Java API Client

首先需要我们加依赖,对 JDK 的版本要求是 1.8,我们需要添加如下两个依赖:

1
2
3
4
5
6
7
8
9
10
11
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>8.5.1</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.3</version>
</dependency>

如果是 Spring Boot 项目,就不用添加第二个依赖了,因为 Spring Boot 的 Web 中默认已经加了这个依赖了,但是 Spring Boot 一般需要额外添加下面这个依赖,出现这个原因是由于从 JavaEE 过渡到 JakartaEE 时衍生出来的一些问题,这里我就不啰嗦了,咱们直接加依赖即可:

1
2
3
4
5
<dependency>
<groupId>jakarta.json</groupId>
<artifactId>jakarta.json-api</artifactId>
<version>2.0.1</version>
</dependency>

3. 建立连接

接下来我们需要用我们的 Java 客户端和 ElasticSearch 之间建立连接,建立连接的方式如下:

1
2
3
4
5
RestClient restClient = RestClient.builder(
new HttpHost("localhost", 9200)).build();
ElasticsearchTransport transport = new RestClientTransport(
restClient, new JacksonJsonpMapper());
ElasticsearchClient client = new ElasticsearchClient(transport);

小伙伴们看到,这里一共有三个步骤:

  1. 首先创建一个低级客户端,这个其实松哥之前的视频中和大家讲过低级客户端的用法,这里就不再赘述。
  2. 接下来创建一个通信 Transport,并利用 JacksonJsonpMapper 做数据的解析。
  3. 最后创建一个阻塞的 Java 客户端。

上面这个是创建了一个阻塞的 Java 客户端,当然我们也可以创建非阻塞的 Java 客户端,如下:

1
2
3
4
5
RestClient restClient = RestClient.builder(
new HttpHost("localhost", 9200)).build();
ElasticsearchTransport transport = new RestClientTransport(
restClient, new JacksonJsonpMapper());
ElasticsearchAsyncClient client = new ElasticsearchAsyncClient(transport);

只有第三步和前面的不一样,其他都一样。

利用阻塞的 Java 客户端操作 Es 的时候会发生阻塞,也就是必须等到 Es 给出响应之后,代码才会继续执行;非阻塞的 Java 客户端则不会阻塞后面的代码执行,非阻塞的 Java 客户端一般通过回调函数处理请求的响应值。

有时候,我们可能还需要和 Es 之间建立 HTTPS 连接,那么需要在前面代码的基础之上,再套上一层 SSL,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
String fingerprint = "<certificate fingerprint>";
SSLContext sslContext = TransportUtils
.sslContextFromCaFingerprint(fingerprint);
BasicCredentialsProvider credsProv = new BasicCredentialsProvider();
credsProv.setCredentials(
AuthScope.ANY, new UsernamePasswordCredentials(login, password)
);
RestClient restClient = RestClient
.builder(new HttpHost(host, port, "https"))
.setHttpClientConfigCallback(hc -> hc
.setSSLContext(sslContext)
.setDefaultCredentialsProvider(credsProv)
)
.build();
ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
ElasticsearchClient client = new ElasticsearchClient(transport);

好了,关于建立连接,差不多就这些点。

4. 索引操作

Elasticsearch Java API Client 中最大的特色就是建造者模式+Lambda 表达式。例如,我想创建一个索引,方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Test
public void test99() throws IOException {
RestClient restClient = RestClient.builder(
new HttpHost("localhost", 9200)).build();
ElasticsearchTransport transport = new RestClientTransport(
restClient, new JacksonJsonpMapper());
ElasticsearchClient client = new ElasticsearchClient(transport);
CreateIndexResponse createIndexResponse = client.indices().create(
c ->
c.index("javaboy_books")
.settings(s ->
s.numberOfShards("3")
.numberOfReplicas("1"))
.mappings(m ->
m.properties("name", p -> p.text(f -> f.analyzer("ik_max_word")))
.properties("birthday", p -> p.date(d -> d.format("yyyy-MM-dd"))))
.aliases("books_alias", f -> f.isWriteIndex(true)));
System.out.println("createResponse.acknowledged() = " + createIndexResponse.acknowledged());
System.out.println("createResponse.index() = " + createIndexResponse.index());
System.out.println("createResponse.shardsAcknowledged() = " + createIndexResponse.shardsAcknowledged());
}

小伙伴们看到,这里都是建造者模式和 Lambda 表达式,方法名称其实都很好理解(前提是你得熟悉 ElasticSearch 操作脚本),例如:

  • index 方法表示设置索引名称
  • settings 方法表示配置 setting 中的参数
  • numberOfShards 表示索引的分片数
  • numberOfReplicas 表示配置索引的副本数
  • mapping 表示配置索引中的映射规则
  • properties 表示配置索引中的具体字段
  • text 方法表示字段是 text 类型的
  • analyzer 表示配置字段的分词器
  • aliases 表示配置索引的别名

反正这里的方法都是见名知义的,上面这个就类似于下面这个请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
PUT javaboy_books
{
"settings": {
"number_of_replicas": 1,
"number_of_shards": 3
},
"mappings": {
"properties": {
"name":{
"type": "text",
"analyzer": "ik_max_word"
},
"birthday":{
"type": "date",
"format": "yyyy-MM-dd"
}
}
},
"aliases": {
"xxxx":{

}
}
}

小伙伴们在写的时候,脑子里要先有下面这个脚本,然后 Java 方法可以顺手拈来了。

最终创建好的索引如下图:

有的小伙伴可能觉得调这一大堆方法太啰里啰唆了,来个简单的,直接上 JSON,那也不是不可以,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Test
public void test98() throws IOException {
RestClient restClient = RestClient.builder(
new HttpHost("localhost", 9200)).build();
ElasticsearchTransport transport = new RestClientTransport(
restClient, new JacksonJsonpMapper());
ElasticsearchClient client = new ElasticsearchClient(transport);
StringReader json = new StringReader("{\n" +
" \"settings\": {\n" +
" \"number_of_replicas\": 1,\n" +
" \"number_of_shards\": 3\n" +
" },\n" +
" \"mappings\": {\n" +
" \"properties\": {\n" +
" \"name\":{\n" +
" \"type\": \"text\",\n" +
" \"analyzer\": \"ik_max_word\"\n" +
" },\n" +
" \"birthday\":{\n" +
" \"type\": \"date\",\n" +
" \"format\": \"yyyy-MM-dd\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"aliases\": {\n" +
" \"xxxx\":{\n" +
" \n" +
" }\n" +
" }\n" +
"}");
CreateIndexResponse createIndexResponse = client.indices().create(
c ->
c.index("javaboy_books").withJson(json));
System.out.println("createResponse.acknowledged() = " + createIndexResponse.acknowledged());
System.out.println("createResponse.index() = " + createIndexResponse.index());
System.out.println("createResponse.shardsAcknowledged() = " + createIndexResponse.shardsAcknowledged());
}

这是直接把 JSON 参数给拼接出来,就不需要一堆建造者+Lambda 了。

如果你想删除索引呢?如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void test06() throws IOException {
RestClient restClient = RestClient.builder(
new HttpHost("localhost", 9200)).build();
ElasticsearchTransport transport = new RestClientTransport(
restClient, new JacksonJsonpMapper());
ElasticsearchClient client = new ElasticsearchClient(transport);
//删除一个索引
DeleteIndexResponse delete = client.indices().delete(f ->
f.index("my-index")
);
System.out.println("delete.acknowledged() = " + delete.acknowledged());
}

这个表示删除一个名为 my-index 的索引。

好了,关于索引的操作我就说这两点。

可能有的小伙伴会说,ElasticSearch 中创建索引可以配置很多参数你都没讲。在我看来,哪些很多参数其实跟这个 Java API 没有多大关系,只要你会写查询脚本,就自然懂得 Java API 中该调用哪个方法,退一万步讲,你会脚本,不懂 Java API 的方法,那么就像上面那样,直接把你的 JSON 拷贝过来,作为 Java API 的参数即可。

5. 文档操作

5.1 添加文档

先来看文档的添加操作。

如下表示我想给一个名为 books 的索引中添加一个 id 为 890 的书:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Test
public void test07() throws IOException {
RestClient restClient = RestClient.builder(
new HttpHost("localhost", 9200)).build();
ElasticsearchTransport transport = new RestClientTransport(
restClient, new JacksonJsonpMapper());
ElasticsearchClient client = new ElasticsearchClient(transport);
Book book = new Book();
book.setId(890);
book.setName("深入理解Java虚拟机");
book.setAuthor("xxx");
//添加一个文档
//这是一个同步请求,请求会卡在这里
IndexResponse response = client.index(i -> i.index("books").document(book).id("890"));
System.out.println("response.result() = " + response.result());
System.out.println("response.id() = " + response.id());
System.out.println("response.seqNo() = " + response.seqNo());
System.out.println("response.index() = " + response.index());
System.out.println("response.shards() = " + response.shards());
}

添加成功之后,返回的 IndexResponse 对象其实就是对下面这个 JSON 的封装:

现在我们只需要调用相应的方法,就可以获取到 JSON 相关的属性了。

5.2 删除文档

如下表示删除 books 索引中 id 为 891 的文档:

1
2
3
4
5
6
7
8
9
10
11
@Test
public void test09() {
RestClient restClient = RestClient.builder(
new HttpHost("localhost", 9200)).build();
ElasticsearchTransport transport = new RestClientTransport(
restClient, new JacksonJsonpMapper());
ElasticsearchAsyncClient client = new ElasticsearchAsyncClient(transport);
client.delete(d -> d.index("books").id("891")).whenComplete((resp, e) -> {
System.out.println("resp.result() = " + resp.result());
});
}

删除这里我用了异步非阻塞的客户端来给小伙伴们演示的,异步非阻塞的话,就使用 whenComplete 方法处理回调就行了,里边有两个参数,一个是正常情况下返回的对象,另外一个则是出错时候的异常。

5.3 查询文档

最后,就是查询了。这应该是大家日常开发中使用较多的功能项了,不过我还是前面的态度,查询的关键不在 Java API,而在于你对 ElasticSearch 脚本的掌握程度。

所以我这里举个简单的例子,小伙伴们大致了解下 Java API 的方法即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Test
public void test01() throws IOException {
RestClient restClient = RestClient.builder(
new HttpHost("localhost", 9200)).build();
ElasticsearchTransport transport = new RestClientTransport(
restClient, new JacksonJsonpMapper());
ElasticsearchClient client = new ElasticsearchClient(transport);
SearchRequest request = new SearchRequest.Builder()
//去哪个索引里搜索
.index("books")
.query(QueryBuilders.term().field("name").value("java").build()._toQuery())
.build();
SearchResponse<Book> search = client.search(request, Book.class);
System.out.println("search.toString() = " + search.toString());
long took = search.took();
System.out.println("took = " + took);
boolean b = search.timedOut();
System.out.println("b = " + b);
ShardStatistics shards = search.shards();
System.out.println("shards = " + shards);
HitsMetadata<Book> hits = search.hits();
TotalHits total = hits.total();
System.out.println("total = " + total);
Double maxScore = hits.maxScore();
System.out.println("maxScore = " + maxScore);
List<Hit<Book>> list = hits.hits();
for (Hit<Book> bookHit : list) {
System.out.println("bookHit.source() = " + bookHit.source());
System.out.println("bookHit.score() = " + bookHit.score());
System.out.println("bookHit.index() = " + bookHit.index());
}
}

上面这个例子是一个 term 查询,查询 books 索引中书名 name 中包含 java 关键字的图书,等价于下面这个查询:

1
2
3
4
5
6
7
8
9
10
GET books/_search
{
"query": {
"term": {
"name": {
"value": "java"
}
}
}
}

如果希望能够对查询关键字分词之后查询,那么可以使用 match 查询,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Test
public void test03() throws IOException {
RestClient restClient = RestClient.builder(
new HttpHost("localhost", 9200)).build();
ElasticsearchTransport transport = new RestClientTransport(
restClient, new JacksonJsonpMapper());
ElasticsearchClient client = new ElasticsearchClient(transport);
SearchResponse<Book> search = client.search(s -> {
s.index("books")
.query(q -> {
q.match(m -> {
m.field("name").query("美术计算机");
return m;
});
return q;
});
return s;
}, Book.class);
System.out.println("search.toString() = " + search.toString());
long took = search.took();
System.out.println("took = " + took);
boolean b = search.timedOut();
System.out.println("b = " + b);
ShardStatistics shards = search.shards();
System.out.println("shards = " + shards);
HitsMetadata<Book> hits = search.hits();
TotalHits total = hits.total();
System.out.println("total = " + total);
Double maxScore = hits.maxScore();
System.out.println("maxScore = " + maxScore);
List<Hit<Book>> list = hits.hits();
for (Hit<Book> bookHit : list) {
System.out.println("bookHit.source() = " + bookHit.source());
System.out.println("bookHit.score() = " + bookHit.score());
System.out.println("bookHit.index() = " + bookHit.index());
}
}

为了让小伙伴们看到这个 Java 客户端的不同用法,上面两个查询的例子,我分别使用了构造查询请求和建造者+Lambda 的方式。

match 查询就调用 match 方法就行了,设置查询关键字即可,这个查询等价于下面这个查询:

1
2
3
4
5
6
7
8
GET books/_search
{
"query": {
"match": {
"name": "美术计算机"
}
}
}

如果你觉得这种调用各种方法拼接参数的方式不习惯,那么也可以直接上 JSON,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Test
public void test04() throws IOException {
RestClient restClient = RestClient.builder(
new HttpHost("localhost", 9200)).build();
ElasticsearchTransport transport = new RestClientTransport(
restClient, new JacksonJsonpMapper());
ElasticsearchClient client = new ElasticsearchClient(transport);
String key = "java";
StringReader sr = new StringReader("{\n" +
" \"query\": {\n" +
" \"term\": {\n" +
" \"name\": {\n" +
" \"value\": \"" + key + "\"\n" +
" }\n" +
" }\n" +
" }\n" +
"}");
SearchRequest request = new SearchRequest.Builder()
.withJson(sr)
.build();
SearchResponse<Book> search = client.search(request, Book.class);
System.out.println("search.toString() = " + search.toString());
long took = search.took();
System.out.println("took = " + took);
boolean b = search.timedOut();
System.out.println("b = " + b);
ShardStatistics shards = search.shards();
System.out.println("shards = " + shards);
HitsMetadata<Book> hits = search.hits();
TotalHits total = hits.total();
System.out.println("total = " + total);
Double maxScore = hits.maxScore();
System.out.println("maxScore = " + maxScore);
List<Hit<Book>> list = hits.hits();
for (Hit<Book> bookHit : list) {
System.out.println("bookHit.source() = " + bookHit.source());
System.out.println("bookHit.score() = " + bookHit.score());
System.out.println("bookHit.index() = " + bookHit.index());
}
}

可以看到,直接把查询的 JSON 参数传进来也是可以的。这样我们就可以先在 Kibana 中写好脚本,然后直接将脚本拷贝到 Java 代码中来执行就行了。

好啦,关于 Es 中新的 Java 客户端,我就和大家说这么多,最后再强调一下,这其实不是重点,玩 Es 的重点是把 Es 的各种查询参数搞懂,那么 Java 代码其实就是顺手拈来的事了。

最后,如果大家对 Es 不熟悉,可以看看松哥录的这个免费视频教程: