1.bulk操作
bulk操作的api
XContentBuilder xContentBuilder1 = XContentFactory.jsonBuilder() .startObject().field("name","1234").endObject(); XContentBuilder xContentBuilder2 = XContentFactory.jsonBuilder() .startObject().field("name","4321").endObject(); XContentBuilder xContentBuilder3 = XContentFactory.jsonBuilder() .startObject().field("name","9999").endObject(); BulkRequestBuilder bulk = client.prepareBulk(); bulk.add(new IndexRequest(index,type,"3").source(xContentBuilder1)); bulk.add(new IndexRequest(index,type,"4").source(xContentBuilder2)); bulk.add(new IndexRequest(index,type,"5").source(xContentBuilder3)); BulkResponse responses = bulk.get(); Iteratoriterator = responses.iterator(); for (BulkItemResponse respons : responses) { System.out.println(respons.getResponse()); }
bulkprocess的方式
BulkProcessor build = BulkProcessor.builder(client, new BulkProcessor.Listener() { @Override public void beforeBulk(long l, BulkRequest bulkRequest) { System.out.println("执行前"); } @Override public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) { System.out.println("执行后"); } @Override public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) { System.out.println("处理异常"); } }) .setBulkActions(10000)//设置bulk执行数 .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))//设置bulk处理的大小 //设置超时和重调次数 .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) .setConcurrentRequests(1)//设置并发量 .build(); // build.add() 这里添加批处理的操作 build.flush(); build.close();