Get mqtt working

This commit is contained in:
Austen Adler 2024-04-07 20:10:08 -04:00
parent 6cb9883a7c
commit aa96d81fa3
2 changed files with 46 additions and 17 deletions

View File

@ -76,10 +76,15 @@ pub struct Config {
#[clap(short, long, env, help = "Reverse all patterns")] #[clap(short, long, env, help = "Reverse all patterns")]
pub reverse: bool, pub reverse: bool,
#[clap(long, env, help = "MQTT id")] #[clap(long, env, help = "MQTT broker host", requires_all = ["mqtt_id", "mqtt_port"])]
pub mqtt_id: String, pub mqtt_broker: Option<String>,
#[clap(long, env, help = "MQTT broker host")] #[clap(long, env, help = "MQTT broker port", default_value = "1883")]
pub mqtt_broker: String, pub mqtt_port: Option<u16>,
#[clap(long, env, help = "MQTT broker port")] #[clap(long, env, help = "MQTT device id")]
pub mqtt_port: u16, pub mqtt_id: Option<String>,
#[clap(long, env, help = "MQTT username", requires_all = ["mqtt_password"])]
pub mqtt_username: Option<String>,
#[clap(long, env, help = "MQTT user password")]
pub mqtt_password: Option<String>,
} }

View File

@ -1,7 +1,9 @@
#![feature(let_chains)]
use common::Config; use common::Config;
use rumqttc::{Client, MqttOptions, QoS}; use rumqttc::{Client, MqttOptions, QoS};
use std::{thread, time::Duration}; use std::{thread, time::Duration};
use tracing::info; use tracing::info;
use tracing::warn;
use common::{error::ProgramResult, strip}; use common::{error::ProgramResult, strip};
use std::{ use std::{
@ -12,26 +14,48 @@ use std::{
pub fn start(strip_tx: Sender<strip::Message>, config: Config) -> ProgramResult<()> { pub fn start(strip_tx: Sender<strip::Message>, config: Config) -> ProgramResult<()> {
info!("Starting mqtt"); info!("Starting mqtt");
x(&config);
Ok(()) Ok(())
} }
fn x(config: &Config) { fn x(config: &Config) {
let mut mqttoptions = MqttOptions::new(&config.mqtt_id, &config.mqtt_broker, config.mqtt_port); let mut mqttoptions = if let Some(mqtt_id) = config.mqtt_id.as_ref()
&& let Some(mqtt_broker) = config.mqtt_broker.as_ref()
&& let Some(mqtt_port) = config.mqtt_port
{
MqttOptions::new(mqtt_id, mqtt_broker, mqtt_port)
} else {
warn!("MQTT is not emabled as flags are not all set");
return;
};
mqttoptions.set_keep_alive(Duration::from_secs(5)); mqttoptions.set_keep_alive(Duration::from_secs(5));
let (mut client, mut connection) = Client::new(mqttoptions, 10); if let Some(mqtt_username) = config.mqtt_username.as_ref()
client.subscribe("hello/rumqtt", QoS::AtMostOnce).unwrap(); && let Some(mqtt_password) = config.mqtt_password.as_ref()
thread::spawn(move || { {
for i in 0..10 { info!("Using authentication with mqtt");
client mqttoptions.set_credentials(mqtt_username, mqtt_password);
.publish("hello/rumqtt", QoS::AtLeastOnce, false, vec![i; i as usize])
.unwrap();
thread::sleep(Duration::from_millis(100));
} }
});
info!("Starting mqtt client");
let (mut client, mut connection) = Client::new(mqttoptions, 10);
info!("Subscribing to homeassistant");
client
.subscribe("homeassistant/status", QoS::AtMostOnce)
.unwrap();
// thread::spawn(move || {
// for i in 0..10 {
// client
// .publish("hello/rumqtt", QoS::AtLeastOnce, false, vec![i; i as usize])
// .unwrap();
// thread::sleep(Duration::from_millis(100));
// }
// });
// Iterate to poll the eventloop for connection progress // Iterate to poll the eventloop for connection progress
for (i, notification) in connection.iter().enumerate() { for (i, notification) in connection.iter().enumerate() {
println!("Notification = {:?}", notification); info!("Notification = {:?}", notification);
} }
info!("Done with mqtt");
} }