Files
Rader_IQ/tcp-output(1).py

330 lines
15 KiB
Python
Raw Normal View History

2026-01-22 12:04:29 +08:00
import time
import random
import requests
from datetime import datetime
# 添加InfluxDB配置
INFLUXDB_URL = 'http://8.134.11.76:8086'
INFLUXDB_TOKEN = 'KuTa5ZsqoHIhi2IglOO06zExUYw1_mJ6K0mcA9X1y6O6CJDog3_Cgr8mUw1SwpuCCKRElqxa6wAhrrhsYPytkg=='
INFLUXDB_ORG = 'gzlg'
INFLUXDB_BUCKET = 'gzlg'
# 配置设备ID设置为0则自动注册设置为1000~1999则固定使用该ID
CONFIG_DEVICE_ID = 1003
# 分配的设备ID初始为None注册后更新
assigned_device_id = None
# 相位计数器
phase_counter = 0
def get_next_device_id():
"""从InfluxDB查询已注册的设备ID并返回下一个可用的ID"""
try:
# 查询InfluxDB中已存在的设备ID
query = '''
from(bucket: "{}")
|> range(start: -365d)
|> filter(fn: (r) => r["_measurement"] == "device_data")
|> keep(columns: ["deviceId"])
|> distinct(column: "deviceId")
|> sort()
'''.format(INFLUXDB_BUCKET)
url = f"{INFLUXDB_URL}/api/v2/query?org={INFLUXDB_ORG}"
headers = {
"Authorization": f"Token {INFLUXDB_TOKEN}",
"Content-Type": "application/json"
}
data = {
"query": query
}
response = requests.post(url, headers=headers, json=data)
print(f"🔍 InfluxDB查询响应状态: {response.status_code}")
if response.status_code == 200:
print(f"🔍 InfluxDB查询响应内容: {response.text}")
# 解析响应数据
device_ids = []
lines = response.text.strip().split('\n')
for line in lines:
if line and not line.startswith('#') and 'deviceId' not in line:
try:
# 解析CSV格式的响应数据
# 格式类似于: ,_result,0,1001,1001
parts = line.split(',')
if len(parts) >= 5:
# deviceId在第4个位置索引3
device_id_str = parts[3].strip()
if device_id_str:
device_id = int(device_id_str)
if 1000 <= device_id <= 1999: # 只考虑1000-1999范围内的设备ID
device_ids.append(device_id)
print(f"📱 发现设备ID: {device_id}")
except (ValueError, IndexError) as e:
# 忽略解析错误的行
print(f"⚠️ 忽略无法解析的行: {line}")
continue
# 对设备ID进行排序
device_ids.sort()
# 找到下一个可用的ID
next_id = 1001
for device_id in device_ids:
if device_id == next_id:
next_id += 1
elif device_id > next_id:
break
# 确保ID在有效范围内
if next_id > 1999:
next_id = 1001 # 如果超出范围,从头开始
print(f"📊 查询到已注册设备: {device_ids}")
print(f"🆕 分配新设备ID: {next_id}")
return next_id
else:
print(f"❌ 查询InfluxDB失败: {response.status_code} - {response.text}")
# 如果查询失败返回默认ID
return 1001
except Exception as e:
print(f"❌ 查询设备ID时出错: {e}")
import traceback
traceback.print_exc()
# 如果查询失败返回默认ID
return 1001
def register_device():
"""注册设备并获取设备ID"""
global assigned_device_id
# 检查配置的设备ID
if CONFIG_DEVICE_ID == 0:
# 自动注册模式
try:
# 获取下一个可用的设备ID
next_device_id = get_next_device_id()
assigned_device_id = next_device_id
print(f"✅ 设备注册成功! 设备ID: {assigned_device_id} (0x{assigned_device_id:04X})")
return True
except Exception as e:
print(f"❌ 注册过程中发生错误: {e}")
return False
elif 1000 <= CONFIG_DEVICE_ID <= 1999:
# 固定设备ID模式
assigned_device_id = CONFIG_DEVICE_ID
print(f"✅ 使用固定设备ID: {assigned_device_id} (0x{assigned_device_id:04X})")
return True
else:
# 配置的设备ID无效
print(f"❌ 配置的设备ID {CONFIG_DEVICE_ID} 无效请设置为0自动注册或1000~1999之间的值")
return False
def save_data_to_influxdb(protocol_id, data_value):
"""保存日常数据到InfluxDB"""
try:
# 根据协议ID确定字段名
field_mapping = {
1: "heartRate",
2: "breathingRate",
13: "personDetected",
14: "humanActivity",
15: "humanDistance", # 人体距离 (cm)
16: "humanPosition", # 人体方位 (cm)
17: "sleepState" # 睡眠状态
}
if protocol_id in field_mapping:
field_name = field_mapping[protocol_id]
# 创建数据点 - 使用 "daily_data" 作为测量值名称
data_point = {
"measurement": "daily_data", # 改为 daily_data 以区分日常数据
"tags": {
"deviceId": assigned_device_id,
"dataType": "daily" # 标识这是日常数据
},
"time": datetime.utcnow().isoformat() + "Z",
"fields": {}
}
# 对特定字段进行数值处理
if protocol_id in [1, 2]: # 心率和呼吸频率需要除以10
data_point["fields"][field_name] = float(data_value) / 10.0
elif protocol_id in [13, 14]: # 人检/活动数据确保是整数0或1
# 强制转换为整数确保只有0或1
data_point["fields"][field_name] = int(data_value)
if data_point["fields"][field_name] not in [0, 1]:
print(f"⚠️ 警告: 人检/活动数据值异常: {data_value}, 强制转换为: {data_point['fields'][field_name]}")
elif protocol_id == 15: # 人体距离范围0-65535
data_point["fields"][field_name] = int(data_value)
elif protocol_id == 16: # 人体方位,可以是正负值
data_point["fields"][field_name] = int(data_value)
elif protocol_id == 17: # 睡眠状态,使用预定义值
data_point["fields"][field_name] = int(data_value)
else:
data_point["fields"][field_name] = data_value
# 发送数据到InfluxDB
url = f"{INFLUXDB_URL}/api/v2/write?org={INFLUXDB_ORG}&bucket={INFLUXDB_BUCKET}"
headers = {
"Authorization": f"Token {INFLUXDB_TOKEN}",
"Content-Type": "text/plain; charset=utf-8"
}
# 构造行协议格式的数据,明确指定数据类型
if protocol_id in [13, 14, 15, 16, 17]:
# 对于人检/活动/距离/位置/睡眠状态数据,使用整数格式
line_protocol = f"daily_data,deviceId={assigned_device_id},dataType=daily {field_name}={int(data_point['fields'][field_name])}i"
else:
# 对于其他数据,使用浮点数格式
line_protocol = f"daily_data,deviceId={assigned_device_id},dataType=daily {field_name}={data_point['fields'][field_name]}"
response = requests.post(url, headers=headers, data=line_protocol)
if response.status_code == 204:
print(f"✅ 日常数据已保存到InfluxDB设备{assigned_device_id}上: {field_name}={data_point['fields'][field_name]}")
else:
print(f"❌ 保存日常数据到InfluxDB失败: {response.status_code} - {response.text}")
else:
print(f"⚠️ 未知的协议ID: {protocol_id}")
except Exception as e:
print(f"❌ 保存日常数据到InfluxDB时出错: {e}")
import traceback
traceback.print_exc()
def save_sleep_data_to_influxdb(sleep_data):
"""保存睡眠数据到InfluxDB"""
try:
# 构造睡眠数据的行协议格式 - 使用 "sleep_data" 作为测量值名称
line_protocol = f"sleep_data,deviceId={assigned_device_id},dataType=sleep "
# 按顺序添加各个字段
fields = []
fields.append(f"sleepQualityScore={int(sleep_data['sleepQualityScore'])}i") # 1B 睡眠质量评分 (0~100)
fields.append(f"totalSleepDuration={int(sleep_data['totalSleepDuration'])}i") # 2B 睡眠总时长 (0~65535 分钟)
fields.append(f"awakeDurationRatio={int(sleep_data['awakeDurationRatio'])}i") # 1B 清醒时长占比 (0~100)
fields.append(f"lightSleepRatio={int(sleep_data['lightSleepRatio'])}i") # 1B 浅睡时长占比 (0~100)
fields.append(f"deepSleepRatio={int(sleep_data['deepSleepRatio'])}i") # 1B 深睡时长占比 (0~100)
fields.append(f"outOfBedDuration={int(sleep_data['outOfBedDuration'])}i") # 1B 离床时长 (0~255)
fields.append(f"outOfBedCount={int(sleep_data['outOfBedCount'])}i") # 1B 离床次数 (0~255)
fields.append(f"turnCount={int(sleep_data['turnCount'])}i") # 1B 翻身次数 (0~255)
fields.append(f"avgBreathingRate={int(sleep_data['avgBreathingRate'])}i") # 1B 平均呼吸 (0~25)
fields.append(f"avgHeartRate={int(sleep_data['avgHeartRate'])}i") # 1B 平均心跳 (0~100)
fields.append(f"apneaCount={int(sleep_data['apneaCount'])}i") # 1B 呼吸暂停次数 (0~10)
line_protocol += ",".join(fields)
# 发送数据到InfluxDB
url = f"{INFLUXDB_URL}/api/v2/write?org={INFLUXDB_ORG}&bucket={INFLUXDB_BUCKET}"
headers = {
"Authorization": f"Token {INFLUXDB_TOKEN}",
"Content-Type": "text/plain; charset=utf-8"
}
response = requests.post(url, headers=headers, data=line_protocol)
if response.status_code == 204:
print(f"✅ 睡眠数据已保存到InfluxDB设备{assigned_device_id}")
else:
print(f"❌ 保存睡眠数据到InfluxDB失败: {response.status_code} - {response.text}")
except Exception as e:
print(f"❌ 保存睡眠数据到InfluxDB时出错: {e}")
import traceback
traceback.print_exc()
def generate_random_sleep_data():
"""生成随机睡眠数据用于测试"""
sleep_data = {
"sleepQualityScore": random.randint(0, 100), # 1B 睡眠质量评分 (0~100)
"totalSleepDuration": random.randint(0, 65535), # 2B 睡眠总时长 (0~65535 分钟)
"awakeDurationRatio": random.randint(0, 100), # 1B 清醒时长占比 (0~100)
"lightSleepRatio": random.randint(0, 100), # 1B 浅睡时长占比 (0~100)
"deepSleepRatio": random.randint(0, 100), # 1B 深睡时长占比 (0~100)
"outOfBedDuration": random.randint(0, 255), # 1B 离床时长 (0~255)
"outOfBedCount": random.randint(0, 255), # 1B 离床次数 (0~255)
"turnCount": random.randint(0, 255), # 1B 翻身次数 (0~255)
"avgBreathingRate": random.randint(0, 25), # 1B 平均呼吸 (0~25)
"avgHeartRate": random.randint(0, 100), # 1B 平均心跳 (0~100)
"apneaCount": random.randint(0, 10), # 1B 呼吸暂停次数 (0~10)
}
# 确保比例字段总和为100
total_ratio = sleep_data["awakeDurationRatio"] + sleep_data["lightSleepRatio"] + sleep_data["deepSleepRatio"]
if total_ratio != 100:
# 调整浅睡时长占比以确保总和为100
adjustment = 100 - total_ratio
sleep_data["lightSleepRatio"] = max(0, min(100, sleep_data["lightSleepRatio"] + adjustment))
# 再次检查总和
total_ratio = sleep_data["awakeDurationRatio"] + sleep_data["lightSleepRatio"] + sleep_data["deepSleepRatio"]
if total_ratio != 100:
# 如果仍然不是100则最后一次调整浅睡占比
sleep_data["lightSleepRatio"] += (100 - total_ratio)
sleep_data["lightSleepRatio"] = max(0, min(100, sleep_data["lightSleepRatio"]))
return sleep_data
def main():
global assigned_device_id
try:
# 第一步注册设备获取设备ID
if not register_device():
print("❌ 设备注册失败,程序退出")
return
print(f"🎯 开始使用设备ID {assigned_device_id} 发送数据...")
# 启动时立即发送一次睡眠数据
print("⏰ 启动时生成并发送睡眠数据...")
initial_sleep_data = generate_random_sleep_data()
save_sleep_data_to_influxdb(initial_sleep_data)
# 记录上次发送睡眠数据的时间
last_sleep_data_time = time.time()
# 第二步:开始发送数据
while True:
# 初始化数据值
data_value = 0
# 发送其他数据
protocol_id = random.choice([1, 2, 13, 14, 15, 16, 17]) # 1=心跳, 2=呼吸, 13=检测到人, 14=人体活动, 15=人体距离, 16=人体方位, 17=睡眠状态
if protocol_id == 1: # 心跳
data_value = random.randint(600, 1000)
elif protocol_id == 2: # 呼吸
data_value = random.randint(120, 200)
elif protocol_id == 13: # 检测到人1检测到0未检测到
data_value = random.choice([1])
elif protocol_id == 14: # 人体活动1活动0静止
data_value = random.choice([0])
elif protocol_id == 15: # 人体距离 (cm)范围0-65535
data_value = random.randint(0, 65535)
elif protocol_id == 16: # 人体方位 (cm),可以有正负值
data_value = random.randint(-32768, 32767)
elif protocol_id == 17: # 睡眠状态 (0x00=深睡, 0x01=浅睡, 0x02=清醒, 0x03=无)
data_value = random.choice([0, 1, 2, 3])
# 直接发送数据到InfluxDB
save_data_to_influxdb(protocol_id, data_value)
# 每隔一段时间例如30分钟发送一次睡眠数据
current_time = time.time()
if current_time - last_sleep_data_time >= 1800: # 30分钟 = 1800秒
print("⏰ 生成并发送睡眠数据...")
sleep_data = generate_random_sleep_data()
save_sleep_data_to_influxdb(sleep_data)
last_sleep_data_time = current_time
# 设置发送间隔
time.sleep(0.4) # 每0.4秒发送一次数据
except KeyboardInterrupt:
print(f"\n🛑 设备 {assigned_device_id} 发送端已停止")
except Exception as e:
print(f"❌ 发生错误: {e}")
if __name__ == "__main__":
main()