Connect to HA

This commit is contained in:
Austen Adler 2024-04-21 16:52:12 -04:00
parent f65d45260e
commit dd978179df
6 changed files with 196 additions and 140 deletions

View File

@ -14,7 +14,7 @@ TARGET = arm-unknown-linux-musleabihf
# TARGET = arm-unknown-linux-musleabi
# TARGET = armv7-unknown-linux-musleabi
HOST = 192.168.85.104
HOST = raspberrypi
# HOST = 192.168.1.82
# HOST = raspberrypi

View File

@ -1,6 +1,9 @@
use std::fmt::Debug;
use crate::pattern::Pattern;
use crate::{
color::{self, Rgb},
pattern::Pattern,
};
#[derive(Debug)]
pub enum Message {
ClearLights,
@ -10,6 +13,27 @@ pub enum Message {
Quit,
}
/// The state of the strip
#[derive(Clone, Debug)]
pub struct State {
pub on: bool,
pub pattern: Option<String>,
// brightnes: u8,
pub color: Rgb,
// Off,
// Pattern
}
impl Default for State {
fn default() -> Self {
Self {
on: false,
pattern: None,
color: color::BLACK,
}
}
}
// impl Debug for Message {
// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// match self {

View File

@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize};
pub type Icon = ();
pub mod homeassistant {
pub const HOMEASSISTANT_TOPIC: &str = "homeassistant/status";
pub const STATUS_ONLINE: &[u8] = b"online";
pub const STATUS_OFFLINE: &[u8] = b"offline";
@ -1614,6 +1615,9 @@ pub mod lawn_mower {
pub mod light {
use super::*;
pub const STATUS_DEFAULT_LIGHT_ON: &str = "ON";
pub const STATUS_DEFAULT_LIGHT_OFF: &str = "OFF";
#[derive(Debug, PartialEq, Clone)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct DefaultDiscovery {
@ -2089,12 +2093,19 @@ pub mod light {
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "serde", serde(rename_all = "snake_case"))]
pub struct JsonIncoming {
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
pub brightness: Option<usize>,
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
pub color_mode: Option<ColorMode>,
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
pub color_temp: Option<usize>,
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
pub color: Option<IncomingColor>,
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
pub effect: Option<String>,
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
pub state: Option<String>,
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
pub transition: Option<usize>,
}
@ -2102,14 +2113,23 @@ pub mod light {
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "serde", serde(rename_all = "snake_case"))]
pub struct IncomingColor {
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
pub r: Option<usize>,
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
pub g: Option<usize>,
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
pub b: Option<usize>,
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
pub c: Option<usize>,
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
pub w: Option<usize>,
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
pub x: Option<f32>,
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
pub y: Option<f32>,
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
pub h: Option<f32>,
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
pub s: Option<f32>,
}
}

View File

@ -1,7 +1,7 @@
#![feature(let_chains)]
use anyhow::{Context as _, Result};
use common::{
color::Rgb,
color::{self, Gradient, Rgb},
error::ProgramError,
pattern::{MovingRainbow, Solid, SolidParams},
MqttConfig,
@ -9,15 +9,16 @@ use common::{
use homeassistant_mqtt_discovery::{
integrations::{
homeassistant,
light::{self, JsonIncoming},
light::{self, IncomingColor, JsonIncoming},
},
Common,
};
use rumqttc::{
Client, Connection, ConnectionError, Event, Incoming, MqttOptions, Packet, PingReq, Publish,
QoS,
use rumqttc::{Client, Connection, Event, Incoming, MqttOptions, Packet, Publish, QoS};
use std::{
sync::mpsc::{channel, Receiver},
thread,
time::Duration,
};
use std::time::Duration;
use tracing::{error, info, warn};
use common::{error::ProgramResult, strip};
@ -25,9 +26,6 @@ use std::sync::mpsc::Sender;
const MQTT_PREFIX: &str = "aw_lights";
const FRIENDLY_NAME: &str = "AW Lights Light";
const HOMEASSISTANT_TOPIC: &str = "homeassistant/status";
const LIGHT_ON: &str = "ON";
const LIGHT_OFF: &str = "OFF";
pub struct MqttBuilder {
topics: Topics,
@ -41,7 +39,11 @@ impl MqttBuilder {
Self { config, topics }
}
pub fn start(self, strip_tx: Sender<strip::Message>) -> ProgramResult<()> {
pub fn start(
self,
strip_tx: Sender<strip::Message>,
state_rx: Receiver<common::strip::State>,
) -> ProgramResult<()> {
let (client, connection) = self.create_conection();
let mut mqtt = Mqtt {
@ -50,7 +52,7 @@ impl MqttBuilder {
client,
};
mqtt.start(connection, strip_tx)
mqtt.start(connection, strip_tx, state_rx)
}
fn create_conection(&self) -> (Client, Connection) {
@ -96,85 +98,104 @@ impl Mqtt {
&mut self,
mut connection: Connection,
strip_tx: Sender<strip::Message>,
state_rx: Receiver<common::strip::State>,
) -> ProgramResult<()> {
info!("Starting mqtt");
self.init().map_err(|e| ProgramError::Boxed(Box::new(e)))?;
// Iterate to poll the eventloop for connection progress
for (i, message) in connection.iter().enumerate() {
// info!("Notification #{i} = {:?}", message);
let (internal_tx, internal_rx) = channel::<InternalMessage>();
let internal_tx2 = internal_tx.clone();
match message {
Ok(Event::Incoming(Incoming::Publish(p))) => {
if let Err(e) = self.handle_incoming_message(p) {
info!("Got error: {e:?}");
thread::spawn(move || {
// Iterate to poll the eventloop for connection progress
for (_i, message) in connection.iter().enumerate() {
// info!("Notification #{i} = {:?}", message);
match message {
Ok(Event::Incoming(Incoming::Publish(p))) => {
let _ = internal_tx2.send(InternalMessage::InboundMqttPacket(p));
// info!(
// "Got publish notification. Trying to deserialize: {:#?}",
// serde_json::from_slice::<light::JsonIncoming>(&p.payload)
// )
}
Ok(Event::Outgoing(_))
| Ok(Event::Incoming(Packet::PingResp))
| Ok(Event::Incoming(Incoming::SubAck(_)))
| Ok(Event::Incoming(Incoming::ConnAck(_)))
| Ok(Event::Incoming(Incoming::PubAck(_)))
| Ok(Event::Incoming(Incoming::PubRec(_)))
| Ok(Event::Incoming(Packet::PingReq)) => {}
Ok(m) => info!("Got unhandled message: {m:?}"),
Err(e) => {
error!("Connection error to mqtt: {e:?}")
}
// info!(
// "Got publish notification. Trying to deserialize: {:#?}",
// serde_json::from_slice::<light::JsonIncoming>(&p.payload)
// )
}
Ok(Event::Outgoing(_))
| Ok(Event::Incoming(Packet::PingResp))
| Ok(Event::Incoming(Incoming::SubAck(_)))
| Ok(Event::Incoming(Incoming::ConnAck(_)))
| Ok(Event::Incoming(Incoming::PubAck(_)))
| Ok(Event::Incoming(Incoming::PubRec(_)))
| Ok(Event::Incoming(Packet::PingReq)) => {}
Ok(m) => info!("Got unhandled message: {m:?}"),
Err(e) => {
error!("Connection error to mqtt: {e:?}")
}
}
let _ = internal_tx2.send(InternalMessage::MqttDied);
});
thread::spawn(move || {
while let Ok(p) = state_rx.recv() {
let _ = internal_tx.send(InternalMessage::OutboundStatePacket(p));
}
});
while let Ok(msg) = internal_rx.recv() {
match msg {
InternalMessage::InboundMqttPacket(p) => {
if let Err(e) = self.handle_incoming_message(&strip_tx, p) {
info!("Got error: {e:?}");
}
}
InternalMessage::OutboundStatePacket(p) => {
let state_msg = self.gen_state_message(&p);
info!("Sending state message: {:?}", state_msg);
// Send initial autodiscovery
if let Err(e) = self
.client
.publish(
&self.topics.state_topic,
QoS::AtLeastOnce,
false,
serde_json::to_vec(&state_msg).unwrap(),
)
.context("Sending initial autodiscovery")
{
error!("{e:?}");
}
}
InternalMessage::MqttDied => todo!(),
}
}
// let (client, mut connection) = self.create_conection();
// self.handle_loop(&mut connection, strip_tx)
// .map_err(|e| ProgramError::Boxed(Box::new(e)))?;
// Iterate to poll the eventloop for connection progress
// for (i, message) in connection.iter().enumerate() {
// info!("Notification #{i} = {:?}", message);
// match message {
// Ok(Event::Incoming(Incoming::Publish(p))) => {
// info!(
// "Got publish notification. Trying to deserialize: {:#?}",
// serde_json::from_slice::<light::JsonIncoming>(&p.payload)
// )
// }
// Ok(_) => todo!(),
// Err(e) => {
// error!("Connection error to mqtt: {e:?}")
// }
// }
// // if let Ok(Event::Incoming(Incoming::Publish(p))) = message {
// // info!(
// // "Got publish notification. Trying to deserialize: {:#?}",
// // serde_json::from_slice::<light::JsonIncoming>(&p.payload)
// // )
// // }
// }
info!("Done with mqtt");
Ok(())
}
fn handle_incoming_message(&self, publish: Publish) -> Result<()> {
info!("Got incoming message: {publish:?}");
fn handle_incoming_message(
&self,
strip_tx: &Sender<strip::Message>,
publish: Publish,
) -> Result<()> {
if publish.topic == self.topics.command_topic {
info!("Got command topic");
let command = serde_json::from_slice::<light::JsonIncoming>(&publish.payload)
.context("Deserializing command message")?;
let translated_command = build_strip_tx_msg(&command);
let translated_command = build_strip_tx_msg(&command)
.context("Translating command to internal strip_tx message")?;
info!("Setting light to state: {:?}", translated_command);
} else if publish.topic == HOMEASSISTANT_TOPIC {
strip_tx
.send(translated_command)
.context("Sending command to strip_tx")?;
} else if publish.topic == homeassistant::HOMEASSISTANT_TOPIC {
if &publish.payload == homeassistant::STATUS_ONLINE {
info!("Homeassistant is online");
self.send_discovery()?;
@ -190,35 +211,6 @@ impl Mqtt {
Ok(())
}
// fn handle_loop(
// &mut self,
// // client: &Client,
// connection: &mut Connection,
// strip_tx: Sender<strip::Message>,
// ) -> Result<()> {
// // Iterate to poll the eventloop for connection progress
// for (i, message) in connection.iter().enumerate() {
// info!("Notification #{i} = {:?}", message);
// match message {
// Ok(Event::Incoming(Incoming::Publish(p))) => {
// info!(
// "Got publish notification. Trying to deserialize: {:#?}",
// serde_json::from_slice::<light::JsonIncoming>(&p.payload)
// )
// }
// Ok(_) => todo!(),
// Err(e) => {
// error!("Connection error to mqtt: {e:?}")
// }
// }
// todo![]
// }
// Ok(())
// }
/// Called after the initial connection to mqtt
fn init(&mut self) -> Result<()> {
// let topics = Topics::new(self.config);
@ -227,7 +219,7 @@ impl Mqtt {
// Check if homeassistant is starting or not
self.client
.subscribe(HOMEASSISTANT_TOPIC, QoS::AtMostOnce)
.subscribe(homeassistant::HOMEASSISTANT_TOPIC, QoS::AtMostOnce)
.context("Subscribing to homeassistant status topic")?;
// Check for commands
@ -261,19 +253,11 @@ impl Mqtt {
Ok(())
}
// /// Called for each message
// fn handle_incoming_event(
// &self,
// idx: usize,
// message: Result<Event, ConnectionError>,
// ) -> Result<()> {
// }
/// Generates a discovery message
fn gen_discovery_message(&self) -> light::JsonDiscovery {
// "<discovery_prefix>/device_trigger/[<node_id>/]<object_id>/config",
// homeassistant/device_trigger/0x90fd9ffffedf1266/action_arrow_left_click/config
let discovery = light::JsonDiscovery {
light::JsonDiscovery {
common: Common {
unique_id: Some(self.config.mqtt_id.to_string()),
..Common::default()
@ -285,16 +269,44 @@ impl Mqtt {
supported_color_modes: Some(vec![light::ColorMode::Rgb]),
state_topic: Some(self.topics.state_topic.clone()),
..light::JsonDiscovery::new(self.topics.command_topic.clone())
};
discovery
}
}
fn gen_state_message(&self, state: &strip::State) -> light::JsonIncoming {
// "<discovery_prefix>/device_trigger/[<node_id>/]<object_id>/config",
// homeassistant/device_trigger/0x90fd9ffffedf1266/action_arrow_left_click/config
light::JsonIncoming {
brightness: None,
color_mode: None,
color_temp: None,
color: Some(IncomingColor {
r: Some(state.color.0 as usize),
g: Some(state.color.1 as usize),
b: Some(state.color.2 as usize),
..IncomingColor::default()
}),
effect: state.pattern.clone(),
state: Some(
(if state.on {
light::STATUS_DEFAULT_LIGHT_ON
} else {
light::STATUS_DEFAULT_LIGHT_OFF
})
.to_string(),
),
// transition: None,
..light::JsonIncoming::default()
}
}
}
fn build_strip_tx_msg(command: &JsonIncoming) -> Option<strip::Message> {
use strip::Message;
info!("Got incoming command: {command:?}");
if let Some(state) = &command.state
&& state == LIGHT_OFF
&& state == light::STATUS_DEFAULT_LIGHT_OFF
{
return Some(Message::ClearLights);
}
@ -316,35 +328,22 @@ fn build_strip_tx_msg(command: &JsonIncoming) -> Option<strip::Message> {
}))));
}
if let Some(brightness) = &command.brightness {
let brightness = *brightness as u8;
return Some(Message::ChangePattern(Box::new(Solid::new(&SolidParams {
color: color::BLACK.fade_to(color::WHITE, brightness),
}))));
}
error!("Not able to parse input as a command: {command:?}");
None
// ClearLights
// ChangePattern(Box<dyn Pattern + Send + Sync>)
// SetNumLights(u16)
// SetTickTime(u64)
// Quit
}
// enum Status {
// Alive,
// Dead,
// }
// impl ToString for Status {
// fn to_string(&self) -> String {
// match self {
// Self::Alive => String::from("alive"),
// Self::Dead => String::from("dead"),
// }
// }
// }
struct Topics {
autodiscovery: String,
state_topic: String,
command_topic: String,
status_topic: String,
}
impl Topics {
@ -354,11 +353,16 @@ impl Topics {
autodiscovery: format!("{}/light/{mqtt_id}/config", config.mqtt_discovery_prefix),
state_topic: format!("{MQTT_PREFIX}/{mqtt_id}/state"),
command_topic: format!("{MQTT_PREFIX}/{mqtt_id}/set"),
status_topic: format!("{MQTT_PREFIX}/{mqtt_id}/status"),
}
}
}
enum InternalMessage {
InboundMqttPacket(Publish),
OutboundStatePacket(strip::State),
MqttDied,
}
// unique_id: bedroom_switch
// name: "Bedroom Switch"
// state_topic: "home/bedroom/switch1"

View File

@ -59,11 +59,13 @@ fn main() -> ProgramResult<()> {
let (strip_terminated_tx, strip_terminated_rx) = channel::<()>();
let (state_tx, state_rx) = channel::<common::strip::State>();
// The strip itself
let config_clone = config.clone();
make_child(message_tx.clone(), move |message_tx| -> ProgramResult<()> {
let mut strip = LedStrip::new(config_clone)?;
strip.strip_loop(message_tx, &strip_rx, strip_terminated_tx)
strip.strip_loop(message_tx, &state_tx, &strip_rx, strip_terminated_tx)
});
// Webui user-interface
@ -78,7 +80,7 @@ fn main() -> ProgramResult<()> {
make_child(
message_tx.clone(),
move |_message_tx| -> ProgramResult<()> {
mqtt::MqttBuilder::new(config.mqtt.clone()).start(mqtt_strip_tx)
mqtt::MqttBuilder::new(config.mqtt.clone()).start(mqtt_strip_tx, state_rx)
// mqtt::start(mqtt_strip_tx, config.mqtt.clone())
},
);

View File

@ -9,10 +9,7 @@ use std::{
cmp,
ops::Add,
str::FromStr,
sync::{
mpsc::{Receiver, Sender},
Arc,
},
sync::mpsc::{Receiver, Sender},
thread,
time::{Duration, Instant},
};
@ -144,10 +141,14 @@ impl LedStrip {
pub fn strip_loop(
&mut self,
message_tx: &Sender<common::Message>,
state_tx: &Sender<common::strip::State>,
rx: &Receiver<Message>,
strip_terminated: Sender<()>,
) -> Result<(), ProgramError> {
let mut exit = false;
let mut state = common::strip::State::default();
let _ = state_tx.send(state.clone());
loop {
let target_time = Instant::now().add(Duration::from_millis(self.config.tick_time_ms));
@ -159,6 +160,8 @@ impl LedStrip {
}));
if pat.init(self.pattern_num_lights()).is_ok() {
self.pattern = pat;
state.on = false;
let _ = state_tx.send(state.clone());
info!("Cleared lights");
} else {
let _result = message_tx.send(common::Message::String(format!(
@ -174,6 +177,9 @@ impl LedStrip {
self.pattern = pat;
info!("Changed pattern");
state.on = true;
state.pattern = Some(format!("Rainbow"));
let _ = state_tx.send(state.clone());
}
Err(e) => {
let _result = message_tx.send(common::Message::String(format!(