Files
Rader_Success_5/tcp-output(1).py
2026-01-22 16:36:22 +08:00

330 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import time
import random
import requests
from datetime import datetime
# 添加InfluxDB配置
INFLUXDB_URL = 'http://8.134.11.76:8086'
INFLUXDB_TOKEN = 'KuTa5ZsqoHIhi2IglOO06zExUYw1_mJ6K0mcA9X1y6O6CJDog3_Cgr8mUw1SwpuCCKRElqxa6wAhrrhsYPytkg=='
INFLUXDB_ORG = 'gzlg'
INFLUXDB_BUCKET = 'gzlg'
# 配置设备ID设置为0则自动注册设置为1000~1999则固定使用该ID
CONFIG_DEVICE_ID = 1003
# 分配的设备ID初始为None注册后更新
assigned_device_id = None
# 相位计数器
phase_counter = 0
def get_next_device_id():
"""从InfluxDB查询已注册的设备ID并返回下一个可用的ID"""
try:
# 查询InfluxDB中已存在的设备ID
query = '''
from(bucket: "{}")
|> range(start: -365d)
|> filter(fn: (r) => r["_measurement"] == "device_data")
|> keep(columns: ["deviceId"])
|> distinct(column: "deviceId")
|> sort()
'''.format(INFLUXDB_BUCKET)
url = f"{INFLUXDB_URL}/api/v2/query?org={INFLUXDB_ORG}"
headers = {
"Authorization": f"Token {INFLUXDB_TOKEN}",
"Content-Type": "application/json"
}
data = {
"query": query
}
response = requests.post(url, headers=headers, json=data)
print(f"🔍 InfluxDB查询响应状态: {response.status_code}")
if response.status_code == 200:
print(f"🔍 InfluxDB查询响应内容: {response.text}")
# 解析响应数据
device_ids = []
lines = response.text.strip().split('\n')
for line in lines:
if line and not line.startswith('#') and 'deviceId' not in line:
try:
# 解析CSV格式的响应数据
# 格式类似于: ,_result,0,1001,1001
parts = line.split(',')
if len(parts) >= 5:
# deviceId在第4个位置索引3
device_id_str = parts[3].strip()
if device_id_str:
device_id = int(device_id_str)
if 1000 <= device_id <= 1999: # 只考虑1000-1999范围内的设备ID
device_ids.append(device_id)
print(f"📱 发现设备ID: {device_id}")
except (ValueError, IndexError) as e:
# 忽略解析错误的行
print(f"⚠️ 忽略无法解析的行: {line}")
continue
# 对设备ID进行排序
device_ids.sort()
# 找到下一个可用的ID
next_id = 1001
for device_id in device_ids:
if device_id == next_id:
next_id += 1
elif device_id > next_id:
break
# 确保ID在有效范围内
if next_id > 1999:
next_id = 1001 # 如果超出范围,从头开始
print(f"📊 查询到已注册设备: {device_ids}")
print(f"🆕 分配新设备ID: {next_id}")
return next_id
else:
print(f"❌ 查询InfluxDB失败: {response.status_code} - {response.text}")
# 如果查询失败返回默认ID
return 1001
except Exception as e:
print(f"❌ 查询设备ID时出错: {e}")
import traceback
traceback.print_exc()
# 如果查询失败返回默认ID
return 1001
def register_device():
"""注册设备并获取设备ID"""
global assigned_device_id
# 检查配置的设备ID
if CONFIG_DEVICE_ID == 0:
# 自动注册模式
try:
# 获取下一个可用的设备ID
next_device_id = get_next_device_id()
assigned_device_id = next_device_id
print(f"✅ 设备注册成功! 设备ID: {assigned_device_id} (0x{assigned_device_id:04X})")
return True
except Exception as e:
print(f"❌ 注册过程中发生错误: {e}")
return False
elif 1000 <= CONFIG_DEVICE_ID <= 1999:
# 固定设备ID模式
assigned_device_id = CONFIG_DEVICE_ID
print(f"✅ 使用固定设备ID: {assigned_device_id} (0x{assigned_device_id:04X})")
return True
else:
# 配置的设备ID无效
print(f"❌ 配置的设备ID {CONFIG_DEVICE_ID} 无效请设置为0自动注册或1000~1999之间的值")
return False
def save_data_to_influxdb(protocol_id, data_value):
"""保存日常数据到InfluxDB"""
try:
# 根据协议ID确定字段名
field_mapping = {
1: "heartRate",
2: "breathingRate",
13: "personDetected",
14: "humanActivity",
15: "humanDistance", # 人体距离 (cm)
16: "humanPosition", # 人体方位 (cm)
17: "sleepState" # 睡眠状态
}
if protocol_id in field_mapping:
field_name = field_mapping[protocol_id]
# 创建数据点 - 使用 "daily_data" 作为测量值名称
data_point = {
"measurement": "daily_data", # 改为 daily_data 以区分日常数据
"tags": {
"deviceId": assigned_device_id,
"dataType": "daily" # 标识这是日常数据
},
"time": datetime.utcnow().isoformat() + "Z",
"fields": {}
}
# 对特定字段进行数值处理
if protocol_id in [1, 2]: # 心率和呼吸频率需要除以10
data_point["fields"][field_name] = float(data_value) / 10.0
elif protocol_id in [13, 14]: # 人检/活动数据确保是整数0或1
# 强制转换为整数确保只有0或1
data_point["fields"][field_name] = int(data_value)
if data_point["fields"][field_name] not in [0, 1]:
print(f"⚠️ 警告: 人检/活动数据值异常: {data_value}, 强制转换为: {data_point['fields'][field_name]}")
elif protocol_id == 15: # 人体距离范围0-65535
data_point["fields"][field_name] = int(data_value)
elif protocol_id == 16: # 人体方位,可以是正负值
data_point["fields"][field_name] = int(data_value)
elif protocol_id == 17: # 睡眠状态,使用预定义值
data_point["fields"][field_name] = int(data_value)
else:
data_point["fields"][field_name] = data_value
# 发送数据到InfluxDB
url = f"{INFLUXDB_URL}/api/v2/write?org={INFLUXDB_ORG}&bucket={INFLUXDB_BUCKET}"
headers = {
"Authorization": f"Token {INFLUXDB_TOKEN}",
"Content-Type": "text/plain; charset=utf-8"
}
# 构造行协议格式的数据,明确指定数据类型
if protocol_id in [13, 14, 15, 16, 17]:
# 对于人检/活动/距离/位置/睡眠状态数据,使用整数格式
line_protocol = f"daily_data,deviceId={assigned_device_id},dataType=daily {field_name}={int(data_point['fields'][field_name])}i"
else:
# 对于其他数据,使用浮点数格式
line_protocol = f"daily_data,deviceId={assigned_device_id},dataType=daily {field_name}={data_point['fields'][field_name]}"
response = requests.post(url, headers=headers, data=line_protocol)
if response.status_code == 204:
print(f"✅ 日常数据已保存到InfluxDB设备{assigned_device_id}上: {field_name}={data_point['fields'][field_name]}")
else:
print(f"❌ 保存日常数据到InfluxDB失败: {response.status_code} - {response.text}")
else:
print(f"⚠️ 未知的协议ID: {protocol_id}")
except Exception as e:
print(f"❌ 保存日常数据到InfluxDB时出错: {e}")
import traceback
traceback.print_exc()
def save_sleep_data_to_influxdb(sleep_data):
"""保存睡眠数据到InfluxDB"""
try:
# 构造睡眠数据的行协议格式 - 使用 "sleep_data" 作为测量值名称
line_protocol = f"sleep_data,deviceId={assigned_device_id},dataType=sleep "
# 按顺序添加各个字段
fields = []
fields.append(f"sleepQualityScore={int(sleep_data['sleepQualityScore'])}i") # 1B 睡眠质量评分 (0~100)
fields.append(f"totalSleepDuration={int(sleep_data['totalSleepDuration'])}i") # 2B 睡眠总时长 (0~65535 分钟)
fields.append(f"awakeDurationRatio={int(sleep_data['awakeDurationRatio'])}i") # 1B 清醒时长占比 (0~100)
fields.append(f"lightSleepRatio={int(sleep_data['lightSleepRatio'])}i") # 1B 浅睡时长占比 (0~100)
fields.append(f"deepSleepRatio={int(sleep_data['deepSleepRatio'])}i") # 1B 深睡时长占比 (0~100)
fields.append(f"outOfBedDuration={int(sleep_data['outOfBedDuration'])}i") # 1B 离床时长 (0~255)
fields.append(f"outOfBedCount={int(sleep_data['outOfBedCount'])}i") # 1B 离床次数 (0~255)
fields.append(f"turnCount={int(sleep_data['turnCount'])}i") # 1B 翻身次数 (0~255)
fields.append(f"avgBreathingRate={int(sleep_data['avgBreathingRate'])}i") # 1B 平均呼吸 (0~25)
fields.append(f"avgHeartRate={int(sleep_data['avgHeartRate'])}i") # 1B 平均心跳 (0~100)
fields.append(f"apneaCount={int(sleep_data['apneaCount'])}i") # 1B 呼吸暂停次数 (0~10)
line_protocol += ",".join(fields)
# 发送数据到InfluxDB
url = f"{INFLUXDB_URL}/api/v2/write?org={INFLUXDB_ORG}&bucket={INFLUXDB_BUCKET}"
headers = {
"Authorization": f"Token {INFLUXDB_TOKEN}",
"Content-Type": "text/plain; charset=utf-8"
}
response = requests.post(url, headers=headers, data=line_protocol)
if response.status_code == 204:
print(f"✅ 睡眠数据已保存到InfluxDB设备{assigned_device_id}")
else:
print(f"❌ 保存睡眠数据到InfluxDB失败: {response.status_code} - {response.text}")
except Exception as e:
print(f"❌ 保存睡眠数据到InfluxDB时出错: {e}")
import traceback
traceback.print_exc()
def generate_random_sleep_data():
"""生成随机睡眠数据用于测试"""
sleep_data = {
"sleepQualityScore": random.randint(0, 100), # 1B 睡眠质量评分 (0~100)
"totalSleepDuration": random.randint(0, 65535), # 2B 睡眠总时长 (0~65535 分钟)
"awakeDurationRatio": random.randint(0, 100), # 1B 清醒时长占比 (0~100)
"lightSleepRatio": random.randint(0, 100), # 1B 浅睡时长占比 (0~100)
"deepSleepRatio": random.randint(0, 100), # 1B 深睡时长占比 (0~100)
"outOfBedDuration": random.randint(0, 255), # 1B 离床时长 (0~255)
"outOfBedCount": random.randint(0, 255), # 1B 离床次数 (0~255)
"turnCount": random.randint(0, 255), # 1B 翻身次数 (0~255)
"avgBreathingRate": random.randint(0, 25), # 1B 平均呼吸 (0~25)
"avgHeartRate": random.randint(0, 100), # 1B 平均心跳 (0~100)
"apneaCount": random.randint(0, 10), # 1B 呼吸暂停次数 (0~10)
}
# 确保比例字段总和为100
total_ratio = sleep_data["awakeDurationRatio"] + sleep_data["lightSleepRatio"] + sleep_data["deepSleepRatio"]
if total_ratio != 100:
# 调整浅睡时长占比以确保总和为100
adjustment = 100 - total_ratio
sleep_data["lightSleepRatio"] = max(0, min(100, sleep_data["lightSleepRatio"] + adjustment))
# 再次检查总和
total_ratio = sleep_data["awakeDurationRatio"] + sleep_data["lightSleepRatio"] + sleep_data["deepSleepRatio"]
if total_ratio != 100:
# 如果仍然不是100则最后一次调整浅睡占比
sleep_data["lightSleepRatio"] += (100 - total_ratio)
sleep_data["lightSleepRatio"] = max(0, min(100, sleep_data["lightSleepRatio"]))
return sleep_data
def main():
global assigned_device_id
try:
# 第一步注册设备获取设备ID
if not register_device():
print("❌ 设备注册失败,程序退出")
return
print(f"🎯 开始使用设备ID {assigned_device_id} 发送数据...")
# 启动时立即发送一次睡眠数据
print("⏰ 启动时生成并发送睡眠数据...")
initial_sleep_data = generate_random_sleep_data()
save_sleep_data_to_influxdb(initial_sleep_data)
# 记录上次发送睡眠数据的时间
last_sleep_data_time = time.time()
# 第二步:开始发送数据
while True:
# 初始化数据值
data_value = 0
# 发送其他数据
protocol_id = random.choice([1, 2, 13, 14, 15, 16, 17]) # 1=心跳, 2=呼吸, 13=检测到人, 14=人体活动, 15=人体距离, 16=人体方位, 17=睡眠状态
if protocol_id == 1: # 心跳
data_value = random.randint(600, 1000)
elif protocol_id == 2: # 呼吸
data_value = random.randint(120, 200)
elif protocol_id == 13: # 检测到人1检测到0未检测到
data_value = random.choice([1])
elif protocol_id == 14: # 人体活动1活动0静止
data_value = random.choice([0])
elif protocol_id == 15: # 人体距离 (cm)范围0-65535
data_value = random.randint(0, 65535)
elif protocol_id == 16: # 人体方位 (cm),可以有正负值
data_value = random.randint(-32768, 32767)
elif protocol_id == 17: # 睡眠状态 (0x00=深睡, 0x01=浅睡, 0x02=清醒, 0x03=无)
data_value = random.choice([0, 1, 2, 3])
# 直接发送数据到InfluxDB
save_data_to_influxdb(protocol_id, data_value)
# 每隔一段时间例如30分钟发送一次睡眠数据
current_time = time.time()
if current_time - last_sleep_data_time >= 1800: # 30分钟 = 1800秒
print("⏰ 生成并发送睡眠数据...")
sleep_data = generate_random_sleep_data()
save_sleep_data_to_influxdb(sleep_data)
last_sleep_data_time = current_time
# 设置发送间隔
time.sleep(0.4) # 每0.4秒发送一次数据
except KeyboardInterrupt:
print(f"\n🛑 设备 {assigned_device_id} 发送端已停止")
except Exception as e:
print(f"❌ 发生错误: {e}")
if __name__ == "__main__":
main()