import time import random import requests from datetime import datetime # 添加InfluxDB配置 INFLUXDB_URL = 'http://8.134.11.76:8086' INFLUXDB_TOKEN = 'KuTa5ZsqoHIhi2IglOO06zExUYw1_mJ6K0mcA9X1y6O6CJDog3_Cgr8mUw1SwpuCCKRElqxa6wAhrrhsYPytkg==' INFLUXDB_ORG = 'gzlg' INFLUXDB_BUCKET = 'gzlg' # 配置设备ID:设置为0则自动注册,设置为1000~1999则固定使用该ID CONFIG_DEVICE_ID = 1002 # 分配的设备ID(初始为None,注册后更新) assigned_device_id = None # 相位计数器 phase_counter = 0 def get_next_device_id(): """从InfluxDB查询已注册的设备ID,并返回下一个可用的ID""" try: # 查询InfluxDB中已存在的设备ID query = ''' from(bucket: "{}") |> range(start: -365d) |> filter(fn: (r) => r["_measurement"] == "device_data") |> keep(columns: ["deviceId"]) |> distinct(column: "deviceId") |> sort() '''.format(INFLUXDB_BUCKET) url = f"{INFLUXDB_URL}/api/v2/query?org={INFLUXDB_ORG}" headers = { "Authorization": f"Token {INFLUXDB_TOKEN}", "Content-Type": "application/json" } data = { "query": query } response = requests.post(url, headers=headers, json=data) print(f"🔍 InfluxDB查询响应状态: {response.status_code}") if response.status_code == 200: print(f"🔍 InfluxDB查询响应内容: {response.text}") # 解析响应数据 device_ids = [] lines = response.text.strip().split('\n') for line in lines: if line and not line.startswith('#') and 'deviceId' not in line: try: # 解析CSV格式的响应数据 # 格式类似于: ,_result,0,1001,1001 parts = line.split(',') if len(parts) >= 5: # deviceId在第4个位置(索引3) device_id_str = parts[3].strip() if device_id_str: device_id = int(device_id_str) if 1000 <= device_id <= 1999: # 只考虑1000-1999范围内的设备ID device_ids.append(device_id) print(f"📱 发现设备ID: {device_id}") except (ValueError, IndexError) as e: # 忽略解析错误的行 print(f"⚠️ 忽略无法解析的行: {line}") continue # 对设备ID进行排序 device_ids.sort() # 找到下一个可用的ID next_id = 1001 for device_id in device_ids: if device_id == next_id: next_id += 1 elif device_id > next_id: break # 确保ID在有效范围内 if next_id > 1999: next_id = 1001 # 如果超出范围,从头开始 print(f"📊 查询到已注册设备: {device_ids}") print(f"🆕 分配新设备ID: {next_id}") return next_id else: print(f"❌ 查询InfluxDB失败: {response.status_code} - {response.text}") # 如果查询失败,返回默认ID return 1001 except Exception as e: print(f"❌ 查询设备ID时出错: {e}") import traceback traceback.print_exc() # 如果查询失败,返回默认ID return 1001 def register_device(): """注册设备并获取设备ID""" global assigned_device_id # 检查配置的设备ID if CONFIG_DEVICE_ID == 0: # 自动注册模式 try: # 获取下一个可用的设备ID next_device_id = get_next_device_id() assigned_device_id = next_device_id print(f"✅ 设备注册成功! 设备ID: {assigned_device_id} (0x{assigned_device_id:04X})") return True except Exception as e: print(f"❌ 注册过程中发生错误: {e}") return False elif 1000 <= CONFIG_DEVICE_ID <= 1999: # 固定设备ID模式 assigned_device_id = CONFIG_DEVICE_ID print(f"✅ 使用固定设备ID: {assigned_device_id} (0x{assigned_device_id:04X})") return True else: # 配置的设备ID无效 print(f"❌ 配置的设备ID {CONFIG_DEVICE_ID} 无效,请设置为0(自动注册)或1000~1999之间的值") return False def save_data_to_influxdb(protocol_id, data_value): """保存数据到InfluxDB""" try: # 创建数据点 data_point = { "measurement": "device_data", "tags": { "deviceId": assigned_device_id }, "time": datetime.utcnow().isoformat() + "Z", "fields": {} } # 根据协议ID确定字段名 field_mapping = { 1: "heartRate", 2: "breathingRate", 3: "heartPhase", 4: "breathingPhase", 13: "personDetected", 14: "humanActivity" } if protocol_id in field_mapping: field_name = field_mapping[protocol_id] # 对特定字段进行数值处理 if protocol_id in [1, 2]: # 心率和呼吸频率需要除以10 data_point["fields"][field_name] = float(data_value) / 10.0 elif protocol_id in [3, 4]: # 相位数据需要除以1000 data_point["fields"][field_name] = float(data_value) / 1000.0 elif protocol_id in [13, 14]: # 人检/活动数据,确保是整数0或1 # 强制转换为整数,确保只有0或1 data_point["fields"][field_name] = int(data_value) if data_point["fields"][field_name] not in [0, 1]: print(f"⚠️ 警告: 人检/活动数据值异常: {data_value}, 强制转换为: {data_point['fields'][field_name]}") else: data_point["fields"][field_name] = data_value print(f"📊 准备保存数据: 协议ID={protocol_id}, 字段={field_name}, 值={data_point['fields'][field_name]}, 原始值={data_value}") # 发送数据到InfluxDB url = f"{INFLUXDB_URL}/api/v2/write?org={INFLUXDB_ORG}&bucket={INFLUXDB_BUCKET}" headers = { "Authorization": f"Token {INFLUXDB_TOKEN}", "Content-Type": "text/plain; charset=utf-8" } # 构造行协议格式的数据,明确指定数据类型 if protocol_id in [13, 14]: # 对于人检/活动数据,使用整数格式 line_protocol = f"device_data,deviceId={assigned_device_id} {field_name}={int(data_point['fields'][field_name])}i" else: # 对于其他数据,使用浮点数格式 line_protocol = f"device_data,deviceId={assigned_device_id} {field_name}={data_point['fields'][field_name]}" response = requests.post(url, headers=headers, data=line_protocol) if response.status_code == 204: print(f"✅ 数据已保存到InfluxDB设备{assigned_device_id}上: {field_name}={data_point['fields'][field_name]}") else: print(f"❌ 保存数据到InfluxDB失败: {response.status_code} - {response.text}") else: print(f"⚠️ 未知的协议ID: {protocol_id}") except Exception as e: print(f"❌ 保存数据到InfluxDB时出错: {e}") import traceback traceback.print_exc() def main(): global assigned_device_id, phase_counter try: # 第一步:注册设备获取设备ID if not register_device(): print("❌ 设备注册失败,程序退出") return print(f"🎯 开始使用设备ID {assigned_device_id} 发送数据...") # 第二步:开始发送数据 while True: # 初始化数据值 data_value = 0 # 每发送100个相位数据后发送其他数据 if phase_counter < 2: # 发送相位数据 (心率相位或呼吸相位) protocol_id = random.choice([3, 4]) # 3=心率相位, 4=呼吸相位 data_value = random.randint(-10000, 10000) phase_counter += 1 else: # 重置计数器 phase_counter = 0 # 发送其他数据 protocol_id = random.choice([1, 2, 13, 14]) # 1=心跳, 2=呼吸, 13=检测到人, 14=人体活动 if protocol_id == 1: # 心跳 data_value = random.randint(600, 1000) elif protocol_id == 2: # 呼吸 data_value = random.randint(120, 200) elif protocol_id == 13: # 检测到人(1检测到,0未检测到) data_value = random.choice([0, 1]) elif protocol_id == 14: # 人体活动(1活动,0静止) data_value = random.choice([0, 1]) # 直接发送数据到InfluxDB save_data_to_influxdb(protocol_id, data_value) # 设置发送间隔 time.sleep(0.1) # 每100ms发送一次数据 except KeyboardInterrupt: print(f"\n🛑 设备 {assigned_device_id} 发送端已停止") except Exception as e: print(f"❌ 发生错误: {e}") if __name__ == "__main__": main()