Limax-ZDB缓存预热

Limax-ZDB缓存预热

Posted by CaiJiahe on August 30, 2017

package limax.zdb;

import java.io.BufferedWriter;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import com.google.gson.Gson;

import limax.util.Trace;

@SuppressWarnings({"rawtypes", "unchecked"})
public class TTableCachePreLoader {

	/**
	 * load cache with all lines in preload file.
	 */
	public static final String PRELOAD_FILENAME = "preload";
	
	/**
	 * zookeeper node name
	 */
	public static final String PRELOADER_IN_ZOOKEEPER = "/preloader";
	
	private static TTableCachePreLoader instance = new TTableCachePreLoader();
	
	public static TTableCachePreLoader getInstance() {
		return instance;
	}
	
	private Map<String, String> preLoadTables = new HashMap<>();
	
	private TTableCachePreLoader() {}
	
	public void load(Tables tables) {
		// load table names
		initPreLoadTables(Zdb.meta().getPreLoadCfgName());
	
		try {
			// load
			loadSequence(tables);
		} catch (Exception e) {
			// IGNORE
			e.printStackTrace();
		}
	}
	
	private void loadSequence(Tables tables) throws Exception {
		String zkUrl = Zdb.meta().getZkUrl();
		Objects.requireNonNull(zkUrl);
		try (CuratorFramework client = CuratorFrameworkFactory.builder()
	            .connectString(zkUrl)
	            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build()) {
			client.start();
			final InterProcessMutex lock = new InterProcessMutex(client, PRELOADER_IN_ZOOKEEPER);
			try {
				lock.acquire();
				loadTables(tables);
			} finally {
				lock.release();
			}
		}
	}
	
	private void loadTables(Tables tables) {
		Path path = Paths.get(PRELOAD_FILENAME);
		try {
			List<String> lines = Files.readAllLines(path, StandardCharsets.UTF_8);
			Gson gson = new Gson();
			lines.stream()
				.map(json -> gson.fromJson(json, PreLoadInfo.class))
				.filter(pli -> preLoadTables.get(pli.tableName) != null)
				.forEach(pli -> loadTable(tables, pli));
		} catch (NoSuchFileException e) {
			createCacheFile(path);
		} catch (Throwable e) {
			deleteCacheFile(path);
			Trace.error("pre-load failed, reason is " + e.getMessage());
		}
	}
	
	private void initPreLoadTables(String cfgName) {
		Properties props = new Properties();
		try {
			props.load(Files.newInputStream(Paths.get(cfgName)));
			preLoadTables.putAll((Map)props);
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}
	
	/**
	 * serialization load
	 * 
	 * @param tables
	 * @param tk
	 */
	private void loadTable(Tables tables, PreLoadInfo pli) {
		TTable table = tables.getTable(pli.tableName);
		try {
			Class clazz = Class.forName(pli.type);
			Method mth = clazz.getMethod("valueOf", String.class);
			Procedure.call(() -> {
				pli.keys.forEach(key -> {
					try {
						Object obj = mth.invoke(null, key);
						table.get(obj, false);
					} catch (Exception e) {
						Trace.error("pre-load tableName = " + pli.tableName 
								+ " and key = " + key + " failed!!! reason is " + e.getMessage());
					}
				});
				return true;
			});
		} catch (Exception e) {
			throw new Error("No " + pli.tableName + ".class or the class no valueOf method!");
		}
	}
	
	private void createCacheFile(Path path) {
		try {
			Files.createFile(path);
		} catch (IOException e) {
		}
	}
	
	private void deleteCacheFile(Path path) {
		try {
			Files.deleteIfExists(path);
		} catch (IOException e) {
		}
	}
	
	public void store(Tables tables) {
		// create file if not exist
		Path path = Paths.get(PRELOAD_FILENAME);
		createCacheFile(path);
		
		// store
		try (BufferedWriter writer 
				= Files.newBufferedWriter(path, Charset.forName("utf-8"), StandardOpenOption.WRITE)) {
			preLoadTables.keySet().stream()
				.map(tableName -> tables.getTable(tableName).getCache())
				.filter(cache -> cache instanceof TTableCacheLRU)
				.forEach(cache -> storeTable(writer, cache));
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	private void storeTable(BufferedWriter writer, TTableCache cache) {
		int preLoadRecords = cache.getTable().meta().getPreLoadRecords();
		List<String> keys = ((TTableCacheLRU)cache).fetchCache(preLoadRecords);
		
		String tableName = cache.getTable().getName();
		String json = new Gson().toJson(new PreLoadInfo(tableName, preLoadTables.get(tableName), keys));
		
		try {
			writer.write(json);
			writer.newLine();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	static class PreLoadInfo {
		String tableName;
		String type;
		List<String> keys;
		
		public PreLoadInfo(String tableName, String type, List<String> keys) {
			this.tableName = tableName;
			this.type = type;
			this.keys = keys;
		}
	}
	
}