How to reindex in Elasticsearch via the Java API


Like the title says...

I read this article (https://www.elastic.co/blog/changing-mapping-with-zero-downtime), and the concept is great, but I was struggling to find a decent reference on how to do it via the Java API.

I found this plugin: https://github.com/karussell/elasticsearch-reindex, but it seems like overkill for what I am trying to do.

After some research at my local Starbucks, here is what I came up with:

Let's assume we have our index ("old_index") and it has data... Let's move that data into a new index ("new_index") that we created (perhaps with a different schema, e.g. a string vs. an int field, or we decide we no longer wish to analyze or store a field, etc.).

The basic idea here is to retrieve all the data from the existing index ("old_index") and ingest it into the new index ("new_index"). However, there are a few things you have to do:
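To make the "different schema" part concrete, here is a minimal in-memory sketch in plain Java (no Elasticsearch involved). Each index is modeled as a map from document ID to source map, and the field name `price` (stored as a string in the old index, wanted as an integer in the new one) is a hypothetical example, not something from the original setup.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory model of the reindex idea: every document in
// "old_index" is read, optionally reshaped, and written to "new_index"
// under the same ID. The field name "price" is an illustrative assumption.
class ReindexModel {

    // Converts one source document from the old mapping to the new one:
    // "price" was a string in the old index and becomes an integer.
    static Map<String, Object> toNewSchema(Map<String, Object> oldSource) {
        Map<String, Object> converted = new HashMap<>(oldSource); // never mutate the old document
        Object price = oldSource.get("price");
        if (price instanceof String) {
            converted.put("price", Integer.parseInt((String) price));
        }
        return converted;
    }

    // Copies every document (ID -> source) into a new map, keeping IDs unchanged.
    static Map<String, Map<String, Object>> reindex(Map<String, Map<String, Object>> oldIndex) {
        Map<String, Map<String, Object>> newIndex = new LinkedHashMap<>();
        for (Map.Entry<String, Map<String, Object>> doc : oldIndex.entrySet()) {
            newIndex.put(doc.getKey(), toNewSchema(doc.getValue()));
        }
        return newIndex;
    }
}
```

Keeping the document IDs identical is what lets the alias switch at the end be transparent to clients: the same ID resolves to the same (converted) document in the new index.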

Step 1. You need to perform a scroll search: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html

It retrieves results more efficiently than a regular search: there is no scoring, etc. Here is what the documentation has to say: "Scrolling is not intended for real time user requests, but rather for processing large amounts of data, e.g. in order to reindex the contents of one index into a new index with a different configuration."
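The contract the reindex loop relies on is simple: each scroll call returns the next batch, and an empty batch means you are done. The sketch below models only that contract with a hypothetical `InMemoryScroll` class (plain Java, not the Elasticsearch API). It copies the data when the cursor is opened, loosely mirroring how a scroll works on a point-in-time view of the index.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical in-memory stand-in for a scroll cursor. It is NOT the
// Elasticsearch API; it only mimics "each call returns the next batch,
// an empty batch means done".
class InMemoryScroll<T> {
    private final List<T> snapshot; // frozen copy taken when the cursor is opened
    private final int batchSize;
    private int position = 0;

    InMemoryScroll(List<T> data, int batchSize) {
        this.snapshot = Collections.unmodifiableList(new ArrayList<>(data));
        this.batchSize = batchSize;
    }

    // Returns the next batch, or an empty list once everything was consumed.
    List<T> next() {
        int end = Math.min(position + batchSize, snapshot.size());
        List<T> batch = new ArrayList<>(snapshot.subList(position, end));
        position = end;
        return batch;
    }

    // Drains the cursor the way the reindex loop does: keep asking
    // for batches until one comes back empty.
    static <T> List<T> drain(InMemoryScroll<T> scroll) {
        List<T> all = new ArrayList<>();
        while (true) {
            List<T> batch = scroll.next();
            if (batch.isEmpty()) {
                break;
            }
            all.addAll(batch);
        }
        return all;
    }
}
```

Because of the snapshot, documents written to the old index after the scroll was opened are not picked up, so writes that arrive during the reindex need separate handling.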

Here is a link to the Java API docs on how to use it: https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/scrolling.html

Step 2. When doing the inserts, you should use bulk ingest. Once again, this is done for performance reasons. Here is a link to the bulk ingest Java API: https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/bulk.html#_using_bulk_processor
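The gain from bulk ingest comes from grouping many index actions into one request. Here is a simplified, single-threaded sketch of the threshold part of that idea, loosely mirroring `BulkProcessor`'s `setBulkActions(n)` plus the final flush when it is closed. `BatchingBuffer` is a hypothetical class; the real `BulkProcessor` also flushes by size and time interval and can run requests concurrently, none of which is modeled here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of threshold-based bulk batching. The Consumer stands
// in for "send one bulk request"; no Elasticsearch classes are used.
class BatchingBuffer<T> implements AutoCloseable {
    private final int actionsThreshold;
    private final Consumer<List<T>> sendBulk;
    private final List<T> pending = new ArrayList<>();

    BatchingBuffer(int actionsThreshold, Consumer<List<T>> sendBulk) {
        this.actionsThreshold = actionsThreshold;
        this.sendBulk = sendBulk;
    }

    // Queues one action and sends a bulk as soon as the threshold is reached.
    void add(T action) {
        pending.add(action);
        if (pending.size() >= actionsThreshold) {
            flush();
        }
    }

    private void flush() {
        if (!pending.isEmpty()) {
            sendBulk.accept(new ArrayList<>(pending));
            pending.clear();
        }
    }

    // Sends the last, partial batch; forgetting this loses the tail of the data.
    @Override
    public void close() {
        flush();
    }
}
```

For example, adding 2,500 actions with a threshold of 1,000 and then closing the buffer sends three bulks of 1,000, 1,000 and 500 actions.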

Now onto how to actually do it...

Step 1. Set up the scroll search that will "load" the data from the old index:

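```java
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;

SearchResponse scrollResp = client.prepareSearch("old_index") // specify the index
        .setSearchType(SearchType.SCAN) // SCAN: the initial response holds only a scroll ID, no hits
        .setScroll(new TimeValue(60000))
        .setQuery(QueryBuilders.matchAllQuery()) // match all documents
        .setSize(100).execute().actionGet(); // 100 hits per shard will be returned for each scroll
```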

Step 2. Set up the bulk processor:

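```java
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.common.unit.TimeValue;

int BULK_ACTIONS_THRESHOLD = 1000;
int BULK_CONCURRENT_REQUESTS = 1;
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        logger.info("Going to execute new bulk composed of {} actions", request.numberOfActions());
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        logger.info("Executed bulk composed of {} actions", request.numberOfActions());
        // A bulk can complete while individual documents in it were rejected
        if (response.hasFailures()) {
            logger.warn("Bulk had failures: {}", response.buildFailureMessage());
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        logger.warn("Error executing bulk", failure);
    }
}).setBulkActions(BULK_ACTIONS_THRESHOLD)
  .setConcurrentRequests(BULK_CONCURRENT_REQUESTS)
  .setFlushInterval(TimeValue.timeValueSeconds(5)) // also flush partial batches every 5 seconds
  .build();
```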

Step 3. Read the old index via the scroll search created in step 1 until there are no records left, and insert the results into the new index:

```java
import java.util.concurrent.TimeUnit;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.search.SearchHit;

// Scroll until no hits are returned
while (true) {
    scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
            .setScroll(new TimeValue(600000))
            .execute().actionGet();
    // Break condition: no hits are returned
    if (scrollResp.getHits().getHits().length == 0) {
        logger.info("Closing the bulk processor");
        // Flushes the remaining actions and waits for in-flight bulks to finish
        bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
        break;
    }
    // Take the results of the scan search and add them to the bulk ingest
    for (SearchHit hit : scrollResp.getHits()) {
        IndexRequest request = new IndexRequest("new_index", hit.type(), hit.id());
        request.source(hit.getSource());
        bulkProcessor.add(request);
    }
}
```
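Note that this loop calls `prepareSearchScroll` before processing anything. That is correct with `SearchType.SCAN`, whose initial response carries no hits, but `SCAN` was deprecated in Elasticsearch 2.1 and removed in 5.0. With a regular scrolled search the initial response already contains the first page, and the same loop would silently skip it. The hypothetical `ScrollLoops` class below (plain Java, responses modeled as lists, no Elasticsearch calls) contrasts the two loop shapes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the two shapes the first scroll response can take.
// responses.get(0) plays the initial search response; later entries play the
// responses to each scroll call; an empty page ends the scroll.
class ScrollLoops {

    static List<List<Integer>> pages(List<Integer> data, int size, boolean initialHasHits) {
        List<List<Integer>> result = new ArrayList<>();
        if (!initialHasHits) {
            result.add(new ArrayList<>()); // SCAN-style: empty initial response
        }
        for (int i = 0; i < data.size(); i += size) {
            result.add(new ArrayList<>(data.subList(i, Math.min(i + size, data.size()))));
        }
        result.add(new ArrayList<>()); // end of scroll
        return result;
    }

    // Loop shape from step 3: scroll first, then process.
    static List<Integer> scrollThenProcess(List<List<Integer>> responses) {
        List<Integer> indexed = new ArrayList<>();
        int next = 1; // the initial response (index 0) is never processed
        while (true) {
            List<Integer> resp = responses.get(next++); // stands in for prepareSearchScroll(...)
            if (resp.isEmpty()) {
                break;
            }
            indexed.addAll(resp);
        }
        return indexed;
    }

    // Loop shape for a non-SCAN search: process the current page, then scroll.
    static List<Integer> processThenScroll(List<List<Integer>> responses) {
        List<Integer> indexed = new ArrayList<>();
        int next = 0;
        List<Integer> resp = responses.get(next++);
        while (!resp.isEmpty()) {
            indexed.addAll(resp);
            resp = responses.get(next++);
        }
        return indexed;
    }
}
```

With five documents and a page size of 2, the step 3 loop indexes all five when the first response is empty (SCAN), but only `[3, 4, 5]` when the first response already holds `[1, 2]`; the process-then-scroll shape indexes all five in that case.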

Step 4. Now it's time to take the existing alias, which points to the old index, and assign it to the new index. Then remove the alias reference from the old index and delete the old index itself. To find out how to determine which alias is assigned to the existing old index, see this post: Elasticsearch Java API find aliases given index

To assign the alias to the new index:

```java
client.admin().indices().prepareAliases().addAlias("new_index", "alias_name").get();
```

To remove the alias from the old index and delete the old index:

```java
client.admin().indices().prepareAliases().removeAlias("old_index", "alias_name").execute().actionGet();
client.admin().indices().prepareDelete("old_index").execute().actionGet();
```
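Doing the alias switch as two requests leaves a short window where `alias_name` points at both indices, so searches through the alias can return every document twice. Alias actions sent in a single request are applied atomically, and the builder lets you chain them: `client.admin().indices().prepareAliases().removeAlias("old_index", "alias_name").addAlias("new_index", "alias_name").get();`. The hypothetical `AliasTable` below is only an in-memory illustration of that difference (plain Java, not the Elasticsearch API).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory alias table (alias -> indices), used only to show
// why remove + add applied as one action avoids an in-between state.
class AliasTable {
    private final Map<String, Set<String>> aliases = new HashMap<>();

    synchronized void add(String index, String alias) {
        aliases.computeIfAbsent(alias, a -> new HashSet<>()).add(index);
    }

    synchronized void remove(String index, String alias) {
        Set<String> indices = aliases.get(alias);
        if (indices != null) {
            indices.remove(index);
        }
    }

    // Both changes happen under one lock, so a concurrent resolve() sees
    // either the old mapping or the new one, never both indices at once.
    synchronized void swap(String alias, String oldIndex, String newIndex) {
        remove(oldIndex, alias);
        add(newIndex, alias);
    }

    // Returns a copy of the indices a search against the alias would hit.
    synchronized Set<String> resolve(String alias) {
        return new HashSet<>(aliases.getOrDefault(alias, new HashSet<>()));
    }
}
```

Calling `add` and then `remove` separately reproduces the two-request sequence above: between the calls, `resolve("alias_name")` returns both indices, whereas `swap` goes straight from the old index to the new one.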
