This commit is contained in:
Konano 2025-10-27 01:22:18 +08:00
parent f33b04175b
commit 126180d402
Signed by: Nano
GPG Key ID: 93686B78EE43A65A
5 changed files with 125 additions and 2 deletions

View File

@ -1,2 +0,0 @@
# sp548e_light

View File

@ -0,0 +1,9 @@
{
"domain": "sp548e_light",
"name": "SP548E Light",
"documentation": "https://www.home-assistant.io/integrations/switch/",
"requirements": [],
"codeowners": [],
"iot_class": "local_polling",
"version": "1.0.1"
}

View File

@ -0,0 +1,110 @@
import socket
import logging
from homeassistant.components.switch import SwitchEntity
_LOGGER = logging.getLogger(__name__)
HOST = "10.13.0.43"
PORT = 8587
HANDSHAKE_1 = "5350544543480001000000000a7762de19080201014645" # 握手包1
HANDSHAKE_2 = "53505445434800027a00000000" # 获取状态包
CMD_ON = "5350544543480050c700000001a5"
CMD_OFF = "5350544543480050c900000001aa"
def send_and_recv(sock: socket.socket, hex_str: str) -> bytes:
"""发送指令并返回响应(仅用于握手/查询)"""
_LOGGER.debug("发送(带接收)指令: %s", hex_str)
sock.send(bytes.fromhex(hex_str))
data = sock.recv(1024)
_LOGGER.debug("收到 %d 字节: %s", len(data), data.hex())
return data
def send_only(sock: socket.socket, hex_str: str) -> None:
"""只发送指令,不等待响应(用于控制命令)"""
_LOGGER.debug("发送(不接收)指令: %s", hex_str)
sock.send(bytes.fromhex(hex_str))
def handshake(sock: socket.socket) -> bytes:
"""执行握手并返回状态数据"""
_LOGGER.debug("执行握手步骤 1")
send_and_recv(sock, HANDSHAKE_1)
_LOGGER.debug("执行握手步骤 2 - 获取状态")
status = send_and_recv(sock, HANDSHAKE_2)
_LOGGER.debug("握手完成, 状态包共 %d 字节", len(status))
return status
def get_status() -> bool:
"""获取灯当前状态 True=开 False=关"""
try:
with socket.create_connection((HOST, PORT), timeout=5) as sock:
status = handshake(sock)
if status and len(status) > 40:
state_byte = status[40]
_LOGGER.debug("状态字节: %d", state_byte)
return state_byte == 1
else:
_LOGGER.warning("状态包长度不足: %s", status.hex())
except Exception as e:
_LOGGER.error("获取状态失败: %s", e)
return False
def light_on():
try:
with socket.create_connection((HOST, PORT), timeout=5) as sock:
_LOGGER.debug("开灯流程开始")
handshake(sock) # 必须先握手
send_only(sock, CMD_ON) # 控制指令无需接收
_LOGGER.debug("开灯命令发送完成")
except Exception as e:
_LOGGER.error("开灯失败: %s", e)
def light_off():
try:
with socket.create_connection((HOST, PORT), timeout=5) as sock:
_LOGGER.debug("关灯流程开始")
handshake(sock)
send_only(sock, CMD_OFF)
_LOGGER.debug("关灯命令发送完成")
except Exception as e:
_LOGGER.error("关灯失败: %s", e)
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
async_add_entities([Sp548eLightSwitch()], True)
class Sp548eLightSwitch(SwitchEntity):
def __init__(self):
self._is_on = False
@property
def name(self):
return "SP548E Light"
@property
def is_on(self):
return self._is_on
def turn_on(self, **kwargs):
_LOGGER.debug("调用 turn_on 方法")
light_on()
self._is_on = True
self.schedule_update_ha_state()
def turn_off(self, **kwargs):
_LOGGER.debug("调用 turn_off 方法")
light_off()
self._is_on = False
self.schedule_update_ha_state()
def update(self):
_LOGGER.debug("调用 update 方法,开始读取设备状态")
self._is_on = get_status()
_LOGGER.debug("update 结束, 当前状态: %s", self._is_on)

6
hacs.json Normal file
View File

@ -0,0 +1,6 @@
{
"name": "SP458E Light",
"content_in_root": false,
"render_readme": false,
"hide_default_branch": true
}