package limax.zdb;
import java.io.BufferedWriter;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import com.google.gson.Gson;
import limax.util.Trace;
@SuppressWarnings({"rawtypes", "unchecked"})
public class TTableCachePreLoader {
/**
* load cache with all lines in preload file.
*/
public static final String PRELOAD_FILENAME = "preload";
/**
* zookeeper node name
*/
public static final String PRELOADER_IN_ZOOKEEPER = "/preloader";
private static TTableCachePreLoader instance = new TTableCachePreLoader();
public static TTableCachePreLoader getInstance() {
return instance;
}
private Map<String, String> preLoadTables = new HashMap<>();
private TTableCachePreLoader() {}
public void load(Tables tables) {
// load table names
initPreLoadTables(Zdb.meta().getPreLoadCfgName());
try {
// load
loadSequence(tables);
} catch (Exception e) {
// IGNORE
e.printStackTrace();
}
}
private void loadSequence(Tables tables) throws Exception {
String zkUrl = Zdb.meta().getZkUrl();
Objects.requireNonNull(zkUrl);
try (CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(zkUrl)
.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build()) {
client.start();
final InterProcessMutex lock = new InterProcessMutex(client, PRELOADER_IN_ZOOKEEPER);
try {
lock.acquire();
loadTables(tables);
} finally {
lock.release();
}
}
}
private void loadTables(Tables tables) {
Path path = Paths.get(PRELOAD_FILENAME);
try {
List<String> lines = Files.readAllLines(path, StandardCharsets.UTF_8);
Gson gson = new Gson();
lines.stream()
.map(json -> gson.fromJson(json, PreLoadInfo.class))
.filter(pli -> preLoadTables.get(pli.tableName) != null)
.forEach(pli -> loadTable(tables, pli));
} catch (NoSuchFileException e) {
createCacheFile(path);
} catch (Throwable e) {
deleteCacheFile(path);
Trace.error("pre-load failed, reason is " + e.getMessage());
}
}
private void initPreLoadTables(String cfgName) {
Properties props = new Properties();
try {
props.load(Files.newInputStream(Paths.get(cfgName)));
preLoadTables.putAll((Map)props);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* serialization load
*
* @param tables
* @param tk
*/
private void loadTable(Tables tables, PreLoadInfo pli) {
TTable table = tables.getTable(pli.tableName);
try {
Class clazz = Class.forName(pli.type);
Method mth = clazz.getMethod("valueOf", String.class);
Procedure.call(() -> {
pli.keys.forEach(key -> {
try {
Object obj = mth.invoke(null, key);
table.get(obj, false);
} catch (Exception e) {
Trace.error("pre-load tableName = " + pli.tableName
+ " and key = " + key + " failed!!! reason is " + e.getMessage());
}
});
return true;
});
} catch (Exception e) {
throw new Error("No " + pli.tableName + ".class or the class no valueOf method!");
}
}
private void createCacheFile(Path path) {
try {
Files.createFile(path);
} catch (IOException e) {
}
}
private void deleteCacheFile(Path path) {
try {
Files.deleteIfExists(path);
} catch (IOException e) {
}
}
public void store(Tables tables) {
// create file if not exist
Path path = Paths.get(PRELOAD_FILENAME);
createCacheFile(path);
// store
try (BufferedWriter writer
= Files.newBufferedWriter(path, Charset.forName("utf-8"), StandardOpenOption.WRITE)) {
preLoadTables.keySet().stream()
.map(tableName -> tables.getTable(tableName).getCache())
.filter(cache -> cache instanceof TTableCacheLRU)
.forEach(cache -> storeTable(writer, cache));
} catch (IOException e) {
e.printStackTrace();
}
}
private void storeTable(BufferedWriter writer, TTableCache cache) {
int preLoadRecords = cache.getTable().meta().getPreLoadRecords();
List<String> keys = ((TTableCacheLRU)cache).fetchCache(preLoadRecords);
String tableName = cache.getTable().getName();
String json = new Gson().toJson(new PreLoadInfo(tableName, preLoadTables.get(tableName), keys));
try {
writer.write(json);
writer.newLine();
} catch (IOException e) {
e.printStackTrace();
}
}
static class PreLoadInfo {
String tableName;
String type;
List<String> keys;
public PreLoadInfo(String tableName, String type, List<String> keys) {
this.tableName = tableName;
this.type = type;
this.keys = keys;
}
}
}