最近在学习 Elasticsearch,这是一个分布式的大数据搜索引擎,其实也可以看作是一个分布式的数据库。我使用的 Elasticsearch 的版本是 2.4.1,鉴于网上相关的中文资料较少,所以自己看官方文档学习一下。
使用 Maven 工程,我的 pom 文件如下所示:
org.elasticsearch elasticsearch 2.4.1 org.apache.logging.log4j log4j-api 2.6.2 org.apache.logging.log4j log4j-core 2.6.2
连接机器
TransportClient client = TransportClient.builder() .build() .addTransportAddress(new InetSocketTransportAddress(InetAddress .getByName("localhost"), 9300));
Index API 创建 Index 并且插入 Document
创建索引有很多种方法,这里列举常用的 2 种:
HashMapjson = new HashMap ();json.put("first_name","Shuang");json.put("last_name", "Peng");json.put("age", 24);json.put("about", "I love coding");IndexResponse response = client .prepareIndex("tseg","students","1") .setSource(json).get();IndexResponse response = client.prepareIndex("tseg","students","1") .setSource(jsonBuilder() .startObject() .field("first_name", "Shuang") .field("first_name", "Peng") .field("age", 24) .field("about", "I love coding") .endObject()) .get();
注意:Index API 只能用于创建 index,类似于关系型数据库里面的 create table,他不能对已有的数据库进行添加。追加操作可以用后面会提到的 Update 或者 Bulk 来完成。
Get API 获取 Document
GetResponse response2 = client.prepareGet("tseg", "students", "1").get();Mapres = response2.getSource();for (Map.Entry entry: res.entrySet()){ System.out.println(entry.getKey() + " : " + entry.getValue()); }
Delete API 删除 Index 或者 Document
// 用来删除对应的 document DeleteResponse response3 = client.prepareDelete("tesg","students","1").get();// 用来删除对应的 indexDeleteIndexResponse response4 = client.admin().indices().prepareDelete("facebook").execute().actionGet();
Update API 更新操作
更新操作也有两种方法。建议使用第一种,第二种太复杂了。。。看看就好。
第一种
client.prepareUpdate("tseg", "students", "1") .setDoc(jsonBuilder() .startObject().field("age", 32) .endObject()) .get();
第二种
IndexRequest indexRequest = new IndexRequest("tseg", "students", "1") .source(jsonBuilder() .startObject() .field("first_name", "Shuang") .field("last_name", "Peng") .field("age", 32) .field("about", "I loving coding") .endObject());UpdateRequest updateRequest = new UpdateRequest("tseg","students", "1") .doc(jsonBuilder() .startObject().field("age", 32) .endObject()) .upsert(indexRequest); client.update(updateRequest).get();
不过这里提一下第二种方法,如果对应的 field 不存在的话,则更新操作自动变为插入操作,否则,就是正常的修改操作。
Multi Get API 多查找
MultiGetResponse API 可以一次返回多个要查找的值。下面介绍了两种方法,一种是返回一个 Map,我们可以按照不同的 field 取值;第二种方法是直接返回一个字符串(Json格式)。
MultiGetResponse multiGetItemResponses = client.prepareMultiGet() .add("tseg", "students", "1", "2", "3").get();for (MultiGetItemResponse itemResponses : multiGetItemResponses) { GetResponse response5 = itemResponses.getResponse(); if (response5.isExists()) { // 第一种用法 Mapfields = response5.getSource(); System.out.println(fields.get("first_name"));// 第二种用法 String json2 = response5.getSourceAsString(); System.out.println(json2);}
Bulk API 批量操作
Bulk API允许批量提交index和delete请求, 如下:
BulkRequestBuilder bulkRequest = client.prepareBulk();bulkRequest.add(client.prepareIndex("tseg", "students", "1") .setSource(jsonBuilder() .startObject() .field("first_name", "Allen") .field("last_name", "Peng") .field("age", "22") .endObject())) .get(); bulkRequest.add(client.prepareIndex("tseg", "students", "2")) .setSource(jsonBuilder() .startObject() .field("first_name", "Hou") .field("last_name", "Xue") .field("age", "30") .endObject())) .get();HashMapjson2 = new HashMap ();List list = new ArrayList ();list.add("music");list.add("football");json2.put("first_name", "Peng");json2.put("last_name", "Peng");json2.put("interests", list);BulkRequestBuilder bulkRequest2 = client.prepareBulk();// 两种执行方法,个人倾向于第一种bulkRequest2.add(client.prepareIndex("facebook", "info", "3").setSource(json2)).get();// 第二种方法bulkRequest2.add(client.prepareIndex("facebook", "info","1").setSource(json2)).execute().actionGet();
还可以这样做:
BulkRequestBuilder bulkRequest = client.prepareBulk();bulkRequest.add(client.prepareIndex("index1", "type1", "id1") .setSource(source);bulkRequest.add(client.prepareIndex("index2", "type2", "id2") .setSource(source);BulkResponse bulkResponse = bulkRequest.execute().actionGet();
Bulk Processor API 可在批量操作完成之前和之后进行相应的操作
BulkProcessor bulkProcessor = BulkProcessor.builder( client, new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { ... } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { ... } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { ... } }) .setBulkActions(10000) .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) .setFlushInterval(TimeValue.timeValueSeconds(5)) .setConcurrentRequests(1) .build(); bulkProcessor.add(new IndexRequest("index1", "type1", "id1").source(source1)); bulkProcessor.add(new DeleteRequest("index2", "type2", "id2");
beforeBulk 会在批量提交之前执行,可以从 BulkRequest 中获取请求信息request.requests() 或者请求数量 request.numberOfActions()。
第一个 afterBulk 会在批量成功后执行,可以跟 beforeBulk 配合计算批量所需时间。
第二个 afterBulk 会在批量失败后执行。
在例子中,当请求超过 10000 个(default=1000)或者总大小超过1GB(default=5MB)时,触发批量提交动作。
后记
项目代码已经共享至 。