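I recently started using InfluxDB 2.x at work, so I am writing down what I learned here. Most tutorials online still target InfluxDB 1.x, but from 2.x onward queries are written in Flux rather than InfluxQL, and the two languages differ substantially.
1. Basic syntax
First, a sample Flux query: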
from(bucket: "position1")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "device_history_location" and r["device_id"] == "123")
bucket: roughly equivalent to a database name in MySQL
measurement: roughly equivalent to a table name
range: the time range to query
filter: the filter conditions
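Note that v.timeRangeStart and v.timeRangeStop in the query above are dashboard variables supplied by the InfluxDB UI, so a query sent from application code needs an explicit range. As a minimal sketch of how the four pieces fit together, the helper below assembles the same kind of query as a string. The class and method names (FluxQuerySketch, build, quote) are hypothetical and not part of any InfluxDB library, and the escaping covers only backslashes and double quotes.

```java
// Hypothetical helper for illustration; not part of influxdb-client-java.
final class FluxQuerySketch {

    // Minimal escaping so a value can sit inside a Flux string literal.
    // Only backslashes and double quotes are handled here.
    static String quote(String value) {
        return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // bucket -> from(), lookbackHours -> range(), measurement and deviceId -> filter()
    static String build(String bucket, String measurement, String deviceId, int lookbackHours) {
        return "from(bucket: " + quote(bucket) + ")\n"
                + "  |> range(start: -" + lookbackHours + "h)\n"
                + "  |> filter(fn: (r) => r[\"_measurement\"] == " + quote(measurement)
                + " and r[\"device_id\"] == " + quote(deviceId) + ")";
    }

    public static void main(String[] args) {
        System.out.println(build("position1", "device_history_location", "123", 6));
    }
}
```

Building the string in one place keeps the bucket, time range, and filter values out of scattered string concatenations, but values from untrusted input would need stricter handling than this sketch provides.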
2. Spring Boot integration
<dependency>
    <groupId>com.influxdb</groupId>
    <artifactId>influxdb-client-java</artifactId>
    <version>6.7.0</version>
</dependency>
- Sample code
private static InfluxDBClient getClient() {
    // Arguments: server URL, API token, organization, default bucket
    InfluxDBClient client = InfluxDBClientFactory.create("http://localhost:8086",
            "111".toCharArray(), "test", "position1");
    return client;
}
@Data
@Accessors(chain = true)
@Measurement(name = "device_history_location")
public class InfluxPosition {
    @Column(name = "device_id", tag = true)
    private String deviceId;
    @Column(name = "vehicle_id")
    private String vehicleId;
    @Column(timestamp = true)
    private Instant locationTime;
    @Column(name = "longitude")
    private BigDecimal longitude;
    @Column(name = "latitude")
    private BigDecimal latitude;
    @Column(name = "speed")
    private Integer speed;
    @Column(name = "altitude")
    private Integer altitude;
    @Column(name = "mileage")
    private Integer mileage;
}
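To make the tag, field, and timestamp roles in the class above concrete, here is a hand-written sketch of the line protocol that one such point corresponds to. It is only an illustration: the client library performs this serialization itself, the class name LineProtocolSketch is made up, altitude and mileage are left out for brevity, and the values are assumed to contain no spaces, commas, or quotes that would need escaping.

```java
import java.math.BigDecimal;
import java.time.Instant;

// Hand-written illustration of line protocol: measurement,tags fields timestamp
final class LineProtocolSketch {

    static String toLine(String deviceId, String vehicleId, BigDecimal longitude,
                         BigDecimal latitude, int speed, Instant time) {
        return "device_history_location"
                + ",device_id=" + deviceId                  // tag: part of the series key
                + " vehicle_id=\"" + vehicleId + "\""        // string field: double-quoted
                + ",longitude=" + longitude.toPlainString()  // float field
                + ",latitude=" + latitude.toPlainString()    // float field
                + ",speed=" + speed + "i"                    // integer field: "i" suffix
                + " " + time.toEpochMilli();                 // timestamp in milliseconds
    }

    public static void main(String[] args) {
        System.out.println(toLine("123", "321", new BigDecimal("113.12313"),
                new BigDecimal("23.8524"), 10, Instant.ofEpochMilli(1700000000000L)));
    }
}
```

Running main prints device_history_location,device_id=123 vehicle_id="321",longitude=113.12313,latitude=23.8524,speed=10i 1700000000000. Only device_id is a tag here, so it is the only column that takes part in identifying the series; everything else is stored as a field value.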
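private static void writeData(InfluxDBClient client) {
    WriteApiBlocking writeApiBlocking = client.getWriteApiBlocking();
    List<InfluxPosition> list = new ArrayList<>();
    // Points with the same measurement, tag set, and timestamp overwrite each other,
    // so each point gets its own timestamp (one per second, ending just before now).
    Instant start = Instant.now().minusSeconds(100);
    for (int i = 0; i < 100; i++) {
        InfluxPosition position = new InfluxPosition()
                .setDeviceId("123")
                .setVehicleId("321")
                .setLocationTime(start.plusSeconds(i))
                .setLongitude(new BigDecimal("113.12313"))
                .setLatitude(new BigDecimal("23.8524"))
                .setSpeed(i * 10)
                .setAltitude(i * 100)
                .setMileage(i * 1000);
        list.add(position);
    }
    // Arguments: bucket, organization, timestamp precision, measurements
    writeApiBlocking.writeMeasurements("position1", "test", WritePrecision.MS, list);
}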
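One thing to watch when writing points in a loop: InfluxDB identifies a point by its measurement, tag set, and timestamp, so points that share all three are merged rather than stored as separate rows. The toy model below uses plain Java with no InfluxDB calls (the class and method names are made up, and a map stands in for the database) to show why timestamps should differ between points of the same series.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model, not InfluxDB code: a map keyed by series key plus timestamp.
final class PointIdentitySketch {

    // Returns how many distinct points remain after "writing" one point per timestamp.
    static int storedPoints(Instant[] timestamps) {
        Map<String, Integer> store = new LinkedHashMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            // Same measurement and tag set for every point, as in the write loop above.
            String key = "device_history_location,device_id=123@" + timestamps[i].toEpochMilli();
            store.put(key, i * 10); // a later write with the same key replaces the earlier speed
        }
        return store.size();
    }

    public static void main(String[] args) {
        Instant t = Instant.ofEpochMilli(1700000000000L);
        System.out.println(storedPoints(new Instant[]{t, t, t}));
        System.out.println(storedPoints(new Instant[]{t, t.plusSeconds(1), t.plusSeconds(2)}));
    }
}
```

With three identical timestamps the model keeps a single point; with three distinct timestamps it keeps all three. At millisecond precision, a tight loop that stamps every point with the current time can easily produce such collisions.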
private static List<FluxTable> queryData(InfluxDBClient client) {
    // pivot() turns each field into its own column, so one row holds all fields for a timestamp.
    String flux = "from(bucket: \"position1\")\n" +
            " |> range(start: -6h)\n" +
            " |> filter(fn: (r) => r[\"_measurement\"] == \"device_history_location\")\n" +
            " |> pivot(rowKey:[\"_time\"],columnKey: [\"_field\"],valueColumn: \"_value\")";
    List<FluxTable> query = client.getQueryApi().query(flux);
    for (FluxTable table : query) {
        for (FluxRecord record : table.getRecords()) {
            // After pivot() the _field and _value columns no longer exist, so getField()
            // and getValue() would return null; log the whole row instead.
            log.info("{}---{}---{}", record.getMeasurement(), record.getTime(), record.getValues());
        }
    }
    return query;
}
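The pivot step in the query is what makes each record look like one row of the POJO. Without it, Flux returns one row per field per timestamp; with it, the fields become columns of a single row per timestamp. The plain-Java sketch below mimics that reshaping on a few sample rows. It is an illustration only: PivotSketch and Row are made-up names, tags and the measurement column are ignored, and the real pivot() runs inside InfluxDB.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration of the reshaping done by pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value").
final class PivotSketch {

    // One unpivoted row: a timestamp, a field name, and that field's value.
    static final class Row {
        final String time;
        final String field;
        final Object value;

        Row(String time, String field, Object value) {
            this.time = time;
            this.field = field;
            this.value = value;
        }
    }

    // Groups rows by time; each field name becomes a column in that time's row.
    static Map<String, Map<String, Object>> pivot(List<Row> rows) {
        Map<String, Map<String, Object>> byTime = new LinkedHashMap<>();
        for (Row row : rows) {
            byTime.computeIfAbsent(row.time, t -> new LinkedHashMap<>()).put(row.field, row.value);
        }
        return byTime;
    }

    public static void main(String[] args) {
        List<Row> rows = new ArrayList<>();
        rows.add(new Row("2024-01-01T00:00:00Z", "speed", 10));
        rows.add(new Row("2024-01-01T00:00:00Z", "altitude", 100));
        rows.add(new Row("2024-01-01T00:00:01Z", "speed", 20));
        System.out.println(pivot(rows));
    }
}
```

The three input rows collapse into two output rows, one per timestamp, and the first of them carries both speed and altitude as columns.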
3. Testing
public static void main(String[] args) {
    // InfluxDBClient is AutoCloseable, so try-with-resources closes it when done.
    try (InfluxDBClient client = getClient()) {
        writeData(client);
    }
}
After running the code, the data is visible in the InfluxDB console.
public static void main(String[] args) {
    // InfluxDBClient is AutoCloseable, so try-with-resources closes it when done.
    try (InfluxDBClient client = getClient()) {
        List<FluxTable> query = queryData(client);
        FluxResultMapper resultMapper = new FluxResultMapper();
        for (FluxTable table : query) {
            for (FluxRecord record : table.getRecords()) {
                log.info("values:{}---time:{}", record.getValues(), record.getTime());
                // Map the pivoted row back onto the annotated POJO and log the result.
                InfluxPosition influxPosition = resultMapper.toPOJO(record, InfluxPosition.class);
                log.info("mapped:{}", influxPosition);
            }
        }
    }
}