// 用户名及密码校验
if (variableHeader.hasPassword() && variableHeader.hasUserName()) {
authenticationService.asyncAuthenticate(ClientAuthDTO.of(clientId, username, password))
.thenAccept(authentication -> {
if (authentication == null) {
// authentication 是有可能为 null 的
// 比如 认证服务 response http status = 200, 但是响应内容为空
authentication = Authentication.of(clientId);
} else {
authentication.setClientId(clientId); // 置入 clientId
}
process0(ctx, msg, authentication);
})
.exceptionally(throwable -> {
MqttConnAckMessage mqttConnAckMessage;
if (throwable.getCause() instanceof AuthenticationException) {
mqttConnAckMessage = MqttMessageBuilders.connAck()
.sessionPresent(false)
.returnCode(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD)
.build();
} else {
mqttConnAckMessage = MqttMessageBuilders.connAck()
.sessionPresent(false)
.returnCode(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE)
.build();
}
log.error(String.format("client[id: %s, username: %s]登入失败", clientId, username), throwable);
// 告知客户端并关闭连接
ctx.writeAndFlush(mqttConnAckMessage);
ctx.close();
return null;
});
} else {
process0(ctx, msg, Authentication.of(clientId));
}