Like the title says...
I read this article (https://www.elastic.co/blog/changing-mapping-with-zero-downtime), and the concept is great, but I am struggling to find a decent reference on how to do it via the Java API.
I found this plugin: https://github.com/karussell/elasticsearch-reindex, but it seems like overkill for what I am trying to do.
After some research at the local Starbucks, here is what I came up with:
Let's assume we already have our index ("old_index") and it has data. Now let's move that data to a new index ("new_index") that we created (perhaps with a different schema, such as string vs. int for a certain field, or because we decided that we no longer wish to analyze or store a certain field, etc.).
The basic idea here is to retrieve all the data from the existing index ("old_index") and ingest it into the new index ("new_index"). However, there are a few things that we have to do:
Step 1. We need to perform a search scroll: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html
All it does is retrieve results much more efficiently than a regular search. There is no scoring, etc. Here is what the documentation has to say: "Scrolling is not intended for real time user requests, but rather for processing large amounts of data, e.g. in order to reindex the contents of one index into a new index with a different configuration."
Here is a link to the Java API on how to use it: https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/scrolling.html
Step 2. When doing the insert, we have to use bulk ingest. Once again, this is done for performance reasons. Here is a link to the bulk ingest Java API: https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/bulk.html#_using_bulk_processor
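Before getting to the real client calls, the control flow of the two steps can be sketched without a cluster. This is only an illustration under stated assumptions: `ScrollBulkSketch`, `nextPage`, and `reindex` are hypothetical names, plain lists stand in for the indices, and no Elasticsearch classes are involved. It shows the shape of the loop: pull one page at a time until an empty page comes back, buffer the documents, flush whenever the buffer reaches a threshold, and flush whatever is left at the end.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical in-memory stand-in for the scroll-then-bulk pattern.
// Nothing here is part of the Elasticsearch API.
class ScrollBulkSketch {

    // Plays the role of one scroll request: returns the next page of documents,
    // or an empty list once the source is exhausted.
    static List<String> nextPage(List<String> oldIndex, int offset, int pageSize) {
        if (offset >= oldIndex.size()) {
            return Collections.emptyList();
        }
        int end = Math.min(offset + pageSize, oldIndex.size());
        return oldIndex.subList(offset, end);
    }

    // Copies every document from oldIndex to newIndex in batches.
    // Returns the number of batches that were flushed.
    static int reindex(List<String> oldIndex, List<String> newIndex,
                       int pageSize, int bulkThreshold) {
        List<String> buffer = new ArrayList<>();
        int flushes = 0;
        int offset = 0;
        while (true) {
            List<String> page = nextPage(oldIndex, offset, pageSize);
            if (page.isEmpty()) {
                // Break condition: no hits returned. Flush the remainder first.
                if (!buffer.isEmpty()) {
                    newIndex.addAll(buffer);
                    buffer.clear();
                    flushes++;
                }
                break;
            }
            offset += page.size();
            for (String doc : page) {
                buffer.add(doc);
                // Flush as soon as the batch threshold is reached.
                if (buffer.size() >= bulkThreshold) {
                    newIndex.addAll(buffer);
                    buffer.clear();
                    flushes++;
                }
            }
        }
        return flushes;
    }

    public static void main(String[] args) {
        List<String> oldIndex = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            oldIndex.add("doc-" + i);
        }
        List<String> newIndex = new ArrayList<>();
        int flushes = reindex(oldIndex, newIndex, 100, 100);
        System.out.println(newIndex.size() + " documents copied in " + flushes + " batches");
    }
}
```

In the real code the bulk processor does the buffering and flushing itself, so the loop only has to add requests and close the processor when the scroll runs dry.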
Now on to how to do it...
Step 1. Set up a scroll search that will "load" the data from the old index
    SearchResponse scrollResp = client.prepareSearch("old_index") // Specify the index
            .setSearchType(SearchType.SCAN)
            .setScroll(new TimeValue(60000))
            .setQuery(QueryBuilders.matchAllQuery()) // Match all query
            .setSize(100).execute().actionGet(); // 100 hits per shard will be returned for each scroll

Step 2. Set up the bulk processor.
    int BULK_ACTIONS_THRESHOLD = 1000;
    int BULK_CONCURRENT_REQUESTS = 1;
    BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            logger.info("Bulk going to execute new bulk composed of {} actions", request.numberOfActions());
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            logger.info("Executed bulk composed of {} actions", request.numberOfActions());
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            logger.warn("Error executing bulk", failure);
        }
    }).setBulkActions(BULK_ACTIONS_THRESHOLD)
            .setConcurrentRequests(BULK_CONCURRENT_REQUESTS)
            .setFlushInterval(TimeValue.timeValueMillis(5))
            .build();

Step 3. Read from the old index via the scroll searcher created in step 1 until there are no more records left, and insert them into the new index
    // Scroll until no hits are returned
    while (true) {
        scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
                .setScroll(new TimeValue(600000)).execute().actionGet();
        // Break condition: no hits are returned
        if (scrollResp.getHits().getHits().length == 0) {
            logger.info("Closing the bulk processor");
            bulkProcessor.close();
            break;
        }
        // Get results from the scan search and add them to the bulk ingest
        for (SearchHit hit : scrollResp.getHits()) {
            IndexRequest request = new IndexRequest("new_index", hit.type(), hit.id());
            Map<String, Object> source = hit.getSource();
            request.source(source);
            bulkProcessor.add(request);
        }
    }

Step 4. Now it is time to assign the existing alias, which points to the old index, to the new index. Then delete the alias reference to the old index and delete the old index itself. To find out how to determine which alias is assigned to the existing old index, see this post: "ElasticSeach Java API to find aliases given index"
To assign the alias to the new index:

    client.admin().indices().prepareAliases().addAlias("new_index", "alias_name").get();

To remove the alias from the old index and then delete the old index:

    client.admin().indices().prepareAliases().removeAlias("old_index", "alias_name").execute().actionGet();
    client.admin().indices().prepareDelete("old_index").execute().actionGet();
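One thing to watch with the two separate alias calls above: between them, the alias points at both indices for a moment. The article linked at the top avoids this by sending the remove and the add together in a single `_aliases` request. Here is a minimal sketch of the request body for that, built as a plain string. `AliasSwapSketch` and `buildAliasSwapBody` are hypothetical helper names, and the sketch assumes the index and alias names contain no characters that would need JSON escaping.

```java
// Hypothetical helper that builds the body for a single POST /_aliases request
// which removes the alias from the old index and adds it to the new one.
class AliasSwapSketch {

    // Assumes oldIndex, newIndex and alias need no JSON escaping.
    static String buildAliasSwapBody(String oldIndex, String newIndex, String alias) {
        return "{\"actions\":["
                + "{\"remove\":{\"index\":\"" + oldIndex + "\",\"alias\":\"" + alias + "\"}},"
                + "{\"add\":{\"index\":\"" + newIndex + "\",\"alias\":\"" + alias + "\"}}"
                + "]}";
    }

    public static void main(String[] args) {
        // Print the body that would be sent to the _aliases endpoint
        System.out.println(buildAliasSwapBody("old_index", "new_index", "alias_name"));
    }
}
```

The old index should still only be deleted after the swap has succeeded.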