一、ES Client 简介 1. ES是一个服务,采用C/S结构
2. 回顾 ES的架构
3. ES支持的客户端连接方式 3.1 REST API ,端口 9200
这种连接方式对应于架构图中的RESTful style API这一层,这种客户端的连接方式是RESTful风格的,使用http的方式进行连接
3.2 Transport 连接 端口 9300
这种连接方式对应于架构图中的Transport这一层,这种客户端连接方式是直接连接ES的节点,使用TCP的方式进行连接
4. ES提供了多种编程语言客户端
官网可以了解详情:
https://www.elastic.co/guide/en/elasticsearch/client/index.html
二、Java REST Client介绍 1. ES提供了两个JAVA REST client 版本 Java Low Level REST Client: 低级别的REST客户端,通过http与集群交互,用户需自己编组请求JSON串,及解析响应JSON串。兼容所有ES版本。
Java High Level REST Client: 高级别的REST客户端,基于低级别的REST客户端,增加了编组请求JSON串、解析响应JSON串等相关api。使用的版本需要保持和ES服务端的版本一致,否则会有版本问题。
2. Java Low Level REST Client 说明 特点,maven 引入、使用介绍: https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-low.html
API doc :https://artifacts.elastic.co/javadoc/org/elasticsearch/client/elasticsearch-rest-client/6.2.4/index.html
3. Java High Level REST Client 说明 从6.0.0开始加入的,目的是以java面向对象的方式来进行请求、响应处理。每个API 支持 同步/异步 两种方式,同步方法直接返回一个结果对象。异步的方法以async为后缀,通过listener参数来通知结果。高级java REST 客户端依赖Elasticsearch core project
兼容性说明:
依赖 java1.8 和 Elasticsearch core project请使用与服务端ES版本一致的客户端版本
4. Java High Level REST Client maven 集成 1 2 3 4 5 <dependency > <groupId > org.elasticsearch.client</groupId > <artifactId > elasticsearch-rest-high-level-client</artifactId > <version > 6.2.4</version > </dependency >
5. Java High Level REST Client 初始化 1 2 3 4 RestHighLevelClient client = new RestHighLevelClient ( RestClient.builder( new HttpHost ("localhost" , 9200 , "http" ), new HttpHost ("localhost" , 9201 , "http" )));
给定集群的多个节点地址,将客户端负载均衡地向这个节点地址集发请求
Client 不再使用了,记得关闭它:
API及用法示例,请参考:
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-supported-apis.html
三、Java High Level REST Client 使用示例 准备:
编写代码之前首先在maven工程里面引入和ES服务端版本一样的Java客户端
1 2 3 4 5 <dependency > <groupId > org.elasticsearch.client</groupId > <artifactId > elasticsearch-rest-high-level-client</artifactId > <version > 6.2.4</version > </dependency >
给定集群的多个节点地址,将客户端负载均衡地向这个节点地址集发请求:
InitDemo.java
1 2 3 4 5 6 7 8 9 10 11 12 import org.apache.http.HttpHost;import org.elasticsearch.client.RestClient;import org.elasticsearch.client.RestHighLevelClient;public class InitDemo { public static RestHighLevelClient getClient () { RestHighLevelClient client = new RestHighLevelClient ( RestClient.builder(new HttpHost ("localhost" , 9200 , "http" ), new HttpHost ("localhost" , 9201 , "http" ))); return client; } }
注意使用ES的客户端时类比之前我们在Kibana进行的ES的相关操作,这样使用起来更加有效果:
1. Create index 创建索引 CreateIndexDemo.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 import java.io.IOException;import org.elasticsearch.action.admin.indices.alias.Alias;import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;import org.elasticsearch.client.RestHighLevelClient;import org.elasticsearch.common.settings.Settings;import org.elasticsearch.common.xcontent.XContentType;public class CreateIndexDemo { public static void main (String[] args) { try (RestHighLevelClient client = InitDemo.getClient();) { CreateIndexRequest request = new CreateIndexRequest ("mess" ); request.settings(Settings.builder().put("index.number_of_shards" , 3 ) .put("index.number_of_replicas" , 2 ) .put("analysis.analyzer.default.tokenizer" , "ik_smart" ) ); request.mapping("_doc" , " {\n" + " \"_doc\": {\n" + " \"properties\": {\n" + " \"message\": {\n" + " \"type\": \"text\"\n" + " }\n" + " }\n" + " }\n" + " }" , XContentType.JSON); request.alias(new Alias ("mmm" )); CreateIndexResponse createIndexResponse = client.indices() .create(request); boolean acknowledged = createIndexResponse.isAcknowledged(); boolean shardsAcknowledged = createIndexResponse .isShardsAcknowledged(); System.out.println("acknowledged = " + acknowledged); System.out.println("shardsAcknowledged = " + shardsAcknowledged); } catch (IOException e) { e.printStackTrace(); } } }
运行结果:
acknowledged = true
shardsAcknowledged = true
2. index document 索引文档,即往索引里面放入文档数据.类似于数据库里面向表里面插入一行数据,一行数据就是一个文档
IndexDocumentDemo.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 import java.io.IOException;import org.apache.logging.log4j.LogManager;import org.apache.logging.log4j.Logger;import org.elasticsearch.ElasticsearchException;import org.elasticsearch.action.DocWriteResponse;import org.elasticsearch.action.index.IndexRequest;import org.elasticsearch.action.index.IndexResponse;import org.elasticsearch.action.support.replication.ReplicationResponse;import org.elasticsearch.client.RestHighLevelClient;import org.elasticsearch.common.xcontent.XContentType;import org.elasticsearch.rest.RestStatus;public class IndexDocumentDemo { private static Logger logger = LogManager.getRootLogger(); public static void main (String[] args) { try (RestHighLevelClient client = InitDemo.getClient();) { IndexRequest request = new IndexRequest ( "mess" , "_doc" , "1" ); String jsonString = "{" + "\"user\":\"kimchy\"," + "\"postDate\":\"2013-01-30\"," + "\"message\":\"trying out Elasticsearch\"" + "}" ; request.source(jsonString, XContentType.JSON); IndexResponse indexResponse = null ; try { indexResponse = client.index(request); } catch (ElasticsearchException e) { if (e.status() == RestStatus.CONFLICT) { logger.error("冲突了,请在此写冲突处理逻辑!\n" + e.getDetailedMessage()); } logger.error("索引异常" , e); } if (indexResponse != null ) { String index = indexResponse.getIndex(); String type = indexResponse.getType(); String id = indexResponse.getId(); long version = indexResponse.getVersion(); if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) { System.out.println("新增文档成功,处理逻辑代码写到这里。" ); } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) { System.out.println("修改文档成功,处理逻辑代码写到这里。" ); } ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo(); if (shardInfo.getTotal() != shardInfo.getSuccessful()) { } if (shardInfo.getFailed() > 0 ) { for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) { String reason = failure.reason(); System.out.println("副本失败原因:" + reason); } } } } catch (IOException e) { e.printStackTrace(); } } }
运行结果:
新增文档成功,处理逻辑代码写到这里。
3. get document 获取文档数据
GetDocumentDemo.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 import java.io.IOException;import java.util.Map;import org.apache.logging.log4j.LogManager;import org.apache.logging.log4j.Logger;import org.elasticsearch.ElasticsearchException;import org.elasticsearch.action.get.GetRequest;import org.elasticsearch.action.get.GetResponse;import org.elasticsearch.client.RestHighLevelClient;import org.elasticsearch.common.Strings;import org.elasticsearch.rest.RestStatus;import org.elasticsearch.search.fetch.subphase.FetchSourceContext;public class GetDocumentDemo { private static Logger logger = LogManager.getRootLogger(); public static void main (String[] args) { try (RestHighLevelClient client = InitDemo.getClient();) { GetRequest request = new GetRequest ( "mess" , "_doc" , "1" ); String[] includes = new String []{"message" , "*Date" }; String[] excludes = Strings.EMPTY_ARRAY; FetchSourceContext fetchSourceContext = new FetchSourceContext (true , includes, excludes); request.fetchSourceContext(fetchSourceContext); GetResponse getResponse = null ; try { getResponse = client.get(request); } catch (ElasticsearchException e) { if (e.status() == RestStatus.NOT_FOUND) { logger.error("没有找到该id的文档" ); } if (e.status() == RestStatus.CONFLICT) { logger.error("获取时版本冲突了,请在此写冲突处理逻辑!" ); } logger.error("获取文档异常" , e); } if (getResponse != null ) { String index = getResponse.getIndex(); String type = getResponse.getType(); String id = getResponse.getId(); if (getResponse.isExists()) { long version = getResponse.getVersion(); String sourceAsString = getResponse.getSourceAsString(); Map<String, Object> sourceAsMap = getResponse.getSourceAsMap(); byte [] sourceAsBytes = getResponse.getSourceAsBytes(); logger.info("index:" + index + " type:" + type + " id:" + id); logger.info(sourceAsString); } else { logger.error("没有找到该id的文档" ); } } } catch (IOException e) { e.printStackTrace(); } } }
4. Bulk 批量索引文档,即批量往索引里面放入文档数据.类似于数据库里面批量向表里面插入多行数据,一行数据就是一个文档
BulkDemo.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 import java.io.IOException;import org.apache.logging.log4j.LogManager;import org.apache.logging.log4j.Logger;import org.elasticsearch.action.DocWriteRequest;import org.elasticsearch.action.DocWriteResponse;import org.elasticsearch.action.bulk.BulkItemResponse;import org.elasticsearch.action.bulk.BulkRequest;import org.elasticsearch.action.bulk.BulkResponse;import org.elasticsearch.action.delete.DeleteResponse;import org.elasticsearch.action.index.IndexRequest;import org.elasticsearch.action.index.IndexResponse;import org.elasticsearch.action.update.UpdateResponse;import org.elasticsearch.client.RestHighLevelClient;import org.elasticsearch.common.xcontent.XContentType;public class BulkDemo { private static Logger logger = LogManager.getRootLogger(); public static void main (String[] args) { try (RestHighLevelClient client = InitDemo.getClient();) { BulkRequest request = new BulkRequest (); request.add(new IndexRequest ("mess" , "_doc" , "1" ) .source(XContentType.JSON,"field" , "foo" )); request.add(new IndexRequest ("mess" , "_doc" , "2" ) .source(XContentType.JSON,"field" , "bar" )); request.add(new IndexRequest ("mess" , "_doc" , "3" ) .source(XContentType.JSON,"field" , "baz" )); BulkResponse bulkResponse = client.bulk(request); if (bulkResponse != null ) { for (BulkItemResponse bulkItemResponse : bulkResponse) { DocWriteResponse itemResponse = bulkItemResponse.getResponse(); if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) { IndexResponse indexResponse = (IndexResponse) itemResponse; } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) { UpdateResponse updateResponse = (UpdateResponse) itemResponse; } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) { DeleteResponse deleteResponse = (DeleteResponse) itemResponse; } } } } catch (IOException e) { e.printStackTrace(); } } }
5. search 搜索数据
SearchDemo.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 import java.io.IOException;import java.util.List;import java.util.Map;import java.util.concurrent.TimeUnit;import org.apache.logging.log4j.LogManager;import org.apache.logging.log4j.Logger;import org.elasticsearch.action.search.SearchRequest;import org.elasticsearch.action.search.SearchResponse;import org.elasticsearch.action.search.ShardSearchFailure;import org.elasticsearch.client.RestHighLevelClient;import org.elasticsearch.common.unit.TimeValue;import org.elasticsearch.index.query.QueryBuilders;import org.elasticsearch.rest.RestStatus;import org.elasticsearch.search.SearchHit;import org.elasticsearch.search.SearchHits;import org.elasticsearch.search.builder.SearchSourceBuilder;import org.elasticsearch.search.suggest.Suggest;import org.elasticsearch.search.suggest.term.TermSuggestion;public class SearchDemo { private static Logger logger = LogManager.getRootLogger(); public static void main (String[] args) { try (RestHighLevelClient client = InitDemo.getClient();) { SearchRequest searchRequest = new SearchRequest ("bank" ); searchRequest.types("_doc" ); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder (); sourceBuilder.query(QueryBuilders.termQuery("age" , 24 )); sourceBuilder.from(0 ); sourceBuilder.size(10 ); sourceBuilder.timeout(new TimeValue (60 , TimeUnit.SECONDS)); searchRequest.source(sourceBuilder); SearchResponse searchResponse = client.search(searchRequest); RestStatus status = searchResponse.status(); TimeValue took = searchResponse.getTook(); Boolean terminatedEarly = searchResponse.isTerminatedEarly(); boolean timedOut = searchResponse.isTimedOut(); int totalShards = searchResponse.getTotalShards(); int successfulShards = searchResponse.getSuccessfulShards(); int failedShards = searchResponse.getFailedShards(); for (ShardSearchFailure failure : searchResponse.getShardFailures()) { } SearchHits hits = searchResponse.getHits(); long totalHits = hits.getTotalHits(); float maxScore = hits.getMaxScore(); SearchHit[] searchHits = hits.getHits(); for (SearchHit hit : searchHits) { String index = hit.getIndex(); String type = hit.getType(); String id = hit.getId(); float score = hit.getScore(); String sourceAsString = hit.getSourceAsString(); Map<String, Object> sourceAsMap = hit.getSourceAsMap(); logger.info("index:" + index + " type:" + type + " id:" + id); logger.info(sourceAsString); } } catch (IOException e) { logger.error(e); } } }
6. highlight 高亮 HighlightDemo.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 import java.io.IOException;import java.util.Map;import org.apache.logging.log4j.LogManager;import org.apache.logging.log4j.Logger;import org.elasticsearch.action.search.SearchRequest;import org.elasticsearch.action.search.SearchResponse;import org.elasticsearch.client.RestHighLevelClient;import org.elasticsearch.common.text.Text;import org.elasticsearch.index.query.QueryBuilder;import org.elasticsearch.index.query.QueryBuilders;import org.elasticsearch.rest.RestStatus;import org.elasticsearch.search.SearchHit;import org.elasticsearch.search.SearchHits;import org.elasticsearch.search.builder.SearchSourceBuilder;import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;public class HighlightDemo { private static Logger logger = LogManager.getRootLogger(); public static void main (String[] args) { try (RestHighLevelClient client = InitDemo.getClient();) { SearchRequest searchRequest = new SearchRequest ("hl_test" ); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder (); QueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("title" , "lucene solr" ); sourceBuilder.query(matchQueryBuilder); HighlightBuilder highlightBuilder = new HighlightBuilder (); highlightBuilder.requireFieldMatch(false ).field("title" ).field("content" ) .preTags("<strong>" ).postTags("</strong>" ); sourceBuilder.highlighter(highlightBuilder); searchRequest.source(sourceBuilder); SearchResponse searchResponse = client.search(searchRequest); if (RestStatus.OK.equals(searchResponse.status())) { SearchHits hits = searchResponse.getHits(); long totalHits = hits.getTotalHits(); SearchHit[] searchHits = hits.getHits(); for (SearchHit hit : searchHits) { String index = hit.getIndex(); String type = hit.getType(); String id = hit.getId(); float score = hit.getScore(); Map<String, Object> sourceAsMap = hit.getSourceAsMap(); logger.info("index:" + index + " type:" + type + " id:" + id); logger.info("sourceMap : " + sourceAsMap); Map<String, HighlightField> highlightFields = hit.getHighlightFields(); HighlightField highlight = highlightFields.get("title" ); if (highlight != null ) { Text[] fragments = highlight.fragments(); if (fragments != null ) { String fragmentString = fragments[0 ].string(); logger.info("title highlight : " + fragmentString); } } highlight = highlightFields.get("content" ); if (highlight != null ) { Text[] fragments = highlight.fragments(); if (fragments != null ) { String fragmentString = fragments[0 ].string(); logger.info("content highlight : " + fragmentString); } } } } } catch (IOException e) { logger.error(e); } } }
7. suggest 查询建议 SuggestDemo.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 import java.io.IOException;import org.apache.logging.log4j.LogManager;import org.apache.logging.log4j.Logger;import org.elasticsearch.action.search.SearchRequest;import org.elasticsearch.action.search.SearchResponse;import org.elasticsearch.client.RestHighLevelClient;import org.elasticsearch.rest.RestStatus;import org.elasticsearch.search.builder.SearchSourceBuilder;import org.elasticsearch.search.suggest.Suggest;import org.elasticsearch.search.suggest.SuggestBuilder;import org.elasticsearch.search.suggest.SuggestBuilders;import org.elasticsearch.search.suggest.SuggestionBuilder;import org.elasticsearch.search.suggest.completion.CompletionSuggestion;import org.elasticsearch.search.suggest.term.TermSuggestion;public class SuggestDemo { private static Logger logger = LogManager.getRootLogger(); public static void termSuggest () { try (RestHighLevelClient client = InitDemo.getClient();) { SearchRequest searchRequest = new SearchRequest ("mess" ); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder (); sourceBuilder.size(0 ); SuggestionBuilder termSuggestionBuilder = SuggestBuilders.termSuggestion("user" ).text("kmichy" ); SuggestBuilder suggestBuilder = new SuggestBuilder (); suggestBuilder.addSuggestion("suggest_user" , termSuggestionBuilder); sourceBuilder.suggest(suggestBuilder); searchRequest.source(sourceBuilder); SearchResponse searchResponse = client.search(searchRequest); if (RestStatus.OK.equals(searchResponse.status())) { Suggest suggest = searchResponse.getSuggest(); TermSuggestion termSuggestion = suggest.getSuggestion("suggest_user" ); for (TermSuggestion.Entry entry : termSuggestion.getEntries()) { logger.info("text: " + entry.getText().string()); for (TermSuggestion.Entry.Option option : entry) { String suggestText = option.getText().string(); logger.info(" suggest option : " + suggestText); } } } } catch (IOException e) { logger.error(e); } } public static void completionSuggester () { try (RestHighLevelClient client = InitDemo.getClient();) { SearchRequest searchRequest = new SearchRequest ("music" ); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder (); sourceBuilder.size(0 ); SuggestionBuilder termSuggestionBuilder = SuggestBuilders.completionSuggestion("suggest" ).prefix("lucene s" ) .skipDuplicates(true ); SuggestBuilder suggestBuilder = new SuggestBuilder (); suggestBuilder.addSuggestion("song-suggest" , termSuggestionBuilder); sourceBuilder.suggest(suggestBuilder); searchRequest.source(sourceBuilder); SearchResponse searchResponse = client.search(searchRequest); if (RestStatus.OK.equals(searchResponse.status())) { Suggest suggest = searchResponse.getSuggest(); CompletionSuggestion termSuggestion = suggest.getSuggestion("song-suggest" ); for (CompletionSuggestion.Entry entry : termSuggestion.getEntries()) { logger.info("text: " + entry.getText().string()); for (CompletionSuggestion.Entry.Option option : entry) { String suggestText = option.getText().string(); logger.info(" suggest option : " + suggestText); } } } } catch (IOException e) { logger.error(e); } } public static void main (String[] args) { termSuggest(); logger.info("--------------------------------------" ); completionSuggester(); } }
8. aggregation 聚合分析 AggregationDemo.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 import java.io.IOException;import org.apache.logging.log4j.LogManager;import org.apache.logging.log4j.Logger;import org.elasticsearch.action.search.SearchRequest;import org.elasticsearch.action.search.SearchResponse;import org.elasticsearch.client.RestHighLevelClient;import org.elasticsearch.rest.RestStatus;import org.elasticsearch.search.aggregations.AggregationBuilders;import org.elasticsearch.search.aggregations.Aggregations;import org.elasticsearch.search.aggregations.BucketOrder;import org.elasticsearch.search.aggregations.bucket.terms.Terms;import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;import org.elasticsearch.search.aggregations.metrics.avg.Avg;import org.elasticsearch.search.builder.SearchSourceBuilder;public class AggregationDemo { private static Logger logger = LogManager.getRootLogger(); public static void main (String[] args) { try (RestHighLevelClient client = InitDemo.getClient();) { SearchRequest searchRequest = new SearchRequest ("bank" ); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder (); sourceBuilder.size(0 ); TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_age" ) .field("age" ).order(BucketOrder.aggregation("average_balance" , true )); aggregation.subAggregation(AggregationBuilders.avg("average_balance" ) .field("balance" )); sourceBuilder.aggregation(aggregation); searchRequest.source(sourceBuilder); SearchResponse searchResponse = client.search(searchRequest); if (RestStatus.OK.equals(searchResponse.status())) { Aggregations aggregations = searchResponse.getAggregations(); Terms byAgeAggregation = aggregations.get("by_age" ); logger.info("aggregation by_age 结果" ); logger.info("docCountError: " + byAgeAggregation.getDocCountError()); logger.info("sumOfOtherDocCounts: " + byAgeAggregation.getSumOfOtherDocCounts()); logger.info("------------------------------------" ); for (Bucket buck : byAgeAggregation.getBuckets()) { logger.info("key: " + buck.getKeyAsNumber()); logger.info("docCount: " + buck.getDocCount()); logger.info("docCountError: " + buck.getDocCountError()); Avg averageBalance = buck.getAggregations().get("average_balance" ); logger.info("average_balance: " + averageBalance.getValue()); logger.info("------------------------------------" ); } } } catch (IOException e) { logger.error(e); } } }
9. 官网资料 各种查询对应的QueryBuilder:
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-query-builders.html
各种聚合对应的AggregationBuilder:
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-aggregation-builders.html
四、Java Client 1. Java Client 说明 java client 使用 TransportClient,各种操作本质上都是异步的(可以用 listener,或返回 Future )。
注意:ES的发展规划中在7.0版本开始将废弃 TransportClient,8.0版本中将完全移除 TransportClient,取而代之的是High Level REST Client。
High Level REST Client 中的操作API和java client 大多是一样的。
2. 官方学习链接 https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index.html
3. 兼容性说明 请使用与服务端ES版本一致的客户端版本
4. Java Client maven 集成 1 2 3 4 5 <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>transport</artifactId> <version>6.2 .4 </version> </dependency>
5. Java Client logger 日志器说明 使用的是 log4j2 日志组件。如果要使用其他的日志组件,可使用slf4j作桥
6. Init Client
Init Client setting 可用参数说明:
cluster.name 指定集群的名字,如果集群的名字不是默认的elasticsearch,需指定。
client.transport.sniff 设置为true,将自动嗅探整个集群,自动加入集群的节点到连接列表中。
client.transport.ignore_cluster_name 设置为true将忽略连接节点的集群名称验证。(因为0.19.4)
client.transport.ping_timeout 等待节点ping响应的时间。默认为5s。
client.transport.nodes_sampler_interval 对列出和连接的节点进行采样/ ping的频率。默认为5s。
五、Java Client使用示例 注意:TransPort客户端的使用和RESTful风格的使用基本一致,除了获取客户端不一样,还有发送请求有的不一样外
准备:
编写示例之前首先在maven工程里面引入和ES服务端版本一样的Java客户端。
1 2 3 4 5 <dependency > <groupId > org.elasticsearch.client</groupId > <artifactId > transport</artifactId > <version > 6.2.4</version > </dependency >
给定集群的多个节点地址,将客户端负载均衡地向这个节点地址集发请求:
InitDemo.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 import java.net.InetAddress;import java.net.UnknownHostException;import org.elasticsearch.client.transport.TransportClient;import org.elasticsearch.common.settings.Settings;import org.elasticsearch.common.transport.TransportAddress;import org.elasticsearch.transport.client.PreBuiltTransportClient;public class InitDemo { private static TransportClient client; public static TransportClient getClient () throws UnknownHostException { if (client == null ) { Settings settings = Settings.builder() .put("client.transport.sniff" , true ) .build(); client = new PreBuiltTransportClient (settings) .addTransportAddress(new TransportAddress (InetAddress.getByName("127.0.0.1" ), 9300 )); } return client; } }
注意使用ES的客户端时类比之前我们在Kibana进行的ES的相关操作,这样使用起来更加有效果:
1. Create index 创建索引 CreateIndexDemo.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 import java.io.IOException;import java.util.concurrent.ExecutionException;import org.elasticsearch.action.admin.indices.alias.Alias;import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;import org.elasticsearch.client.transport.TransportClient;import org.elasticsearch.common.settings.Settings;import org.elasticsearch.common.xcontent.XContentType;public class CreateIndexDemo { public static void main (String[] args) { try (TransportClient client = InitDemo.getClient();) { CreateIndexRequest request = new CreateIndexRequest ("mess" ); request.settings(Settings.builder().put("index.number_of_shards" , 3 ) .put("index.number_of_replicas" , 2 ) .put("analysis.analyzer.default.tokenizer" , "ik_smart" ) ); request.mapping("_doc" , " {\n" + " \"_doc\": {\n" + " \"properties\": {\n" + " \"message\": {\n" + " \"type\": \"text\"\n" + " }\n" + " }\n" + " }\n" + " }" , XContentType.JSON); request.alias(new Alias ("mmm" )); CreateIndexResponse createIndexResponse = client.admin().indices() .create(request).get(); boolean acknowledged = createIndexResponse.isAcknowledged(); boolean shardsAcknowledged = createIndexResponse .isShardsAcknowledged(); System.out.println("acknowledged = " + acknowledged); System.out.println("shardsAcknowledged = " + shardsAcknowledged); } catch (IOException | InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
2. index document 索引文档,即往索引里面放入文档数据.类似于数据库里面向表里面插入一行数据,一行数据就是一个文档
IndexDocumentDemo.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 import java.io.IOException;import java.util.concurrent.ExecutionException;import org.apache.logging.log4j.LogManager;import org.apache.logging.log4j.Logger;import org.elasticsearch.ElasticsearchException;import org.elasticsearch.action.DocWriteResponse;import org.elasticsearch.action.index.IndexRequest;import org.elasticsearch.action.index.IndexResponse;import org.elasticsearch.action.support.replication.ReplicationResponse;import org.elasticsearch.client.transport.TransportClient;import org.elasticsearch.common.xcontent.XContentType;import org.elasticsearch.rest.RestStatus;public class IndexDocumentDemo { private static Logger logger = LogManager.getRootLogger(); public static void main (String[] args) { try (TransportClient client = InitDemo.getClient();) { IndexRequest request = new IndexRequest ( "mess" , "_doc" , "11" ); String jsonString = "{" + "\"user\":\"kimchy\"," + "\"postDate\":\"2013-01-30\"," + "\"message\":\"trying out Elasticsearch\"" + "}" ; request.source(jsonString, XContentType.JSON); IndexResponse indexResponse = null ; try { indexResponse = client.index(request).get(); } catch (ElasticsearchException e) { if (e.status() == RestStatus.CONFLICT) { logger.error("冲突了,请在此写冲突处理逻辑!\n" + e.getDetailedMessage()); } logger.error("索引异常" , e); }catch (InterruptedException | ExecutionException e) { logger.error("索引异常" , e); } if (indexResponse != null ) { String index = indexResponse.getIndex(); String type = indexResponse.getType(); String id = indexResponse.getId(); long version = indexResponse.getVersion(); if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) { System.out.println("新增文档成功,处理逻辑代码写到这里。" ); } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) { System.out.println("修改文档成功,处理逻辑代码写到这里。" ); } ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo(); if (shardInfo.getTotal() != shardInfo.getSuccessful()) { } if (shardInfo.getFailed() > 0 ) { for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) { String reason = failure.reason(); System.out.println("副本失败原因:" + reason); } } } } catch (IOException e) { e.printStackTrace(); } } }
3. get document 获取文档数据 GetDocumentDemo.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 import java.io.IOException;import java.util.Map;import java.util.concurrent.ExecutionException;import org.apache.logging.log4j.LogManager;import org.apache.logging.log4j.Logger;import org.elasticsearch.ElasticsearchException;import org.elasticsearch.action.get.GetRequest;import org.elasticsearch.action.get.GetResponse;import org.elasticsearch.client.transport.TransportClient;import org.elasticsearch.common.Strings;import org.elasticsearch.rest.RestStatus;import org.elasticsearch.search.fetch.subphase.FetchSourceContext;public class GetDocumentDemo { private static Logger logger = LogManager.getRootLogger(); public static void main (String[] args) { try (TransportClient client = InitDemo.getClient();) { GetRequest request = new GetRequest ( "mess" , "_doc" , "11" ); String[] includes = new String []{"message" , "*Date" }; String[] excludes = Strings.EMPTY_ARRAY; FetchSourceContext fetchSourceContext = new FetchSourceContext (true , includes, excludes); request.fetchSourceContext(fetchSourceContext); GetResponse getResponse = null ; try { getResponse = client.get(request).get(); } catch (ElasticsearchException e) { if (e.status() == RestStatus.NOT_FOUND) { logger.error("没有找到该id的文档" ); } if (e.status() == RestStatus.CONFLICT) { logger.error("获取时版本冲突了,请在此写冲突处理逻辑!" ); } logger.error("获取文档异常" , e); }catch (InterruptedException | ExecutionException e) { logger.error("索引异常" , e); } if (getResponse != null ) { String index = getResponse.getIndex(); String type = getResponse.getType(); String id = getResponse.getId(); if (getResponse.isExists()) { long version = getResponse.getVersion(); String sourceAsString = getResponse.getSourceAsString(); Map<String, Object> sourceAsMap = getResponse.getSourceAsMap(); byte [] sourceAsBytes = getResponse.getSourceAsBytes(); logger.info("index:" + index + " type:" + type + " id:" + id); logger.info(sourceAsString); } else { logger.error("没有找到该id的文档" ); } } } catch (IOException e) { e.printStackTrace(); } } }
4. Bulk 批量索引文档,即批量往索引里面放入文档数据.类似于数据库里面批量向表里面插入多行数据,一行数据就是一个文档
BulkDemo.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 import java.io.IOException;import java.util.concurrent.ExecutionException;import org.apache.logging.log4j.LogManager;import org.apache.logging.log4j.Logger;import org.elasticsearch.action.DocWriteRequest;import org.elasticsearch.action.DocWriteResponse;import org.elasticsearch.action.bulk.BulkItemResponse;import org.elasticsearch.action.bulk.BulkRequest;import org.elasticsearch.action.bulk.BulkResponse;import org.elasticsearch.action.delete.DeleteResponse;import org.elasticsearch.action.index.IndexRequest;import org.elasticsearch.action.index.IndexResponse;import org.elasticsearch.action.update.UpdateResponse;import org.elasticsearch.client.transport.TransportClient;import org.elasticsearch.common.xcontent.XContentType;public class BulkDemo { private static Logger logger = LogManager.getRootLogger(); public static void main (String[] args) { try (TransportClient client = InitDemo.getClient();) { BulkRequest request = new BulkRequest (); request.add(new IndexRequest ("mess" , "_doc" , "1" ) .source(XContentType.JSON,"field" , "foo" )); request.add(new IndexRequest ("mess" , "_doc" , "2" ) .source(XContentType.JSON,"field" , "bar" )); request.add(new IndexRequest ("mess" , "_doc" , "3" ) .source(XContentType.JSON,"field" , "baz" )); BulkResponse bulkResponse = client.bulk(request).get(); if (bulkResponse != null ) { for (BulkItemResponse bulkItemResponse : bulkResponse) { DocWriteResponse itemResponse = bulkItemResponse.getResponse(); if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) { IndexResponse indexResponse = (IndexResponse) itemResponse; } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) { UpdateResponse updateResponse = (UpdateResponse) itemResponse; } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) { DeleteResponse deleteResponse = (DeleteResponse) itemResponse; } } } } catch (IOException | InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
5. search 搜索数据 SearchDemo.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 import java.io.IOException;import java.util.Map;import java.util.concurrent.ExecutionException;import java.util.concurrent.TimeUnit;import org.apache.logging.log4j.LogManager;import org.apache.logging.log4j.Logger;import org.elasticsearch.action.search.SearchRequest;import org.elasticsearch.action.search.SearchResponse;import org.elasticsearch.action.search.ShardSearchFailure;import org.elasticsearch.client.transport.TransportClient;import org.elasticsearch.common.unit.TimeValue;import org.elasticsearch.index.query.QueryBuilders;import org.elasticsearch.rest.RestStatus;import org.elasticsearch.search.SearchHit;import org.elasticsearch.search.SearchHits;import org.elasticsearch.search.builder.SearchSourceBuilder;public class SearchDemo { private static Logger logger = LogManager.getRootLogger(); public static void main (String[] args) { try (TransportClient client = InitDemo.getClient();) { SearchRequest searchRequest = new SearchRequest ("bank" ); searchRequest.types("_doc" ); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder (); sourceBuilder.query(QueryBuilders.termQuery("age" , 24 )); sourceBuilder.from(0 ); sourceBuilder.size(10 ); sourceBuilder.timeout(new TimeValue (60 , TimeUnit.SECONDS)); searchRequest.source(sourceBuilder); SearchResponse searchResponse = client.search(searchRequest).get(); RestStatus status = searchResponse.status(); TimeValue took = searchResponse.getTook(); Boolean terminatedEarly = searchResponse.isTerminatedEarly(); boolean timedOut = searchResponse.isTimedOut(); int totalShards = searchResponse.getTotalShards(); int successfulShards = searchResponse.getSuccessfulShards(); int failedShards = searchResponse.getFailedShards(); for (ShardSearchFailure failure : searchResponse.getShardFailures()) { } SearchHits hits = searchResponse.getHits(); long totalHits = hits.getTotalHits(); float maxScore = hits.getMaxScore(); SearchHit[] searchHits = hits.getHits(); for (SearchHit hit : searchHits) { String index = hit.getIndex(); String type = hit.getType(); String id = hit.getId(); float score = hit.getScore(); String sourceAsString = hit.getSourceAsString(); Map<String, Object> sourceAsMap = hit.getSourceAsMap(); logger.info("index:" + index + " type:" + type + " id:" + id); logger.info(sourceAsString); } } catch (IOException | InterruptedException | ExecutionException e) { logger.error(e); } } }
6. highlight 高亮 HighlightDemo.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 import java.io.IOException;import java.util.Map;import java.util.concurrent.ExecutionException;import org.apache.logging.log4j.LogManager;import org.apache.logging.log4j.Logger;import org.elasticsearch.action.search.SearchRequest;import org.elasticsearch.action.search.SearchResponse;import org.elasticsearch.client.transport.TransportClient;import org.elasticsearch.common.text.Text;import org.elasticsearch.index.query.QueryBuilder;import org.elasticsearch.index.query.QueryBuilders;import org.elasticsearch.rest.RestStatus;import org.elasticsearch.search.SearchHit;import org.elasticsearch.search.SearchHits;import org.elasticsearch.search.builder.SearchSourceBuilder;import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;public class HighlightDemo { private static Logger logger = LogManager.getRootLogger(); public static void main (String[] args) { try (TransportClient client = InitDemo.getClient();) { SearchRequest searchRequest = new SearchRequest ("hl_test" ); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder (); QueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("title" , "lucene solr" ); sourceBuilder.query(matchQueryBuilder); HighlightBuilder highlightBuilder = new HighlightBuilder (); highlightBuilder.requireFieldMatch(false ).field("title" ).field("content" ) .preTags("<strong>" ).postTags("</strong>" ); sourceBuilder.highlighter(highlightBuilder); searchRequest.source(sourceBuilder); SearchResponse searchResponse = client.search(searchRequest).get(); if (RestStatus.OK.equals(searchResponse.status())) { SearchHits hits = searchResponse.getHits(); long totalHits = hits.getTotalHits(); SearchHit[] searchHits = hits.getHits(); for (SearchHit hit : searchHits) { String index = hit.getIndex(); String type = hit.getType(); String id = hit.getId(); float score = hit.getScore(); Map<String, Object> sourceAsMap = hit.getSourceAsMap(); logger.info("index:" + index + " type:" + type + " id:" + id); logger.info("sourceMap : " + sourceAsMap); Map<String, HighlightField> highlightFields = hit.getHighlightFields(); HighlightField highlight = highlightFields.get("title" ); if (highlight != null ) { Text[] fragments = highlight.fragments(); if (fragments != null ) { String fragmentString = fragments[0 ].string(); logger.info("title highlight : " + fragmentString); } } highlight = highlightFields.get("content" ); if (highlight != null ) { Text[] fragments = highlight.fragments(); if (fragments != null ) { String fragmentString = fragments[0 ].string(); logger.info("content highlight : " + fragmentString); } } } } } catch (IOException | InterruptedException | ExecutionException e) { logger.error(e); } } }
7. suggest 查询建议 SuggestDemo.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 import java.io.IOException;import java.util.concurrent.ExecutionException;import org.apache.logging.log4j.LogManager;import org.apache.logging.log4j.Logger;import org.elasticsearch.action.search.SearchRequest;import org.elasticsearch.action.search.SearchResponse;import org.elasticsearch.client.transport.TransportClient;import org.elasticsearch.rest.RestStatus;import org.elasticsearch.search.builder.SearchSourceBuilder;import org.elasticsearch.search.suggest.Suggest;import org.elasticsearch.search.suggest.SuggestBuilder;import org.elasticsearch.search.suggest.SuggestBuilders;import org.elasticsearch.search.suggest.SuggestionBuilder;import org.elasticsearch.search.suggest.completion.CompletionSuggestion;import org.elasticsearch.search.suggest.term.TermSuggestion;public class SuggestDemo { private static Logger logger = LogManager.getRootLogger(); public static void termSuggest (TransportClient client) { SearchRequest searchRequest = new SearchRequest ("mess" ); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder (); sourceBuilder.size(0 ); SuggestionBuilder termSuggestionBuilder = SuggestBuilders.termSuggestion("user" ).text("kmichy" ); SuggestBuilder suggestBuilder = new SuggestBuilder (); suggestBuilder.addSuggestion("suggest_user" , termSuggestionBuilder); sourceBuilder.suggest(suggestBuilder); searchRequest.source(sourceBuilder); try { SearchResponse searchResponse = client.search(searchRequest).get(); if (RestStatus.OK.equals(searchResponse.status())) { Suggest suggest = searchResponse.getSuggest(); TermSuggestion termSuggestion = suggest.getSuggestion("suggest_user" ); for (TermSuggestion.Entry entry : termSuggestion.getEntries()) { logger.info("text: " + entry.getText().string()); for (TermSuggestion.Entry.Option option : entry) { String suggestText = option.getText().string(); logger.info(" suggest option : " + suggestText); } } } } catch (InterruptedException | ExecutionException e) { logger.error(e); } } public static void completionSuggester (TransportClient client) { SearchRequest searchRequest = new SearchRequest ("music" ); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder (); sourceBuilder.size(0 ); SuggestionBuilder termSuggestionBuilder = SuggestBuilders.completionSuggestion("suggest" ).prefix("lucene s" ) .skipDuplicates(true ); SuggestBuilder suggestBuilder = new SuggestBuilder (); suggestBuilder.addSuggestion("song-suggest" , termSuggestionBuilder); sourceBuilder.suggest(suggestBuilder); searchRequest.source(sourceBuilder); try { SearchResponse searchResponse = client.search(searchRequest).get(); if (RestStatus.OK.equals(searchResponse.status())) { Suggest suggest = searchResponse.getSuggest(); CompletionSuggestion termSuggestion = suggest.getSuggestion("song-suggest" ); for (CompletionSuggestion.Entry entry : termSuggestion.getEntries()) { logger.info("text: " + entry.getText().string()); for (CompletionSuggestion.Entry.Option option : entry) { String suggestText = option.getText().string(); logger.info(" suggest option : " + suggestText); } } } } catch (InterruptedException | ExecutionException e) { logger.error(e); } } public static void main (String[] args) { try (TransportClient client = InitDemo.getClient();) { termSuggest(client); logger.info("--------------------------------------" ); completionSuggester(client); } catch (IOException e) { logger.error(e); } } }
8. aggregation 聚合分析 AggregationDemo.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 import java.io.IOException;import java.util.concurrent.ExecutionException;import org.apache.logging.log4j.LogManager;import org.apache.logging.log4j.Logger;import org.elasticsearch.action.search.SearchRequest;import org.elasticsearch.action.search.SearchResponse;import org.elasticsearch.client.transport.TransportClient;import org.elasticsearch.rest.RestStatus;import org.elasticsearch.search.aggregations.AggregationBuilders;import org.elasticsearch.search.aggregations.Aggregations;import org.elasticsearch.search.aggregations.BucketOrder;import org.elasticsearch.search.aggregations.bucket.terms.Terms;import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;import org.elasticsearch.search.aggregations.metrics.avg.Avg;import org.elasticsearch.search.builder.SearchSourceBuilder;public class AggregationDemo { private static Logger logger = LogManager.getRootLogger(); public static void main (String[] args) { try (TransportClient client = InitDemo.getClient();) { SearchRequest searchRequest = new SearchRequest ("bank" ); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder (); sourceBuilder.size(0 ); TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_age" ) .field("age" ).order(BucketOrder.aggregation("average_balance" , true )); aggregation.subAggregation(AggregationBuilders.avg("average_balance" ) .field("balance" )); sourceBuilder.aggregation(aggregation); searchRequest.source(sourceBuilder); SearchResponse searchResponse = client.search(searchRequest).get(); if (RestStatus.OK.equals(searchResponse.status())) { Aggregations aggregations = searchResponse.getAggregations(); Terms byAgeAggregation = aggregations.get("by_age" ); logger.info("aggregation by_age 结果" ); logger.info("docCountError: " + byAgeAggregation.getDocCountError()); logger.info("sumOfOtherDocCounts: " + byAgeAggregation.getSumOfOtherDocCounts()); logger.info("------------------------------------" ); for (Bucket buck : byAgeAggregation.getBuckets()) { logger.info("key: " + buck.getKeyAsNumber()); logger.info("docCount: " + buck.getDocCount()); Avg averageBalance = buck.getAggregations().get("average_balance" ); logger.info("average_balance: " + averageBalance.getValue()); logger.info("------------------------------------" ); } } } catch (IOException | InterruptedException | ExecutionException e) { logger.error(e); } } }
9. 官网文档 Document API 文档操作API:
https://www.elastic.co/guide/en/elasticsearch/client/java-api/6.2/java-docs.html
Search API:
https://www.elastic.co/guide/en/elasticsearch/client/java-api/6.2/java-search.html
六、Spring Data Elasticsearch ES与Spring集成使用,可以作为了解,建议还是使用原生的ES的java客户端
官网链接:
https://docs.spring.io/spring-data/elasticsearch/docs/current/reference/html/
代码库:
https://github.com/spring-projects/spring-data-elasticsearch