@Getter
@Setter
@AllArgsConstructor(staticName = "of")
@NoArgsConstructor
public class TemperatureReport implements TcpPayload, TcpDeviceMessage {
private long deviceId;
private float temperature;
private float humidity;
@Override
public DeviceMessage toDeviceMessage() {
ReportPropertyMessage message = new ReportPropertyMessage();
message.setProperties(Collections.singletonMap("temperature", temperature));
/* Map<String, Object> map =new HashMap();
map.put("temperature", temperature);
map.put("humidity", humidity);*/
message.getProperties().put("humidity", humidity);
message.setDeviceId(String.valueOf(deviceId));
message.setTimestamp(System.currentTimeMillis());
return message;
}
@Override
public byte[] toBytes() {
//前8位为设备ID,后4位为温度值,低位字节在前.
byte[] data = new byte[16];
BytesUtils.numberToLe(data, deviceId, 0, 8);
BytesUtils.numberToLe(data, Float.floatToIntBits(temperature), 8, 4);
BytesUtils.numberToLe(data, Float.floatToIntBits(humidity), 12, 4);
return data;
}
@Override
public void fromBytes(byte[] bytes, int offset) {
this.deviceId = BytesUtils.leToLong(bytes, offset, 8);
this.temperature = BytesUtils.leToFloat(bytes, offset + 8, 4);
this.humidity = BytesUtils.leToFloat(bytes, offset + 12, 4);
}
@Override
public String toString() {
return "TemperatureReport{" +
"deviceId=" + deviceId +
", temperature=" + temperature +
", humidity=" + humidity +
'}';
}
}
vertx.createNetClient()
.connect(8088, "127.0.0.1", result -> {
if (result.succeeded()) {
byte[] login=DemoTcpMessage.of(MessageType.AUTH_REQ, AuthRequest.of(1000, key)).toBytes();
NetSocket socket = result.result();
socket.handler(buffer -> {
// TODO: 2020/3/2 粘拆包处理
DemoTcpMessage tcpMessage = DemoTcpMessage.of(buffer.getBytes());
System.out.println(tcpMessage);
//认证通过后定时上报温度数据
if (tcpMessage.getType() == MessageType.AUTH_RES && ((AuthResponse) tcpMessage.getData()).getStatus() == TcpStatus.SUCCESS) {
Flux.interval(Duration.ofSeconds(1))
.flatMap(t -> Flux.just(
DemoTcpMessage.of(MessageType.REPORT_TEMPERATURE,
// TemperatureReport.of(deviceId, (float) ThreadLocalRandom.current().nextDouble(20D, 50D)))
TemperatureReport.of(deviceId, (float)12.4,(float)30))
.toBytes()
/* DemoTcpMessage.of(MessageType.FIRE_ALARM,
FireAlarm.builder()
.point(ThreadLocalRandom.current().nextInt())
.lat(102.234F)
.lnt(122.122F)
.deviceId(deviceId)
.build()).toBytes()*/
))
.map(Buffer::buffer)
.window(1)//一次性发送2条数据
.flatMap(list -> list.reduce(Buffer::appendBuffer))
.doOnNext(buf -> socket.write(buf, res -> {
log.debug("send : {}", Hex.encodeHexString(buf.getBytes()));
if (!res.succeeded()) {
res.cause().printStackTrace();
}
}))
.subscribe();
}
}).write(Buffer.buffer(login), res -> {
log.debug("send auth req:{}",Hex.encodeHexString(login));
if (!res.succeeded()) {
res.cause().printStackTrace();
}
}).exceptionHandler(Throwable::printStackTrace);
} else {
result.cause().printStackTrace();
System.exit(0);
}
});