0x01 简介
Lettuce是一个可伸缩线程安全的Redis客户端。多个线程可以共享同一个RedisConnection。它利用Netty框架来高效地管理多个连接。
0x02 向server发送请求
public interface StatefulConnection<K, V> extends AutoCloseable {
...
<T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> command);
}
public abstract class RedisChannelHandler<K, V> implements Closeable, ConnectionFacade {
private final RedisChannelWriter channelWriter;
...
protected <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) {
if (debugEnabled) {
logger.debug("dispatching command {}", cmd);
}
return channelWriter.write(cmd);
}
}
StatefulConnection
接口中定义了dispatch
方法,在这个方法的实现抽象类RedisChannelHandler
中,该方法调用了channelWriter.write
方法。最后这个方法会调到CommandHandler.write
方法,然后序列化协议发送给redis server。
// 一个负责响应redis server响应和发送redis server请求的netty ChannelHandler
public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCommands {
...
}
0x03 解析server响应
public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCommands {
private ByteBuf buffer;
...
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf input = (ByteBuf) msg;
...
try {
buffer.writeBytes(input);
decode(ctx, buffer);
} finally {
input.release();
}
}
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {
...
while (canDecode(buffer)) {
RedisCommand<?, ?, ?> command = stack.peek();
try {
if (!decode(ctx, buffer, command)) {
return;
}
} catch (Exception e) {
ctx.close();
throw e;
}
command.complete();
afterComplete(ctx, command);
}
}
...
}
从channelRead中得到EventLoop读取到的数据,然后对其解码。
public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCommands {
private final RedisStateMachine rsm = new RedisStateMachine();
...
private boolean decode0(ChannelHandlerContext ctx, ByteBuf buffer, RedisCommand<?, ?, ?> command) {
if (!decode(buffer, command, command.getOutput())) {
...
return false;
}
...
return true;
}
protected boolean decode(ByteBuf buffer, RedisCommand<?, ?, ?> command, CommandOutput<?, ?, ?> output) {
return rsm.decode(buffer, command, output);
}
}
解码的时候会从command里得到相应的Output对象,然后使用RedisStateMachine进行解码。
public class RedisStateMachine {
static class State {
enum Type {
SINGLE, ERROR, INTEGER, BULK, MULTI, BYTES
}
Type type = null;
int count = -1;
}
public boolean decode(ByteBuf buffer, RedisCommand<?, ?, ?> command, CommandOutput<?, ?, ?> output) {
int length, end;
ByteBuffer bytes;
if (isEmpty(stack)) {
add(stack, new State());
}
if (output == null) {
return isEmpty(stack);
}
loop:
while (!isEmpty(stack)) {
State state = peek(stack);
if (state.type == null) {
if (!buffer.isReadable()) {
break;
}
state.type = readReplyType(buffer);
buffer.markReaderIndex();
}
switch (state.type) {
case SINGLE:
if ((bytes = readLine(buffer)) == null) {
break loop;
}
if (!QUEUED.equals(bytes)) {
safeSet(output, bytes, command);
}
break;
case ERROR:
if ((bytes = readLine(buffer)) == null) {
break loop;
}
safeSetError(output, bytes, command);
break;
case INTEGER:
if ((end = findLineEnd(buffer)) == -1) {
break loop;
}
long integer = readLong(buffer, buffer.readerIndex(), end);
safeSet(output, integer, command);
break;
case BULK:
if ((end = findLineEnd(buffer)) == -1) {
break loop;
}
length = (int) readLong(buffer, buffer.readerIndex(), end);
if (length == -1) {
safeSet(output, null, command);
} else {
state.type = BYTES;
state.count = length + 2;
buffer.markReaderIndex();
continue loop;
}
break;
case MULTI:
if (state.count == -1) {
if ((end = findLineEnd(buffer)) == -1) {
break loop;
}
length = (int) readLong(buffer, buffer.readerIndex(), end);
state.count = length;
buffer.markReaderIndex();
safeMulti(output, state.count, command);
}
if (state.count <= 0) {
break;
}
state.count--;
addFirst(stack, new State());
continue loop;
case BYTES:
if ((bytes = readBytes(buffer, state.count)) == null) {
break loop;
}
safeSet(output, bytes, command);
break;
default:
throw new IllegalStateException("State " + state.type + " not supported");
}
buffer.markReaderIndex();
remove(stack);
output.complete(size(stack));
}
if (debugEnabled) {
logger.debug("Decoded {}, empty stack: {}", command, isEmpty(stack));
}
return isEmpty(stack);
}
}
这个方法是用来解析redis server返回来的协议,在解析完成后返回到CommandHandler.decode方法,执行command.complete()
方法。
public class CommandWrapper<K, V, T> implements RedisCommand<K, V, T>, CompleteableCommand<T>, DecoratedCommand<K, V, T> {
public void complete() {
command.complete();
Consumer[] consumers = ONCOMPLETE.get(this);
if (consumers != EMPTY && ONCOMPLETE.compareAndSet(this, consumers, EMPTY)) {
for (Consumer<? super T> consumer : consumers) {
if (getOutput() != null) {
consumer.accept(getOutput().get());
} else {
consumer.accept(null);
}
}
}
}
}
private static class SubscriptionCommand<K, V, T> extends CommandWrapper<K, V, T> implements DemandAware.Sink {
public void complete() {
if (completed) {
return;
}
try {
super.complete();
if (getOutput() != null) {
Object result = getOutput().get();
if (getOutput().hasError()) {
onError(new RedisCommandExecutionException(getOutput().getError()));
completed = true;
return;
}
if (!(getOutput() instanceof StreamingOutput<?>) && result != null) {
if (dissolve && result instanceof Collection) {
Collection<T> collection = (Collection<T>) result;
for (T t : collection) {
if (t != null) {
subscription.onNext(t);
}
}
} else {
subscription.onNext((T) result);
}
}
}
subscription.onAllDataRead();
} finally {
completed = true;
}
}
}
在CommandWrapper.complete()
中会调用SubscriptionCommand.complete()
方法,SubscriptionCommand.complete
方法中会检查Output对象中的result,然后调用Subscription的onNext方法,用来向订阅者推送消息。
最后调用subscription.onAllDataRead()
方法推送onComplete事件收尾。