diff --git a/tcp-output(1).py b/tcp-output(1).py deleted file mode 100644 index 9bd9178..0000000 --- a/tcp-output(1).py +++ /dev/null @@ -1,330 +0,0 @@ -import time -import random -import requests -from datetime import datetime - -# 添加InfluxDB配置 -INFLUXDB_URL = 'http://8.134.11.76:8086' -INFLUXDB_TOKEN = 'KuTa5ZsqoHIhi2IglOO06zExUYw1_mJ6K0mcA9X1y6O6CJDog3_Cgr8mUw1SwpuCCKRElqxa6wAhrrhsYPytkg==' -INFLUXDB_ORG = 'gzlg' -INFLUXDB_BUCKET = 'gzlg' - -# 配置设备ID:设置为0则自动注册,设置为1000~1999则固定使用该ID -CONFIG_DEVICE_ID = 1003 - -# 分配的设备ID(初始为None,注册后更新) -assigned_device_id = None -# 相位计数器 -phase_counter = 0 - -def get_next_device_id(): - """从InfluxDB查询已注册的设备ID,并返回下一个可用的ID""" - try: - # 查询InfluxDB中已存在的设备ID - query = ''' - from(bucket: "{}") - |> range(start: -365d) - |> filter(fn: (r) => r["_measurement"] == "device_data") - |> keep(columns: ["deviceId"]) - |> distinct(column: "deviceId") - |> sort() - '''.format(INFLUXDB_BUCKET) - - url = f"{INFLUXDB_URL}/api/v2/query?org={INFLUXDB_ORG}" - headers = { - "Authorization": f"Token {INFLUXDB_TOKEN}", - "Content-Type": "application/json" - } - data = { - "query": query - } - - response = requests.post(url, headers=headers, json=data) - print(f"🔍 InfluxDB查询响应状态: {response.status_code}") - if response.status_code == 200: - print(f"🔍 InfluxDB查询响应内容: {response.text}") - # 解析响应数据 - device_ids = [] - lines = response.text.strip().split('\n') - for line in lines: - if line and not line.startswith('#') and 'deviceId' not in line: - try: - # 解析CSV格式的响应数据 - # 格式类似于: ,_result,0,1001,1001 - parts = line.split(',') - if len(parts) >= 5: - # deviceId在第4个位置(索引3) - device_id_str = parts[3].strip() - if device_id_str: - device_id = int(device_id_str) - if 1000 <= device_id <= 1999: # 只考虑1000-1999范围内的设备ID - device_ids.append(device_id) - print(f"📱 发现设备ID: {device_id}") - except (ValueError, IndexError) as e: - # 忽略解析错误的行 - print(f"⚠️ 忽略无法解析的行: {line}") - continue - - # 对设备ID进行排序 - device_ids.sort() - - # 找到下一个可用的ID - next_id = 1001 - for device_id in device_ids: - if device_id == next_id: - next_id += 1 - elif device_id > next_id: - break - - # 确保ID在有效范围内 - if next_id > 1999: - next_id = 1001 # 如果超出范围,从头开始 - - print(f"📊 查询到已注册设备: {device_ids}") - print(f"🆕 分配新设备ID: {next_id}") - return next_id - else: - print(f"❌ 查询InfluxDB失败: {response.status_code} - {response.text}") - # 如果查询失败,返回默认ID - return 1001 - except Exception as e: - print(f"❌ 查询设备ID时出错: {e}") - import traceback - traceback.print_exc() - # 如果查询失败,返回默认ID - return 1001 - -def register_device(): - """注册设备并获取设备ID""" - global assigned_device_id - - # 检查配置的设备ID - if CONFIG_DEVICE_ID == 0: - # 自动注册模式 - try: - # 获取下一个可用的设备ID - next_device_id = get_next_device_id() - assigned_device_id = next_device_id - - print(f"✅ 设备注册成功! 设备ID: {assigned_device_id} (0x{assigned_device_id:04X})") - return True - except Exception as e: - print(f"❌ 注册过程中发生错误: {e}") - return False - elif 1000 <= CONFIG_DEVICE_ID <= 1999: - # 固定设备ID模式 - assigned_device_id = CONFIG_DEVICE_ID - print(f"✅ 使用固定设备ID: {assigned_device_id} (0x{assigned_device_id:04X})") - return True - else: - # 配置的设备ID无效 - print(f"❌ 配置的设备ID {CONFIG_DEVICE_ID} 无效,请设置为0(自动注册)或1000~1999之间的值") - return False - -def save_data_to_influxdb(protocol_id, data_value): - """保存日常数据到InfluxDB""" - try: - # 根据协议ID确定字段名 - field_mapping = { - 1: "heartRate", - 2: "breathingRate", - 13: "personDetected", - 14: "humanActivity", - 15: "humanDistance", # 人体距离 (cm) - 16: "humanPosition", # 人体方位 (cm) - 17: "sleepState" # 睡眠状态 - } - - if protocol_id in field_mapping: - field_name = field_mapping[protocol_id] - - # 创建数据点 - 使用 "daily_data" 作为测量值名称 - data_point = { - "measurement": "daily_data", # 改为 daily_data 以区分日常数据 - "tags": { - "deviceId": assigned_device_id, - "dataType": "daily" # 标识这是日常数据 - }, - "time": datetime.utcnow().isoformat() + "Z", - "fields": {} - } - - # 对特定字段进行数值处理 - if protocol_id in [1, 2]: # 心率和呼吸频率需要除以10 - data_point["fields"][field_name] = float(data_value) / 10.0 - elif protocol_id in [13, 14]: # 人检/活动数据,确保是整数0或1 - # 强制转换为整数,确保只有0或1 - data_point["fields"][field_name] = int(data_value) - if data_point["fields"][field_name] not in [0, 1]: - print(f"⚠️ 警告: 人检/活动数据值异常: {data_value}, 强制转换为: {data_point['fields'][field_name]}") - elif protocol_id == 15: # 人体距离,范围0-65535 - data_point["fields"][field_name] = int(data_value) - elif protocol_id == 16: # 人体方位,可以是正负值 - data_point["fields"][field_name] = int(data_value) - elif protocol_id == 17: # 睡眠状态,使用预定义值 - data_point["fields"][field_name] = int(data_value) - else: - data_point["fields"][field_name] = data_value - - # 发送数据到InfluxDB - url = f"{INFLUXDB_URL}/api/v2/write?org={INFLUXDB_ORG}&bucket={INFLUXDB_BUCKET}" - headers = { - "Authorization": f"Token {INFLUXDB_TOKEN}", - "Content-Type": "text/plain; charset=utf-8" - } - - # 构造行协议格式的数据,明确指定数据类型 - if protocol_id in [13, 14, 15, 16, 17]: - # 对于人检/活动/距离/位置/睡眠状态数据,使用整数格式 - line_protocol = f"daily_data,deviceId={assigned_device_id},dataType=daily {field_name}={int(data_point['fields'][field_name])}i" - else: - # 对于其他数据,使用浮点数格式 - line_protocol = f"daily_data,deviceId={assigned_device_id},dataType=daily {field_name}={data_point['fields'][field_name]}" - - response = requests.post(url, headers=headers, data=line_protocol) - if response.status_code == 204: - print(f"✅ 日常数据已保存到InfluxDB设备{assigned_device_id}上: {field_name}={data_point['fields'][field_name]}") - else: - print(f"❌ 保存日常数据到InfluxDB失败: {response.status_code} - {response.text}") - else: - print(f"⚠️ 未知的协议ID: {protocol_id}") - - except Exception as e: - print(f"❌ 保存日常数据到InfluxDB时出错: {e}") - import traceback - traceback.print_exc() - -def save_sleep_data_to_influxdb(sleep_data): - """保存睡眠数据到InfluxDB""" - try: - # 构造睡眠数据的行协议格式 - 使用 "sleep_data" 作为测量值名称 - line_protocol = f"sleep_data,deviceId={assigned_device_id},dataType=sleep " - - # 按顺序添加各个字段 - fields = [] - fields.append(f"sleepQualityScore={int(sleep_data['sleepQualityScore'])}i") # 1B 睡眠质量评分 (0~100) - fields.append(f"totalSleepDuration={int(sleep_data['totalSleepDuration'])}i") # 2B 睡眠总时长 (0~65535 分钟) - fields.append(f"awakeDurationRatio={int(sleep_data['awakeDurationRatio'])}i") # 1B 清醒时长占比 (0~100) - fields.append(f"lightSleepRatio={int(sleep_data['lightSleepRatio'])}i") # 1B 浅睡时长占比 (0~100) - fields.append(f"deepSleepRatio={int(sleep_data['deepSleepRatio'])}i") # 1B 深睡时长占比 (0~100) - fields.append(f"outOfBedDuration={int(sleep_data['outOfBedDuration'])}i") # 1B 离床时长 (0~255) - fields.append(f"outOfBedCount={int(sleep_data['outOfBedCount'])}i") # 1B 离床次数 (0~255) - fields.append(f"turnCount={int(sleep_data['turnCount'])}i") # 1B 翻身次数 (0~255) - fields.append(f"avgBreathingRate={int(sleep_data['avgBreathingRate'])}i") # 1B 平均呼吸 (0~25) - fields.append(f"avgHeartRate={int(sleep_data['avgHeartRate'])}i") # 1B 平均心跳 (0~100) - fields.append(f"apneaCount={int(sleep_data['apneaCount'])}i") # 1B 呼吸暂停次数 (0~10) - - line_protocol += ",".join(fields) - - # 发送数据到InfluxDB - url = f"{INFLUXDB_URL}/api/v2/write?org={INFLUXDB_ORG}&bucket={INFLUXDB_BUCKET}" - headers = { - "Authorization": f"Token {INFLUXDB_TOKEN}", - "Content-Type": "text/plain; charset=utf-8" - } - - response = requests.post(url, headers=headers, data=line_protocol) - if response.status_code == 204: - print(f"✅ 睡眠数据已保存到InfluxDB设备{assigned_device_id}上") - else: - print(f"❌ 保存睡眠数据到InfluxDB失败: {response.status_code} - {response.text}") - except Exception as e: - print(f"❌ 保存睡眠数据到InfluxDB时出错: {e}") - import traceback - traceback.print_exc() - -def generate_random_sleep_data(): - """生成随机睡眠数据用于测试""" - sleep_data = { - "sleepQualityScore": random.randint(0, 100), # 1B 睡眠质量评分 (0~100) - "totalSleepDuration": random.randint(0, 65535), # 2B 睡眠总时长 (0~65535 分钟) - "awakeDurationRatio": random.randint(0, 100), # 1B 清醒时长占比 (0~100) - "lightSleepRatio": random.randint(0, 100), # 1B 浅睡时长占比 (0~100) - "deepSleepRatio": random.randint(0, 100), # 1B 深睡时长占比 (0~100) - "outOfBedDuration": random.randint(0, 255), # 1B 离床时长 (0~255) - "outOfBedCount": random.randint(0, 255), # 1B 离床次数 (0~255) - "turnCount": random.randint(0, 255), # 1B 翻身次数 (0~255) - "avgBreathingRate": random.randint(0, 25), # 1B 平均呼吸 (0~25) - "avgHeartRate": random.randint(0, 100), # 1B 平均心跳 (0~100) - "apneaCount": random.randint(0, 10), # 1B 呼吸暂停次数 (0~10) - } - - # 确保比例字段总和为100 - total_ratio = sleep_data["awakeDurationRatio"] + sleep_data["lightSleepRatio"] + sleep_data["deepSleepRatio"] - if total_ratio != 100: - # 调整浅睡时长占比以确保总和为100 - adjustment = 100 - total_ratio - sleep_data["lightSleepRatio"] = max(0, min(100, sleep_data["lightSleepRatio"] + adjustment)) - - # 再次检查总和 - total_ratio = sleep_data["awakeDurationRatio"] + sleep_data["lightSleepRatio"] + sleep_data["deepSleepRatio"] - if total_ratio != 100: - # 如果仍然不是100,则最后一次调整浅睡占比 - sleep_data["lightSleepRatio"] += (100 - total_ratio) - sleep_data["lightSleepRatio"] = max(0, min(100, sleep_data["lightSleepRatio"])) - - return sleep_data - -def main(): - global assigned_device_id - - try: - # 第一步:注册设备获取设备ID - if not register_device(): - print("❌ 设备注册失败,程序退出") - return - - print(f"🎯 开始使用设备ID {assigned_device_id} 发送数据...") - - # 启动时立即发送一次睡眠数据 - print("⏰ 启动时生成并发送睡眠数据...") - initial_sleep_data = generate_random_sleep_data() - save_sleep_data_to_influxdb(initial_sleep_data) - - # 记录上次发送睡眠数据的时间 - last_sleep_data_time = time.time() - - # 第二步:开始发送数据 - while True: - # 初始化数据值 - data_value = 0 - - # 发送其他数据 - protocol_id = random.choice([1, 2, 13, 14, 15, 16, 17]) # 1=心跳, 2=呼吸, 13=检测到人, 14=人体活动, 15=人体距离, 16=人体方位, 17=睡眠状态 - - if protocol_id == 1: # 心跳 - data_value = random.randint(600, 1000) - elif protocol_id == 2: # 呼吸 - data_value = random.randint(120, 200) - elif protocol_id == 13: # 检测到人(1检测到,0未检测到) - data_value = random.choice([1]) - elif protocol_id == 14: # 人体活动(1活动,0静止) - data_value = random.choice([0]) - elif protocol_id == 15: # 人体距离 (cm),范围0-65535 - data_value = random.randint(0, 65535) - elif protocol_id == 16: # 人体方位 (cm),可以有正负值 - data_value = random.randint(-32768, 32767) - elif protocol_id == 17: # 睡眠状态 (0x00=深睡, 0x01=浅睡, 0x02=清醒, 0x03=无) - data_value = random.choice([0, 1, 2, 3]) - - # 直接发送数据到InfluxDB - save_data_to_influxdb(protocol_id, data_value) - - # 每隔一段时间(例如30分钟)发送一次睡眠数据 - current_time = time.time() - if current_time - last_sleep_data_time >= 1800: # 30分钟 = 1800秒 - print("⏰ 生成并发送睡眠数据...") - sleep_data = generate_random_sleep_data() - save_sleep_data_to_influxdb(sleep_data) - last_sleep_data_time = current_time - - # 设置发送间隔 - time.sleep(0.4) # 每0.4秒发送一次数据 - - except KeyboardInterrupt: - print(f"\n🛑 设备 {assigned_device_id} 发送端已停止") - except Exception as e: - print(f"❌ 发生错误: {e}") - -if __name__ == "__main__": - main() \ No newline at end of file