树莓派4B+DHT11传感器:手把手教你用Python实现温湿度监控(附完整代码)

张开发
2026/4/15 10:03:33 15 分钟阅读

分享文章

树莓派4B+DHT11传感器:手把手教你用Python实现温湿度监控(附完整代码)
树莓派4BDHT11传感器Python温湿度监控全流程实战指南硬件准备与环境搭建树莓派4B与DHT11的硬件连接看似简单但细节决定成败。我们先从硬件选购开始——市面上常见的DHT11模块通常集成在PCB板上自带10kΩ上拉电阻这种模块比裸传感器更适合初学者。连接时需要注意电源选择虽然DHT11支持3.3V-5.5V供电但建议使用3.3V电源以避免电平转换问题GPIO配置推荐使用BCM编号方式GPIO4物理引脚7是最常用的选择接线检查使用40pin排线时务必确认VCC(1)→3.3V、DATA(7)→GPIO4、GND(9)→接地硬件连接完成后需要准备Python环境。建议使用Python3.7版本并创建虚拟环境python -m venv dht_env source dht_env/bin/activate安装必要的依赖库pip install RPi.GPIO matplotlib numpy提示如果使用树莓派官方系统GPIO库可能已预装。建议先执行sudo apt update sudo apt upgrade更新系统DHT11通信协议深度解析与Python实现DHT11采用单总线协议时序控制是读取成功的关键。让我们深入理解其通信机制起始信号主机拉低总线18-30ms后释放等待传感器响应响应信号传感器拉低83μs再拉高87μs表示准备就绪数据传输每位数据以54μs低电平开始高电平持续时间决定数据值23-27μs为068-74μs为1数据格式40位数据包含湿度整数(8bit)、湿度小数(8bit)、温度整数(8bit)、温度小数(8bit)和校验和(8bit)基于这些原理我们可以编写健壮的Python读取类import RPi.GPIO as GPIO import time class DHT11: def __init__(self, pin): self.pin pin GPIO.setmode(GPIO.BCM) def read(self): # 初始化通信 GPIO.setup(self.pin, GPIO.OUT) GPIO.output(self.pin, GPIO.HIGH) time.sleep(0.05) # 发送起始信号 GPIO.output(self.pin, GPIO.LOW) time.sleep(0.02) GPIO.output(self.pin, GPIO.HIGH) # 切换为输入模式 GPIO.setup(self.pin, GPIO.IN, pull_up_downGPIO.PUD_UP) # 等待传感器响应 self._wait_for_edge(GPIO.LOW, timeout1000) self._wait_for_edge(GPIO.HIGH, timeout1000) # 读取40位数据 data [] for _ in range(40): self._wait_for_edge(GPIO.LOW, timeout1000) pulse_length self._measure_pulse(GPIO.HIGH) data.append(1 if pulse_length 50 else 0) # 解析数据 humidity data[0:8] humidity_decimal data[8:16] temperature data[16:24] temperature_decimal data[24:32] checksum data[32:40] # 转换为十进制 humidity self._bits_to_int(humidity) temperature self._bits_to_int(temperature) calculated_checksum humidity temperature # 校验数据 if self._bits_to_int(checksum) ! calculated_checksum: raise ValueError(Checksum验证失败) return humidity, temperature def _wait_for_edge(self, edge, timeout): start time.time() while GPIO.input(self.pin) ! edge: if (time.time() - start) * 1000 timeout: raise TimeoutError(等待信号超时) def _measure_pulse(self, level): start time.time() while GPIO.input(self.pin) level: pass return (time.time() - start) * 1000000 # 返回微秒数 def _bits_to_int(self, bits): return int(.join(map(str, bits)), 2)数据采集与异常处理实战实际应用中传感器读取可能遇到各种异常情况。我们需要构建完善的错误处理机制常见问题及解决方案问题现象可能原因解决方案读取超时接线错误/传感器故障检查接线确认电源正常校验失败信号干扰/时序不准增加重试机制优化代码时序数据异常环境突变/传感器老化添加数据合理性检查改进后的读取函数应包含重试逻辑和数据验证def safe_read(dht, max_retries5): last_exception None for _ in range(max_retries): try: humidity, temperature dht.read() # 数据合理性检查 if 20 humidity 90 and -10 temperature 50: return humidity, temperature except Exception as e: last_exception e time.sleep(1) raise Exception(f读取失败: {last_exception}) from last_exception数据记录建议使用CSV格式便于后续分析import csv from datetime import datetime def log_data(humidity, temperature, filenamedht11_data.csv): file_exists os.path.isfile(filename) with open(filename, a, newline) as f: writer csv.writer(f) if not file_exists: writer.writerow([timestamp, humidity, temperature]) writer.writerow([ datetime.now().isoformat(), humidity, temperature ])数据可视化与实时监控采集到的数据需要直观展示matplotlib提供了强大的可视化能力。以下是创建实时监控仪表板的代码import matplotlib.pyplot as plt from matplotlib.animation import FuncAnimation import pandas as pd def setup_plot(): fig, (ax1, ax2) plt.subplots(2, 1, figsize(10, 6)) fig.suptitle(温湿度实时监控) # 温度曲线 temp_line, ax1.plot([], [], r-, label温度(℃)) ax1.set_ylim(0, 40) ax1.set_ylabel(温度(℃)) ax1.legend(locupper left) # 湿度曲线 hum_line, ax2.plot([], [], b-, label湿度(%)) ax2.set_ylim(20, 90) ax2.set_ylabel(湿度(%)) ax2.legend(locupper left) return fig, temp_line, hum_line def update_plot(frame, temp_line, hum_line, dht): try: humidity, temperature safe_read(dht) # 更新数据 x_data pd.date_range(enddatetime.now(), periods50, freqS) y_temp [temperature] * 50 # 模拟历史数据 y_hum [humidity] * 50 temp_line.set_data(x_data, y_temp) hum_line.set_data(x_data, y_hum) return temp_line, hum_line except Exception as e: print(f更新图表时出错: {e}) # 启动实时监控 dht DHT11(4) fig, temp_line, hum_line setup_plot() ani FuncAnimation(fig, update_plot, fargs(temp_line, hum_line, dht), interval5000) plt.show()对于长期数据监测可以生成更专业的统计图表def generate_report(csv_file): df pd.read_csv(csv_file, parse_dates[timestamp]) df.set_index(timestamp, inplaceTrue) # 24小时趋势图 plt.figure(figsize(12, 6)) df.resample(H).mean().plot() plt.title(24小时温湿度趋势) plt.ylabel(数值) plt.grid(True) plt.savefig(daily_trend.png) # 温湿度分布图 plt.figure(figsize(8, 6)) plt.scatter(df[temperature], df[humidity], alpha0.5) plt.title(温湿度分布) plt.xlabel(温度(℃)) plt.ylabel(湿度(%)) plt.grid(True) plt.savefig(temp_hum_scatter.png)项目扩展与优化方向基础功能实现后可以考虑以下增强功能网络监控使用Flask创建Web界面添加REST API供远程查询from flask import Flask, jsonify app Flask(__name__) dht DHT11(4) app.route(/api/reading) def get_reading(): try: h, t safe_read(dht) return jsonify({ temperature: t, humidity: h, timestamp: datetime.now().isoformat() }) except Exception as e: return jsonify({error: str(e)}), 500报警功能当温湿度超出设定范围时触发通知支持邮件/短信报警def check_thresholds(humidity, temperature): if temperature 30: send_alert(高温警告: 当前温度{:.1f}℃.format(temperature)) if humidity 70: send_alert(高湿警告: 当前湿度{:.1f}%.format(humidity))数据持久化使用SQLite存储历史数据集成InfluxDBGrafana专业监控方案import sqlite3 def init_db(): conn sqlite3.connect(sensor_data.db) c conn.cursor() c.execute(CREATE TABLE IF NOT EXISTS readings (timestamp TEXT, humidity REAL, temperature REAL)) conn.commit() conn.close()性能优化使用多线程分离数据采集和显示逻辑实现环形缓冲区减少内存占用from threading import Thread from collections import deque class DataCollector(Thread): def __init__(self, dht, buffer_size100): super().__init__() self.dht dht self.buffer deque(maxlenbuffer_size) self.running True def run(self): while self.running: try: h, t safe_read(self.dht) self.buffer.append((datetime.now(), h, t)) time.sleep(5) except Exception as e: print(f采集线程出错: {e})实际部署时建议将采集程序设置为系统服务# /etc/systemd/system/dht11.service [Unit] DescriptionDHT11 Monitoring Service [Service] ExecStart/usr/bin/python3 /path/to/monitor.py WorkingDirectory/path/to/ Restartalways Userpi [Install] WantedBymulti-user.target启用服务sudo systemctl enable dht11.service sudo systemctl start dht11.service

更多文章