Python 物联网(IoT)与边缘计算开发实战
https://www.python.org/static/community_logos/python-logo-master-v3-TM.png

物联网基础与硬件交互
Raspberry Pi GPIO控制
python
import RPi.GPIO as GPIO
import time

# 设置GPIO模式
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)

# 定义引脚
LED_PIN = 17
BUTTON_PIN = 18

# 初始化引脚
GPIO.setup(LED_PIN, GPIO.OUT)
GPIO.setup(BUTTON_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP)

def blink_led(times, speed=0.5):
    """LED闪烁效果"""
    for _ in range(times):
        GPIO.output(LED_PIN, GPIO.HIGH)
        time.sleep(speed)
        GPIO.output(LED_PIN, GPIO.LOW)
        time.sleep(speed)

try:
    print("按下按钮控制LED (Ctrl+C退出)")
    while True:
        if GPIO.input(BUTTON_PIN) == GPIO.LOW:
            print("按钮按下 - LED闪烁")
            blink_led(3, 0.3)
            time.sleep(0.5)  # 防抖延迟
finally:
    GPIO.cleanup()  # 清理GPIO设置
https://www.raspberrypi.com/documentation/computers/images/GPIO-Pinout-Diagram-2.png

传感器数据采集
python
import Adafruit_DHT
import time

# 设置传感器类型和引脚
DHT_SENSOR = Adafruit_DHT.DHT22
DHT_PIN = 4

def read_sensor():
    """读取温湿度传感器数据"""
    humidity, temperature = Adafruit_DHT.read_retry(DHT_SENSOR, DHT_PIN)
    if humidity is not None and temperature is not None:
        return {
            'temperature': round(temperature, 1),
            'humidity': round(humidity, 1),
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')
        }
    else:
        print("传感器读取失败!")
        return None

# 每5秒读取一次数据
while True:
    sensor_data = read_sensor()
    if sensor_data:
        print(f"温度: {sensor_data['temperature']}°C, 湿度: {sensor_data['humidity']}%")
    time.sleep(5)
物联网通信协议
MQTT协议实现
python
import paho.mqtt.client as mqtt
import json
import time

# MQTT配置
MQTT_BROKER = "broker.example.com"
MQTT_PORT = 1883
MQTT_TOPIC_PUB = "sensor/data"
MQTT_TOPIC_SUB = "sensor/control"
CLIENT_ID = "python-iot-client"

# 传感器模拟数据
def get_sensor_data():
    return {
        "device_id": CLIENT_ID,
        "temperature": 25.5 + (time.time() % 3),
        "humidity": 50 + (time.time() % 10),
        "timestamp": int(time.time())
    }

# 连接回调
def on_connect(client, userdata, flags, rc):
    print(f"连接MQTT服务器,返回码: {rc}")
    client.subscribe(MQTT_TOPIC_SUB)

# 消息接收回调
def on_message(client, userdata, msg):
    payload = msg.payload.decode()
    print(f"收到消息 [{msg.topic}]: {payload}")
    
    try:
        command = json.loads(payload)
        if command.get('action') == 'reboot':
            print("执行重启指令...")
            # 这里添加重启逻辑

    except json.JSONDecodeError:
        print("无效的JSON消息")

# 创建MQTT客户端
client = mqtt.Client(CLIENT_ID)
client.on_connect = on_connect
client.on_message = on_message

# 连接并启动循环
client.connect(MQTT_BROKER, MQTT_PORT, 60)
client.loop_start()

try:
    while True:
        # 发布传感器数据
        sensor_data = get_sensor_data()
        client.publish(MQTT_TOPIC_PUB, json.dumps(sensor_data))
        print(f"发布数据: {sensor_data}")
        time.sleep(10)
except KeyboardInterrupt:
    print("断开连接...")
    client.loop_stop()
    client.disconnect()
https://mqtt.org/assets/img/mqtt-publish-subscribe.png

CoAP协议实现
python
from aiocoap import *
import asyncio
import time

async def coap_server():
    """CoAP服务器实现"""
    protocol = await Context.create_server_context(CoAPServer())

    # 持续运行
    print("CoAP服务器启动...")
    await asyncio.get_running_loop().create_future()

class CoAPServer(Resource):
    """CoAP资源处理"""
    
    def __init__(self):
        super().__init__()
        self.sensor_data = {
            "temperature": 25.0,
            "humidity": 50.0
        }
    
    async def render_get(self, request):
        """处理GET请求"""
        self.update_sensor_data()
        payload = json.dumps(self.sensor_data).encode('utf-8')
        return Message(payload=payload)
    
    async def render_post(self, request):
        """处理POST请求"""
        try:
            payload = json.loads(request.payload.decode('utf-8'))
            if 'set_temp' in payload:
                self.sensor_data['temperature'] = payload['set_temp']
            if 'set_humidity' in payload:
                self.sensor_data['humidity'] = payload['set_humidity']
            
            return Message(code=CHANGED, 
                         payload=b"Settings updated")
        except:
            return Message(code=BAD_REQUEST, 
                         payload=b"Invalid request")

    def update_sensor_data(self):
        """更新传感器数据(模拟)"""
        self.sensor_data = {
            "temperature": 25.0 + (time.time() % 3),
            "humidity": 50.0 + (time.time() % 10),
            "timestamp": int(time.time())
        }

async def coap_client():
    """CoAP客户端实现"""
    protocol = await Context.create_client_context()
    
    # 获取数据
    request = Message(code=GET, uri='coap://localhost/sensor')
    try:
        response = await protocol.request(request).response
        print(f"收到响应: {response.payload.decode()}")
    except Exception as e:
        print(f"请求失败: {e}")
    
    # 设置数据
    payload = {"set_temp": 26.5, "set_humidity": 55.0}
    request = Message(code=POST, 
                     payload=json.dumps(payload).encode(),
                     uri='coap://localhost/sensor')
    try:
        response = await protocol.request(request).response
        print(f"设置响应: {response.payload.decode()}")
    except Exception as e:
        print(f"设置失败: {e}")

# 运行示例
async def main():
    server_task = asyncio.create_task(coap_server())
    await asyncio.sleep(1)  # 等待服务器启动
    await coap_client()
    server_task.cancel()

asyncio.run(main())
边缘计算框架
使用MicroPython
python
# ESP32 MicroPython示例
import machine
import network
import urequests
import ujson
from time import sleep

# 配置WiFi
WIFI_SSID = "your_wifi"
WIFI_PASS = "your_password"

def connect_wifi():
    sta_if = network.WLAN(network.STA_IF)
    if not sta_if.isconnected():
        print("连接WiFi...")
        sta_if.active(True)
        sta_if.connect(WIFI_SSID, WIFI_PASS)
        while not sta_if.isconnected():
            pass
    print("网络配置:", sta_if.ifconfig())

# 读取传感器(模拟)
def read_sensor():
    return {
        "temp": 25 + machine.rng() % 5,
        "humidity": 50 + machine.rng() % 10
    }

# 边缘计算处理
def process_data(data):
    # 简单异常检测
    if data['temp'] > 30 or data['humidity'] > 80:
        data['alert'] = True
    else:
        data['alert'] = False
    return data

# 主循环
connect_wifi()
while True:
    sensor_data = read_sensor()
    processed_data = process_data(sensor_data)
    
    if processed_data['alert']:
        print("警报状态! 发送数据...")
        response = urequests.post(
            "api.example.com/alerts",
            json=processed_data,
            headers={'Content-Type': 'application/json'}
        )
        print("响应:", response.text)
        response.close()
    
    sleep(60)  # 每分钟检查一次
使用EdgeX Foundry
python
import requests
import json
import time

# EdgeX配置
EDGEX_URL = "localhost:48080/api/v1"
DEVICE_NAME = "temperature-sensor"

def register_device():
    """注册设备到EdgeX"""
    device = {
        "name": DEVICE_NAME,
        "description": "Python IoT温度传感器",
        "adminState": "UNLOCKED",
        "operatingState": "ENABLED",
        "protocols": {
            "other": {
                "Address": "virtual01",
                "Protocol": "300"
            }
        }
    }
    
    response = requests.post(
        f"{EDGEX_URL}/device",
        json=device,
        headers={"Content-Type": "application/json"}
    )
    return response.json()

def send_reading(value):
    """发送传感器读数"""
    reading = {
        "device": DEVICE_NAME,
        "readings": [
            {
                "name": "Temperature",
                "value": str(value)
            }
        ]
    }
    
    response = requests.post(
        f"{EDGEX_URL}/reading",
        json=reading,
        headers={"Content-Type": "application/json"}
    )
    return response.status_code == 200

# 模拟设备运行
print("注册设备...")
register_device()

print("开始发送传感器数据...")
while True:
    temp = 20 + (time.time() % 10)  # 模拟温度波动
    if send_reading(temp):
        print(f"发送温度数据: {temp}°C")
    else:
        print("发送数据失败")
    time.sleep(5)
https://docs.edgexfoundry.org/1.3/_images/EdgeX_arch.png

物联网数据处理
实时数据流处理
python
import pyarrow.flight as flight
import pandas as pd
import numpy as np

class FlightServer(flight.FlightServerBase):
    """Arrow Flight服务器实现"""
    
    def __init__(self, location):
        super().__init__(location)
        self.data = pd.DataFrame({
            'timestamp': pd.date_range('2023-01-01', periods=100, freq='s'),
            'value': np.random.randn(100)
        })
    
    def do_get(self, context, ticket):
        """处理数据获取请求"""
        df = self.data[self.data['timestamp'] > pd.Timestamp.now() - pd.Timedelta('1min')]
        table = pa.Table.from_pandas(df)
        return flight.RecordBatchStream(table)

class DataProcessor:
    """实时数据处理"""
    
    def __init__(self, server_url):
        self.client = flight.FlightClient(server_url)
    
    def process_stream(self):
        """处理实时数据流"""
        while True:
            try:
                # 获取最近1分钟数据
                descriptor = flight.FlightDescriptor.for_command(b"latest_data")
                flight_info = self.client.get_flight_info(descriptor)
                
                for endpoint in flight_info.endpoints:
                    for location in endpoint.locations:
                        reader = self.client.do_get(location.ticket)
                        batch = reader.read_all()
                        df = batch.to_pandas()
                        
                        # 实时分析
                        if not df.empty:
                            avg = df['value'].mean()
                            max_val = df['value'].max()
                            print(f"平均值: {avg:.2f}, 最大值: {max_val:.2f}")
            
            except Exception as e:
                print(f"处理错误: {e}")
            
            time.sleep(10)

# 启动服务器
server = FlightServer("grpc://0.0.0.0:8815")
server_thread = threading.Thread(target=server.serve)
server_thread.start()

# 启动客户端处理
processor = DataProcessor("grpc://localhost:8815")
processor.process_stream()
时序数据库集成
python
import influxdb_client
from influxdb_client.client.write_api import SYNCHRONOUS

class InfluxDBManager:
    """InfluxDB时序数据库管理"""
    
    def __init__(self, url, token, org, bucket):
        self.client = influxdb_client.InfluxDBClient(
            url=url,
            token=token,
            org=org
        )
        self.bucket = bucket
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.query_api = self.client.query_api()
    
    def write_data(self, measurement, tags, fields):
        """写入数据点"""
        point = influxdb_client.Point(measurement)
        
        # 添加标签
        for tag_key, tag_value in tags.items():
            point.tag(tag_key, tag_value)
        
        # 添加字段
        for field_key, field_value in fields.items():
            point.field(field_key, field_value)
        
        # 写入数据库
        self.write_api.write(bucket=self.bucket, record=point)
    
    def query_data(self, query):
        """查询数据"""
        result = self.query_api.query(query)
        records = []
        
        for table in result:
            for record in table.records:
                records.append({
                    'time': record.get_time(),
                    'measurement': record.get_measurement(),
                    **record.values
                })
        
        return records

# 使用示例
influx_mgr = InfluxDBManager(
    url="localhost:8086",
    token="your-token",
    org="your-org",
    bucket="iot-data"
)

# 写入传感器数据
influx_mgr.write_data(
    measurement="temperature",
    tags={"location": "room1", "device": "sensor1"},
    fields={"value": 25.3}
)

# 查询最近1小时数据
query = """
from(bucket: "iot-data")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
"""
data = influx_mgr.query_data(query)
print("查询结果:", data)
物联网安全
设备认证与加密
python
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.backends import default_backend
import os

class IoTDeviceSecurity:
    """物联网设备安全类"""
    
    def __init__(self):
        # 生成ECDSA密钥对
        self.private_key = ec.generate_private_key(
            ec.SECP256R1(), default_backend()
        )
        self.public_key = self.private_key.public_key()
        
        # 预共享密钥(实际应用中应从安全存储获取)
        self.shared_secret = os.urandom(32)
    
    def get_public_key_pem(self):
        """获取PEM格式的公钥"""
        return self.public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )
    
    def sign_data(self, data):
        """签名数据"""
        if isinstance(data, str):
            data = data.encode('utf-8')
        
        signature = self.private_key.sign(
            data,
            ec.ECDSA(hashes.SHA256())
        )
        return signature
    
    def verify_signature(self, public_key_pem, data, signature):
        """验证签名"""
        public_key = serialization.load_pem_public_key(
            public_key_pem,
            backend=default_backend()
        )
        
        if isinstance(data, str):
            data = data.encode('utf-8')
        
        try:
            public_key.verify(
                signature,
                data,
                ec.ECDSA(hashes.SHA256())
            )
            return True
        except:
            return False
    
    def derive_session_key(self, peer_public_key_pem):
        """派生会话密钥"""
        peer_public_key = serialization.load_pem_public_key(
            peer_public_key_pem,
            backend=default_backend()
        )
        
        shared_key = self.private_key.exchange(
            ec.ECDH(), peer_public_key
        )
        
        # 使用HKDF派生密钥
        derived_key = HKDF(
            algorithm=hashes.SHA256(),
            length=32,
            salt=None,
            info=b'session-key',
            backend=default_backend()
        ).derive(shared_key)
        
        return derived_key

# 使用示例
device1 = IoTDeviceSecurity()
device2 = IoTDeviceSecurity()

# 交换公钥
device1_pubkey = device1.get_public_key_pem()
device2_pubkey = device2.get_public_key_pem()

# 派生会话密钥
session_key1 = device1.derive_session_key(device2_pubkey)
session_key2 = device2.derive_session_key(device1_pubkey)

print("会话密钥匹配:", session_key1 == session_key2)
安全固件更新
python
import hashlib
import hmac
import requests
import tempfile
import subprocess

class SecureFirmwareUpdater:
    """安全固件更新"""
    
    def __init__(self, device_id, secret_key, update_server):
        self.device_id = device_id
        self.secret_key = secret_key.encode('utf-8')
        self.update_server = update_server
    
    def check_update(self):
        """检查更新"""
        # 创建认证签名
        nonce = os.urandom(16).hex()
        message = f"{self.device_id}:{nonce}".encode('utf-8')
        signature = hmac.new(
            self.secret_key, 
            message, 
            hashlib.sha256
        ).hexdigest()
        
        # 发送认证请求
        response = requests.get(
            f"{self.update_server}/check-update",
            headers={
                "Device-ID": self.device_id,
                "Nonce": nonce,
                "Signature": signature
            }
        )
        
        if response.status_code == 200:
            return response.json()
        else:
            print(f"检查更新失败: {response.text}")
            return None
    
    def download_firmware(self, version, checksum):
        """下载固件"""
        # 创建临时文件
        temp_file = tempfile.NamedTemporaryFile(delete=False)
        
        try:
            # 流式下载固件
            response = requests.get(
                f"{self.update_server}/firmware/{version}",
                stream=True
            )
            
            # 计算下载文件的哈希
            sha256 = hashlib.sha256()
            
            for chunk in response.iter_content(chunk_size=8192):
                temp_file.write(chunk)
                sha256.update(chunk)
            
            temp_file.close()
            
            # 验证校验和
            if sha256.hexdigest() != checksum:
                os.unlink(temp_file.name)
                raise ValueError("固件校验和不匹配")
            
            return temp_file.name
        except:
            os.unlink(temp_file.name)
            raise
    
    def apply_update(self, firmware_path):
        """应用更新"""
        # 验证固件签名(示例)
        if not self.verify_firmware(firmware_path):
            raise ValueError("固件签名验证失败")
        
        # 执行更新脚本(实际实现依平台而定)
        result = subprocess.run(
            ["/bin/sh", firmware_path],
            capture_output=True,
            text=True
        )
        
        if result.returncode != 0:
            print(f"更新失败: {result.stderr}")
            return False
        
        print("固件更新成功!")
        return True
    
    def verify_firmware(self, firmware_path):
        """验证固件签名"""
        # 这里应实现实际的签名验证逻辑
        # 示例中仅检查文件大小
        return os.path.getsize(firmware_path) > 0

# 使用示例
updater = SecureFirmwareUpdater(
    device_id="device-123",
    secret_key="your-secret-key",
    update_server="firmware.example.com"
)

update_info = updater.check_update()
if update_info and update_info['available']:
    print(f"发现新版本: {update_info['version']}")
    try:
        firmware_path = updater.download_firmware(
            update_info['version'],
            update_info['checksum']
        )
        if updater.apply_update(firmware_path):
            print("设备需要重启以完成更新")
    except Exception as e:
        print(f"更新失败: {str(e)}")
else:
    print("设备固件已是最新")
物联网可视化
实时仪表盘
python
import dash
from dash import dcc, html
from dash.dependencies import Input, Output
import plotly.graph_objs as go
import pandas as pd
import random
from datetime import datetime

# 创建Dash应用
app = dash.Dash(__name__)

# 初始数据
initial_data = pd.DataFrame({
    'timestamp': [datetime.now()],
    'temperature': [25.0],
    'humidity': [50.0]
})

# 应用布局
app.layout = html.Div([
    html.H1("物联网设备监控仪表盘"),
    
    dcc.Interval(
        id='interval-component',
        interval=5*1000,  # 5秒
        n_intervals=0
    ),
    
    html.Div([
        html.Div([
            dcc.Graph(id='temp-gauge'),
            html.H3("当前温度", style={'text-align': 'center'})
        ], className="six columns"),
        
        html.Div([
            dcc.Graph(id='humidity-gauge'),
            html.H3("当前湿度", style={'text-align': 'center'})
        ], className="six columns")
    ], className="row"),
    
    dcc.Graph(id='temp-trend'),
    
    html.Div(id='alerts-container')
], className="container")

# 回调函数 - 更新数据
@app.callback(
    Output('temp-gauge', 'figure'),
    Output('humidity-gauge', 'figure'),
    Output('temp-trend', 'figure'),
    Output('alerts-container', 'children'),
    Input('interval-component', 'n_intervals')
)
def update_metrics(n):
    # 模拟新数据
    new_data = {
        'timestamp': datetime.now(),
        'temperature': 25 + random.uniform(-2, 2),
        'humidity': 50 + random.uniform(-5, 5)
    }
    
    # 更新数据集
    global initial_data
    initial_data = initial_data.append(new_data, ignore_index=True)
    
    # 温度仪表
    temp_gauge = go.Figure(go.Indicator(
        mode="gauge+number",
        value=new_data['temperature'],
        domain={'x': [0, 1], 'y': [0, 1]},
        gauge={
            'axis': {'range': [None, 40]},
            'steps': [
                {'range': [0, 20], 'color': "lightgray"},
                {'range': [20, 30], 'color': "gray"},
                {'range': [30, 40], 'color': "red"}
            ],
            'threshold': {
                'line': {'color': "black", 'width': 4},
                'thickness': 0.75,
                'value': 30
            }
        }
    ))
    temp_gauge.update_layout(height=300, margin=dict(t=0, b=0))
    
    # 湿度仪表
    humidity_gauge = go.Figure(go.Indicator(
        mode="gauge+number",
        value=new_data['humidity'],
        domain={'x': [0, 1], 'y': [0, 1]},
        gauge={
            'axis': {'range': [0, 100]},
            'steps': [
                {'range': [0, 30], 'color': "red"},
                {'range': [30, 70], 'color': "lightgray"},
                {'range': [70, 100], 'color': "blue"}
            ],
            'threshold': {
                'line': {'color': "black", 'width': 4},
                'thickness': 0.75,
                'value': 70
            }
        }
    ))
    humidity_gauge.update_layout(height=300, margin=dict(t=0, b=0))
    
    # 温度趋势图
    temp_trend = go.Figure()
    temp_trend.add_trace(go.Scatter(
        x=initial_data['timestamp'],
        y=initial_data['temperature'],
        name='温度',
        line=dict(color='red', width=2)
    ))
    temp_trend.add_trace(go.Scatter(
        x=initial_data['timestamp'],
        y=initial_data['humidity'],
        name='湿度',
        yaxis='y2',
        line=dict(color='blue', width=2)
    ))
    temp_trend.update_layout(
        yaxis=dict(title='温度 (°C)'),
        yaxis2=dict(
            title='湿度 (%)',
            overlaying='y',
            side='right'
        ),
        hovermode="x unified"
    )
    
    # 警报信息
    alerts = []
    if new_data['temperature'] > 28:
        alerts.append(html.Div(
            f"高温警报! 当前温度: {new_data['temperature']:.1f}°C",
            style={
                'color': 'white',
                'background': 'red',
                'padding': '10px',
                'margin': '10px 0',
                'border-radius': '5px'
            }
        ))
    if new_data['humidity'] > 70:
        alerts.append(html.Div(
            f"高湿度警报! 当前湿度: {new_data['humidity']:.1f}%",
            style={
                'color': 'white',
                'background': 'blue',
                'padding': '10px',
                'margin': '10px 0',
                'border-radius': '5px'
            }
        ))
    
    return temp_gauge, humidity_gauge, temp_trend, alerts

# 运行应用
if __name__ == '__main__':
    app.run_server(debug=True, host='0.0.0.0')
https://plotly.com/python/static/images/dash-dashboards/iot-dashboard.png

结语与学习路径
https://www.python.org/static/community_logos/python-logo-master-v3-TM.png

通过这十一篇系列教程,你已经掌握了:

物联网硬件交互与传感器集成

物联网通信协议(MQTT/CoAP)

边缘计算框架与应用

物联网数据处理与分析

物联网安全实践

实时可视化仪表盘开发

进阶学习方向:

专业领域深入:

工业物联网(IIoT)平台开发

智能家居系统集成

智慧城市解决方案

技术栈扩展:

5G与物联网融合应用

AIoT(人工智能物联网)开发

数字孪生技术实现

认证体系:

AWS IoT认证

Cisco IoT认证

工业物联网专业认证

开源贡献:

参与主流IoT框架开发

贡献边缘计算项目

开发物联网安全工具

Python在物联网领域的应用前景广阔,持续探索和实践将助你成为这一变革性技术的引领者!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/912908.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/912908.shtml
英文地址,请注明出处:http://en.pswp.cn/news/912908.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

高通SG882G平台(移远):1、编译脚本

文档提供的编译,有点问题。所以我重新整理了脚本。 build-lib.sh #!/bin/bashfunction prepare_build() {if [ ! -d download ]; thenmkdir downloadfilocal MODIFIED_DIRfile-replacelocal FILE_NAMEset_bb_env.shcp ${MODIFIED_DIR}/${FILE_NAME} \poky/qti-con…

Mac电脑 触摸板增强工具 BetterTouchTool

BetterTouchTool mac版,是一款触摸板增强工具,允许用户使用各种手势来控制其计算机。 Bettertouchtool mac是一个小而高效的macOS应用程序,旨在帮助您为手势定义快捷方式。 此外,Bettertouchtool可用于使用常规鼠标和键盘快捷键…

LSTM(Long Short-Term Memory)模型的深度解析

在6.28号我发了一个博客《RNN(循环神经网络)与LSTM(长短期记忆网络)输出的详细对比分析》,但是我并未详细讲解LSTM,LSTM是循环神经网络中的一个模型,然而通过这篇博客给大家深度解析一下LSTM&am…

WebRTC 安全性分析研究

一、概述 本文着重分析 WebRTC 的安全性,分析其安全性考虑及安全性实现,回答了以下问题: WebRTC 加密过程需要或依赖 CA (Certificate Authority)吗? 不需要 CA, 但可能依赖 CA.DTLS-SRTP 加密机制中, DTLS 与 SRTP 的关系是什么? DTLS 实现秘钥交换…

阿里云操作系统控制台如何解决三大OS运维难题?

背景 操作系统运维常常遇到以下问题: 1.问题定界浪费大量人力:当业务出现问题时,客户在不清楚是操作系统问题还是业务问题时,往往会拉上所有相关团队一起排查,浪费人力。 2.问题定位时间长:通过操作系统…

自由学习记录(65)

其他脚本语言也可以热更新,但 Lua 特别适合,游戏主程序通常是 C,Lua 只是逻辑脚本,改 Lua 不影响主程序运行 语言应用场景PythonWeb 后端 / 数据处理服务JavaScript浏览器端热重载 / React HMRC#Unity 的 ILRuntime / HybridCLR …

抗辐照芯片在核电厂火灾探测器中的应用优势与性能解析

一、引言 核电厂作为能源供应的关键设施,其安全性备受关注。火灾是威胁核电厂安全运行的重要风险因素之一。在核电厂的特殊环境下,火灾探测器肩负着及时发现火情、保障核电厂安全运行的重任。然而,核电厂存在高能辐射等复杂环境因素&#xf…

FastAPI+Sqlite+HTML的登录注册与文件上传系统:完整实现指南

作为一名开发者,你是否曾想过亲手搭建一个包含用户注册、登录认证和文件上传功能的完整 Web 系统?今天,我将带大家一步步拆解一个基于FastAPI(后端)和原生 JavaScript(前端)的前后端分离项目&am…

【动态规划】P11188 「KDOI-10」商店砍价|普及+

本文涉及知识点 C动态规划 P11188 「KDOI-10」商店砍价 题目背景 English Statement. You must submit your code at the Chinese version of the statement. 您可以点击 这里 下载本场比赛的选手文件。 You can click here to download all tasks and examples of the c…

国产LHR3040芯片是REF5040的代替品

LHR3040是一款噪声低、漂移低、精度高的电压基准产品系列。这些基准同时支持灌电流和拉电流,并且具有出色的线性和负载调节性能。采用专有的设计技术实现了出色的温漂(3ppm/℃)和高精度(0.05%)。这些特性与极低噪声相结合,使LHR30XX系列成为高精度数据采…

专题:2025AI营销市场发展研究报告|附400+份报告PDF汇总下载

原文链接:https://tecdat.cn/?p42800 在数字化浪潮席卷全球的当下,AI营销正成为驱动企业增长的核心动力。 从市场规模来看,AI营销正经历着爆发式增长,生成式AI的出现更是为其注入了强大活力。在应用层面,AI已渗透到营…

深入对比 Python 中的 `__repr__` 与 `__str__`:选择正确的对象表示方法

文章目录 核心概念对比1. 根本目的差异2. 调用场景对比深入解析:何时使用哪种方法场景 1:开发者调试 vs 用户展示场景 2:技术表示 vs 简化视图高级对比:特殊场景处理1. 容器中的对象表示2. 日志记录的最佳实践3. 异常信息展示最佳实践指南1. 何时实现哪个方法?2. 实现原则…

万能公式基分析重构补丁复分析和欧拉公式原理推导

基分析, x11 x2-1 x3i 存在加法法则 x1x20 所以x1-x2 存在链式基乘法法则 x1x1*x1x2*x2 x2x3*x3 x3x1*x3 -x1x2x3 将链式基乘法操作 二次,三次,直至n次化简得 一次 x1 -x1 x3 矩阵 x1 x1 x2 x2 x3 …

OpenCV 4.10.0 移植

OpenCV 4.10.0 移植使用 概述移植编译下载解压编译环境编译 编译完成OpenCV 库文件及其作用 使用实例参考代码 参考 概述 OpenCV(Open Source Computer Vision Library)是计算机视觉领域最广泛使用的开源库之一,提供了丰富的功能模块&#xf…

Tomcat10.0以上版本编译成功但报错HTTP状态 404

Tomcat正常启动且项目已成功部署,但出现404错误。 HTTP状态 404 - 未找到package org.example;import javax.servlet.ServletException; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpSer…

在Flask项目中用Git LFS管理大文件(PDF)的完整实践

在Flask项目中用Git LFS高效管理大文件(以农机说明书PDF为例) 背景与需求 在农机管理系统等实际项目中,经常需要上传和管理大量超大文件(如200MB以上的PDF说明书、图片等)。如果直接用Git管理这些大文件,不仅会导致仓库膨胀、clone/pull速度变慢,还可能遇到推送失败等…

朴素贝叶斯算法案例演示及Python实现

目录 一、基本原理二、案例演示2.1 未平滑处理2.2 Laplace平滑处理 三、Python实现 一、基本原理 朴素贝叶斯思想:依靠特征概率去预测分类,针对于代分类的样本,会求解在该样本出现的条件下,各个类别出现的概率,哪个类…

RAG从入门到高阶(二):Retrieve-and-Rerank

在上一篇教程中,我们了解了 Naive RAG 的基本原理和实现。它就像一个刚刚学会查找资料的新手,虽然能找到一些信息,但有时候找到的并不够精准,甚至会有一些无关的干扰。 今天,我们将介绍 Retrieve-and-Rerank RAG&…

【脚本】Linux磁盘目录挂载脚本(不分区)

以下是一个不带分区,直接挂载整个磁盘到指定目录的脚本。该脚本会检查磁盘是否已挂载,自动创建文件系统(可选),并配置开机自动挂载: #!/bin/bash# 磁盘直接挂载脚本(不分区) # 使用…