[TOC]
This project uses GeoMesa Kafka, GeoMesa HBase, and GeoMesa Spark to store and process streaming and batch data.
It consists of three parts: data generation, data collection, and data visualization.
Data generation: the data is AIS ship data for 2015-01-01, downloaded from https://coast.noaa.gov/htdata/CMSP/AISDataHandler/2015/index.html. A Python script reads the file line by line and writes each record to another file to simulate a real-time feed.
Data collection: Flume monitors that file in real time and sends any newly appended data to Kafka. GeoMesa Kafka subscribes to the Kafka topic and receives the data.
Data visualization: configure a GeoMesa (Kafka) store in GeoServer to visualize the data.
Startup commands
- Start ZooKeeper
zkServer.sh start
- Start Tomcat
startup.sh
- Start Kafka
In the foreground (output shown in the console):
kafka-server-start.sh $KAFKA_HOME/config/server.properties
In the background:
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
- Start Flume
flume-ng agent --name exec-memory-kafka --conf $FLUME_HOME/conf --conf-file /home/workspace/geomesa/geomesa-spark/ais/ais_flume_kafka.conf -Dflume.root.logger=INFO,console
- Start the application
java -cp geomesa-demo-kafka-1.0-SNAPSHOT.jar org.sisyphus.demo.kafka.ais.Ais
- Run the Python script to generate data
python ais.py
- Open GeoServer for the visualization
http://hadoop001:8090/geoserver
Python data-simulation script
Use a Python script to read the data in AIS_2015_01_Zone01.csv.
vim ais.py
#coding:utf-8
import time

# filename = 'D:\workspace\VSCode\\test\\AIS_2015_01_Zone01.csv'
# filename = '/home/workspace/geomesa/geomesa-spark/ais/AIS.csv'
filename = '/home/workspace/geomesa/geomesa-spark/ais/AIS_2015_01_Zone01.csv'

# Open the output log once in append mode; Flume tails this file.
# f = open('D:\workspace\VSCode\\test\\ais_temporal_log.log',"a")
f = open('/home/workspace/geomesa/geomesa-spark/ais/ais_temporal_data.log', "a")

with open(filename) as file_object:
    lines = file_object.readlines()

# Emit one record per second to simulate a real-time feed.
for line in lines:
    lineList = line.split(",")
    mmsi = lineList[0]
    date = lineList[1]
    lat = lineList[2]
    lon = lineList[3]
    ais_temporal_log = "{mmsi}\t{date}\t{lat}\t{lon}".format(
        mmsi=mmsi, date=date, lat=lat, lon=lon
    )
    print(ais_temporal_log)
    f.write(ais_temporal_log + "\n")
    f.flush()  # make each record visible to tail -F immediately
    time.sleep(1)

f.close()
Upload the script to the Linux machine and test it:
python ais.py
Check how many records have been written:
wc -l ais_temporal_data.log
more ais_temporal_data.log
Run the Python script on Linux:
python ais.py
Step 1: test whether Flume can collect the data, printing whatever it collects to the console.
The test setup is as follows:
ais_temporal_data.log ==> console output
source ==> exec
channel ==> memory
sink ==> logger
Create the configuration file ais_flume.conf:
vim ais_flume.conf
exec-memory-logger.sources = exec-source
exec-memory-logger.channels = memory-channel
exec-memory-logger.sinks = logger-sink
exec-memory-logger.sources.exec-source.type = exec
exec-memory-logger.sources.exec-source.command = tail -F /home/workspace/geomesa/geomesa-spark/ais/ais_temporal_data.log
exec-memory-logger.sources.exec-source.shell = /bin/sh -c
exec-memory-logger.channels.memory-channel.type = memory
exec-memory-logger.sinks.logger-sink.type = logger
exec-memory-logger.sources.exec-source.channels = memory-channel
exec-memory-logger.sinks.logger-sink.channel = memory-channel
Start Flume and check whether data is printed to the console:
flume-ng agent --name exec-memory-logger --conf $FLUME_HOME/conf --conf-file /home/workspace/geomesa/geomesa-spark/ais/ais_flume.conf -Dflume.root.logger=INFO,console
Step 2: use Flume to collect the generated data and send it to Kafka.
Create the ais_flume_kafka.conf configuration file.
The setup is as follows:
ais_temporal_data.log ==> Kafka
source ==> exec
channel ==> memory
sink ==> kafka-sink
vim ais_flume_kafka.conf
exec-memory-kafka.sources = exec-source
exec-memory-kafka.channels = memory-channel
exec-memory-kafka.sinks = kafka-sink
exec-memory-kafka.sources.exec-source.type = exec
exec-memory-kafka.sources.exec-source.command = tail -F /home/workspace/geomesa/geomesa-spark/ais/ais_temporal_data.log
exec-memory-kafka.sources.exec-source.shell = /bin/sh -c
exec-memory-kafka.channels.memory-channel.type = memory
exec-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
exec-memory-kafka.sinks.kafka-sink.brokerList = hadoop001:9092
exec-memory-kafka.sinks.kafka-sink.topic = aistopic
exec-memory-kafka.sinks.kafka-sink.batchSize = 5
exec-memory-kafka.sinks.kafka-sink.requiredAcks = 1
exec-memory-kafka.sources.exec-source.channels = memory-channel
exec-memory-kafka.sinks.kafka-sink.channel = memory-channel
Start ZooKeeper (required by Kafka):
zkServer.sh start
Check the ZooKeeper status:
zkServer.sh status
Start Kafka in the foreground (output shown in the console):
kafka-server-start.sh $KAFKA_HOME/config/server.properties
Or run it in the background:
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
The -daemon flag runs Kafka as a daemon process, i.e. it keeps running in the background.
Stop Kafka:
kafka-server-stop.sh $KAFKA_HOME/config/server.properties
List topics:
kafka-topics.sh --list --zookeeper hadoop001:2181
Create the topic:
kafka-topics.sh --create --zookeeper hadoop001:2181 --replication-factor 1 --partitions 1 --topic aistopic
Delete a topic:
kafka-topics.sh --zookeeper hadoop001:2181 --delete --topic test
Start Flume:
flume-ng agent --name exec-memory-kafka --conf $FLUME_HOME/conf --conf-file /home/workspace/geomesa/geomesa-spark/ais/ais_flume_kafka.conf -Dflume.root.logger=INFO,console
Start a Kafka console producer to send test data:
kafka-console-producer.sh --broker-list hadoop001:9092 --topic aistopic
Start a Kafka console consumer to check that the data is being consumed:
kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic aistopic --from-beginning --consumer-property group.id=test
--from-beginning: consume from the beginning of the topic
--consumer-property group.id=test: specify the consumer group
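If you prefer to check the topic from Python rather than with the console consumer, the following is a minimal sketch using the kafka-python package (an assumed extra dependency, not part of this project); the broker, topic, and consumer group reuse the values from the commands above.

from kafka import KafkaConsumer

# kafka-python is an assumed extra dependency; broker/topic/group follow the console commands above.
consumer = KafkaConsumer(
    "aistopic",
    bootstrap_servers="hadoop001:9092",
    auto_offset_reset="earliest",   # roughly equivalent to --from-beginning for a new group
    group_id="test",                # equivalent to --consumer-property group.id=test
)

for record in consumer:
    # Each message value is one "mmsi\tdate\tlat\tlon" line produced by the Flume Kafka sink.
    mmsi, date, lat, lon = record.value.decode("utf-8").split("\t")
    print(mmsi, date, lat, lon)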
In GeoServer, add the data from the GeoMesa (Kafka) layer for visualization.
Spring Boot and Vue are used for the front-end visualization.
JAR execution commands
java -cp geomesa-kafka-1.0.jar com.gis.geomesa.kafka.ais.AIS
java -cp <jar> com.example.geomesa.kafka.KafkaQuickStart -brokers <brokers> -zookeepers <zookeepers>
This project consists of data generation, collection, analysis, storage, and visualization.
- Start ZooKeeper
zkServer.sh start
- Start HDFS
start-dfs.sh
- Start HBase
start-hbase.sh
- Start Tomcat
startup.sh
- Start Kafka
kafka-server-start.sh $KAFKA_HOME/config/server.properties
- Start Flume
flume-ng agent --name exec-memory-kafka --conf $FLUME_HOME/conf --conf-file /home/workspace/geomesa/geomesa-spark/streaming_flume_kafka.conf -Dflume.root.logger=INFO,console
- Start the application
- Start the data-generation script (scheduled via cron)
crontab -e
Python data-simulation script
Use a Python script to simulate person location records with the fields name, longitude, latitude, and date.
vim point_data.py
#coding:utf-8
import string
import random
import time

# random name
def name():
    return random.choice(string.ascii_uppercase) + ''.join(random.choice(string.ascii_lowercase) for _ in range(4))

# random lon
def longitude():
    return "%.6f" % random.uniform(-180, 180)

# random lat
def latitude():
    return "%.6f" % random.uniform(-90, 90)

# local time
def date():
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

# generate records forever, one per second
def spatio_temporal_data():
    # "w+" overwrites the existing file, "a" appends
    # f = open("D:\data\geomesa\\spatio_temporal_data.log","w+")
    # f = open("D:\data\geomesa\\spatio_temporal_data.log","a")
    f = open("/home/workspace/geomesa/geomesa-spark/person/spatio_temporal_data.log", "w+")
    while True:
        spatio_temporal_log = "{name}\t{longitude}\t{latitude}\t{date}".format(
            name=name(), longitude=longitude(), latitude=latitude(), date=date()
        )
        f.write(spatio_temporal_log + "\n")
        f.flush()  # make each record visible to tail -F immediately
        print(spatio_temporal_log)
        time.sleep(1)

# generate a batch of records (100 by default) per run
def spatio_temporal_data_100(count=100):
    # f = open("D:\data\geomesa\\spatio_temporal_data.log","w+")
    f = open("/home/workspace/geomesa/geomesa-spark/person/spatio_temporal_data.log", "a")
    while count >= 1:
        spatio_temporal_log = "{name}\t{longitude}\t{latitude}\t{date}".format(
            name=name(), longitude=longitude(), latitude=latitude(), date=date()
        )
        f.write(spatio_temporal_log + "\n")
        print(spatio_temporal_log)
        count = count - 1
    f.close()

if __name__ == "__main__":
    # spatio_temporal_data()
    spatio_temporal_data_100()
Upload the script to the Linux machine and test it:
python point_data.py
Check how many records have been written:
wc -l spatio_temporal_data.log
more spatio_temporal_data.log
Run the Python script on Linux
Create a wrapper script that runs the Python program:
vim point_generator.sh
python /home/workspace/geomesa/geomesa-spark/point_data.py
Make the script executable:
chmod u+x point_generator.sh
Run the script:
./point_generator.sh
Use crontab to run the Python script on a schedule.
-e: edit a user's crontab file; if no user is specified, the current user's crontab is edited.
crontab -e
The following entry runs the script once per minute; comment the line out with a leading # to stop the scheduled task.
*/1 * * * * /home/workspace/geomesa/geomesa-spark/point_generator.sh
A crontab reference tool:
http://tool.lu/crontab
You can use tail -f to check that data is being generated correctly.
Follow the file starting from its last 100 lines:
tail -100f spatio_temporal_data.log
Test whether Flume can collect the data, printing whatever it collects to the console.
The test setup is as follows:
spatio_temporal_data.log ==> console output
source ==> exec
channel ==> memory
sink ==> logger
Create the configuration file streaming_project.conf:
vim streaming_project.conf
exec-memory-logger.sources = exec-source
exec-memory-logger.channels = memory-channel
exec-memory-logger.sinks = logger-sink
exec-memory-logger.sources.exec-source.type = exec
exec-memory-logger.sources.exec-source.command = tail -F /home/workspace/geomesa/geomesa-spark/person/spatio_temporal_data.log
exec-memory-logger.sources.exec-source.shell = /bin/sh -c
exec-memory-logger.channels.memory-channel.type = memory
exec-memory-logger.sinks.logger-sink.type = logger
exec-memory-logger.sources.exec-source.channels = memory-channel
exec-memory-logger.sinks.logger-sink.channel = memory-channel
Start Flume and check whether data is printed to the console:
flume-ng agent --name exec-memory-logger --conf $FLUME_HOME/conf --conf-file /home/workspace/geomesa/geomesa-spark/person/streaming_project.conf -Dflume.root.logger=INFO,console
Use Flume to collect the generated data and send it to Kafka.
Create the streaming_flume_kafka.conf configuration file.
The setup is as follows:
spatio_temporal_data.log ==> Kafka
source ==> exec
channel ==> memory
sink ==> kafka-sink
vim streaming_flume_kafka.conf
exec-memory-kafka.sources = exec-source
exec-memory-kafka.channels = memory-channel
exec-memory-kafka.sinks = kafka-sink
exec-memory-kafka.sources.exec-source.type = exec
exec-memory-kafka.sources.exec-source.command = tail -F /home/workspace/geomesa/geomesa-spark/person/spatio_temporal_data.log
exec-memory-kafka.sources.exec-source.shell = /bin/sh -c
exec-memory-kafka.channels.memory-channel.type = memory
exec-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
exec-memory-kafka.sinks.kafka-sink.brokerList = hadoop001:9092
exec-memory-kafka.sinks.kafka-sink.topic = streamingtopic
exec-memory-kafka.sinks.kafka-sink.batchSize = 5
exec-memory-kafka.sinks.kafka-sink.requiredAcks = 1
exec-memory-kafka.sources.exec-source.channels = memory-channel
exec-memory-kafka.sinks.kafka-sink.channel = memory-channel
Start ZooKeeper (required by Kafka):
zkServer.sh start
Check the ZooKeeper status:
zkServer.sh status
Start Kafka in the foreground (output shown in the console):
kafka-server-start.sh $KAFKA_HOME/config/server.properties
Or run it in the background; the -daemon flag runs Kafka as a daemon process that keeps running in the background:
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
Stop Kafka:
kafka-server-stop.sh $KAFKA_HOME/config/server.properties
List topics:
kafka-topics.sh --list --zookeeper hadoop001:2181
Create the topic:
kafka-topics.sh --create --zookeeper hadoop001:2181 --replication-factor 1 --partitions 1 --topic streamingtopic
Delete a topic:
kafka-topics.sh --zookeeper hadoop001:2181 --delete --topic test
Start Flume:
flume-ng agent --name exec-memory-kafka --conf $FLUME_HOME/conf --conf-file /home/workspace/geomesa/geomesa-spark/person/streaming_flume_kafka.conf -Dflume.root.logger=INFO,console
Start a Kafka console consumer to check that the data is being consumed:
kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic streamingtopic --from-beginning
Spark Streaming consumes the data from Kafka; write the Spark Streaming program.
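The streaming program itself is not listed in this document. As a stand-in only, the sketch below uses Spark Structured Streaming (not the DStream API) in Python to parse the tab-separated records from the streamingtopic topic; it assumes Spark is submitted with the spark-sql-kafka-0-10 package available and reuses the broker address from the Flume configuration above. The real job would write the parsed features to GeoMesa HBase rather than the console.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split

spark = SparkSession.builder.appName("person-streaming").getOrCreate()

# Subscribe to the topic written by the Flume Kafka sink.
raw = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "hadoop001:9092")
       .option("subscribe", "streamingtopic")
       .load())

# Each Kafka value is one "name\tlongitude\tlatitude\tdate" record.
fields = split(col("value").cast("string"), "\t")
points = raw.select(
    fields.getItem(0).alias("name"),
    fields.getItem(1).cast("double").alias("longitude"),
    fields.getItem(2).cast("double").alias("latitude"),
    fields.getItem(3).alias("date"),
)

# For this sketch, just print the parsed records.
query = points.writeStream.format("console").outputMode("append").start()
query.awaitTermination()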
Make sure ZooKeeper, HDFS, and HBase are already running, then start Kafka, then Flume, and finally the Spark Streaming application.
Spark SQL is used for offline (batch) processing.
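The Spark SQL job is likewise not listed here. As an illustration only, the sketch below loads tab-separated records from a hypothetical HDFS path (the project's actual storage layout is not shown in this document) and runs a simple aggregate over them.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("person-offline").getOrCreate()

# Hypothetical HDFS path; adjust to wherever the generated records are stored.
df = (spark.read
      .option("sep", "\t")
      .csv("hdfs:///data/geomesa/person/spatio_temporal_data.log")
      .toDF("name", "longitude", "latitude", "date"))

df.createOrReplaceTempView("person")
# Example query: record count per simulated person name.
spark.sql("SELECT name, COUNT(*) AS cnt FROM person GROUP BY name ORDER BY cnt DESC").show()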
HBase and HDFS store the data.
Start HDFS:
start-dfs.sh
Start HBase:
start-hbase.sh
Delete data in HBase (disable the table first, then drop it):
disable 'tablename'
drop 'tablename'
In GeoServer, add the data from the GeoMesa (HBase) layer for visualization.
Spring Boot and Vue are used for the front-end visualization.