Files
CTC_Badge/Firmware/main.py
2024-07-13 13:05:41 -05:00

1017 lines
34 KiB
Python

'''
Written by: Amelia Wietting
Date: 20240124
For: FTC Team 19415
This update was pushed OTA. Do not change it unless you know what you are doing
'''
from CONFIG.WIFI_CONFIG import COUNTRY, MAX_WIFI_CONNECT_TIMEOUT, WIFI_LIST
from CONFIG.MQTT_CONFIG import MQTT_USERNAME, MQTT_PASSWORD, MQTT_SERVER, MQTT_CLIENT_ID
from CONFIG.FTC_TEAM_CONFIG import TEAM_ASSIGNED
from CONFIG.CLOCK_CONFIG import NTP_SERVER, TIMEZONE_OFFSET, DAYLIGHT_SAVING
from CONFIG.LED_MANAGER import NUM_LEDS, LED_PIN, BRIGHTNESS, STARTING_ANIMATION
from CONFIG.OTA_CONFIG import OTA_HOST, PROJECT_NAME, FILENAMES
from updates import update_file_replace
from helper import hsv_to_rgb
import micropython
import uasyncio
import time
import ntptime
import utime
import urequests
from machine import Pin, reset
import network
from umqtt.simple import MQTTClient
import sys
import json
import neopixel
import uos
import machine
import gc
# from micropyGPS.micropyGPS import MicropyGPS
from machine import Pin, UART
#led = Pin(25, Pin.OUT)
GPS_SCAN_TIMEOUT_SECONDS = 5
# TODO: Old vaieable, should clean this up
current_color = "AA0000"
# TODO: Make it more clear what this does
UPDATE_INTERVAL_BLINKIES = 0.0001 # refresh interval for blinkies in seconds
current_leds = [[0] * 3 for _ in range(NUM_LEDS)]
target_leds = [[0] * 3 for _ in range(NUM_LEDS)]
# Set up our neopixel LED strip
led_strip = neopixel.NeoPixel(Pin(LED_PIN), NUM_LEDS)
# Asynchronous tasks management
animation_task = None
quit_animation = False
wifi_connected = False
mqtt_connected = False
STANDALONE_MODE = True ## TRINA-TODO UPDATE
# Set up the clock stuffs
SECOND_HAND_POS = 0 # Starting position of the second hand
MINUTE_HAND_POS = 0 # Starting position of the second hand
HOUR_HAND_POS = 0 # Starting position of the second hand
LAST_UPDATE = utime.time() # Time of the last update
last_drawn_hand = 0
LEDS_PER_CIRCLE = NUM_LEDS//2
timezone_offset_sync = 0
network_list = None
if DAYLIGHT_SAVING:
timezone_offset_mod = TIMEZONE_OFFSET + 1
gps_loc = []
GPS_SCAN_TIMEOUT_SECONDS = 5
WIFI_SCAN_TIMEOUT_SECONDS = 10
segment_len = NUM_LEDS//3 * 2
network_list = None
async def wifi_scan():
global network_list, segment_len
while True:
networks = wlan.scan()
if networks is not None:
#print(f"GPS: {gps_string}")
#if gps_string is None:
#gps_string = "None"
print("WiFi Network Count:", len(networks))
if not network_list:
network_list = []
segment_len = len(networks)
for network in networks:
if network not in network_list:
network_list.append(network)
#ssid = network[0]
#print("WiFi Network:", ssid)
#print(f"Network obj: {network}")
#network_string = str(network)
await uasyncio.sleep(WIFI_SCAN_TIMEOUT_SECONDS)
# Setup timer interrupt
#wifi_scan_timer = machine.Timer(-1)
#wifi_scan_timer.init(period=10000, mode=machine.Timer.PERIODIC, callback=wifi_scan)
def convert_coordinates(sections):
if sections[0] == 0: # sections[0] contains the degrees
return None
# sections[1] contains the minutes
data = sections[0] + (sections[1] / 60.0)
# sections[2] contains 'E', 'W', 'N', 'S'
if sections[2] == 'S':
data = -data
if sections[2] == 'W':
data = -data
data = '{0:.6f}'.format(data) # 6 decimal places
return str(data)
## GPS scan task
async def gps_scan():
global gps_loc
gps_module = UART(1, baudrate=9600, tx=Pin(4), rx=Pin(5))
time_zone = -3
gps = MicropyGPS(time_zone)
while True:
#print("Checking for GPS updates")
length = gps_module.any()
if length > 0:
data = gps_module.read()
gps_loc.append(f"{data}\n")
print(data)
for byte in data:
message = gps.update(chr(byte))
latitude = convert_coordinates(gps.latitude)
longitude = convert_coordinates(gps.longitude)
if latitude is None or longitude is None:
continue
print('Lat: ' + latitude)
print('Lon: ' + longitude)
await uasyncio.sleep(GPS_SCAN_TIMEOUT_SECONDS)
# TODO: Explain better batching up our MQTT messages to be shipped off for power saving. See if we can save a local file to pick up on restart
async def publish_list_of_mqtt_messages():
global network_list, gps_loc
while True:
if network_list is not None:
for network in network_list:
publish_to_mqtt("wifi_data",f"{network},{MQTT_CLIENT_ID}")
network_list = None
if gps_loc is not None:
for message in gps_loc:
message_str = str(message)
publish_to_mqtt("gps_data",f"{message_str},{MQTT_CLIENT_ID}")
gps_loc = []
await uasyncio.sleep(10)
def get_time():
return utime.localtime()
async def set_time():
global timezone_offset_sync
ntptime.host = NTP_SERVER
while True:
try:
cur_time = get_time()
print("Local time before synchronization: %s" % str(get_time()))
# Make sure to have internet connection
ntptime.settime()
new_time = get_time()
if new_time[6]-cur_time[6] > 1:
#we got ahead, need to go back
#adjust things to sync with the offset
timezone_offset_sync = cur_time[6]-new_time[6]
else:
timezone_offset_sync = 0
print("Local time after synchronization: %s" % str(get_time()))
except Exception as e:
print("Error syncing time:", e)
await uasyncio.sleep(3600)
ticked = False
tick_number = 0
def handle_time_message(msg):
global SECOND_HAND_POS, LAST_UPDATE, MINUTE_HAND_POS, HOUR_HAND_POS, ticked, tick_number
now = utime.time()
try:
# Print the received message for debugging
#print("Received time message:", msg)
tick_number_str, time_str = msg.split(',')
tick_number = int(tick_number_str.strip()) # Convert tick_number to an integer
time_parts = time_str.split(':')
if len(time_parts) != 3:
print("Unexpected time format:", time_str)
return
hours, minutes, seconds = [int(part.strip()) for part in time_parts]
# Update minute hand position
MINUTE_HAND_POS = int((minutes * LEDS_PER_CIRCLE // 60 + LEDS_PER_CIRCLE) % LEDS_PER_CIRCLE)
# Update hour hand position (approximation)
HOUR_HAND_POS = int(((hours % 12) * LEDS_PER_CIRCLE // 12 + minutes // 12) % LEDS_PER_CIRCLE)
#ticks = seconds * 2
SECOND_HAND_POS = int((seconds * LEDS_PER_CIRCLE // 60) % LEDS_PER_CIRCLE) #seconds % NUM_LEDS
#SECOND_HAND_POS = int(SECOND_HAND_POS % LEDS_PER_CIRCLE + LEDS_PER_CIRCLE)
if SECOND_HAND_POS < NUM_LEDS:
SECOND_HAND_POS = int(SECOND_HAND_POS + LEDS_PER_CIRCLE)
if MINUTE_HAND_POS < NUM_LEDS:
MINUTE_HAND_POS = int(MINUTE_HAND_POS + LEDS_PER_CIRCLE)
if HOUR_HAND_POS > LEDS_PER_CIRCLE:
HOUR_HAND_POS = int(HOUR_HAND_POS - LEDS_PER_CIRCLE)
LAST_UPDATE = utime.time()
ticked = True
#print(f"Handled time message, {tick_number}, {SECOND_HAND_POS}")
#print(f"Handled time message, {tick_number}, {SECOND_HAND_POS}")
#print(f"Buzzing haptics!")
#drv2605.set_realtime_input(255)
#await uasyncio.sleep(0.01)
#drv2605.set_realtime_input(0)
#print("Handled time message, MINUTE_HAND_POS:", MINUTE_HAND_POS)
#print("Handled time message, HOUR_HAND_POS:", HOUR_HAND_POS)
except Exception as e:
print("Error in handle_time_message:", str(e))
async def handle_ticking():
global SECOND_HAND_POS, LAST_UPDATE, MINUTE_HAND_POS, HOUR_HAND_POS, ticked, tick_number
now = utime.time()
while True:
# Print the received message for debugging
#print("Received time message:", msg)
#tick_number_str, time_str = msg.split(',')
#tick_number = int(tick_number_str.strip()) # Convert tick_number to an integer
#time_parts = time_str.split(':')
dateTimeObj = utime.localtime()
Dyear, Dmonth, Dday, Dhour, Dmin, Dsec, Dweekday, Dyearday = (dateTimeObj)
time_parts = [Dhour,Dmin,Dsec]
if len(time_parts) != 3:
print("Unexpected time format:", time_str)
return
hours, minutes, seconds = [part for part in time_parts]
print(f"Ticked Manually: {Dhour}, {Dmin}, {Dsec}")
# Update minute hand position
MINUTE_HAND_POS = int((minutes * LEDS_PER_CIRCLE // 60 + LEDS_PER_CIRCLE) % LEDS_PER_CIRCLE)
# Update hour hand position (approximation)
HOUR_HAND_POS = int(((hours % 12) * LEDS_PER_CIRCLE // 12 + minutes // 12) % LEDS_PER_CIRCLE)
#ticks = seconds * 2
SECOND_HAND_POS = int((seconds * LEDS_PER_CIRCLE // 60) % LEDS_PER_CIRCLE) #seconds % NUM_LEDS
#SECOND_HAND_POS = int(SECOND_HAND_POS % LEDS_PER_CIRCLE + LEDS_PER_CIRCLE)
if SECOND_HAND_POS < NUM_LEDS:
SECOND_HAND_POS = int(SECOND_HAND_POS + LEDS_PER_CIRCLE)
if MINUTE_HAND_POS < NUM_LEDS:
MINUTE_HAND_POS = int(MINUTE_HAND_POS + LEDS_PER_CIRCLE)
if HOUR_HAND_POS > LEDS_PER_CIRCLE:
HOUR_HAND_POS = int(HOUR_HAND_POS - LEDS_PER_CIRCLE)
LAST_UPDATE = utime.time()
ticked = True
await uasyncio.sleep_ms(1000)
# TODO: Move these to a better place
pause_animation = False
pause_timeout = 0
def hex_to_rgb(hex_str):
hex_str = hex_str.lstrip('#')
return tuple(int(hex_str[i:i+2], 16) for i in (0, 2, 4))
def normalize_color(r, g, b, max_value=BRIGHTNESS):
max_current = max(r, g, b)
if max_current <= max_value:
return r, g, b # No need to normalize
scale_factor = max_value / max_current
return round(r * scale_factor), round(g * scale_factor), round(b * scale_factor)
def make_leds_color(color_hex="770000,4"):
global current_color, pause_animation, pause_timeout
data = color_hex.split(",")
pause_animation = True
pause_timeout = float(data[1])
current_color = data[0]
r, g, b = hex_to_rgb(current_color)
#print(f"make_leds_color {r},{g},{b}")
for i in range(NUM_LEDS):
led_strip[i] = (r, g, b)
led_strip.write() # Update the strip
## TODO: Better document and move this section which is for telling you Wifi isn't connected
# Set the range of pins we want
width_of_wifi_status_leds = 2
# Figure out where our center LED for the wifi indicator will be
wifi_led_center = NUM_LEDS/2 - NUM_LEDS/4
wifi_status_cur_led = wifi_led_center
wifi_status_led_range = [wifi_led_center-width_of_wifi_status_leds, wifi_led_center+width_of_wifi_status_leds]
# Start by using the range from the middle of the wifi status indicator to the edge. It'll be 5 wide and pulse back and forth with no more than three on at a time.
#
def hue_offset(index, offset, divisor = 2):
return (float(index) / (NUM_LEDS // divisor) + offset) % 1.0
# Global variable for direction change
direction_change = False
loop_count = 0
hue_cache = {}
def update_strip(position, length, cycle, direction, hue_increment):
global hue_cache
# Turn off all LEDs
for i in range(NUM_LEDS):
led_strip[i] = (0, 0, 0)
# Base hue adjusted by the cycle for dynamic coloring
base_hue = (cycle / MAX_COLOR_CYCLE) % 1 # Ensure hue is between 0 and 1
# Adjust the increment based on the direction of the chase
hue_increment = hue_increment * (1 if direction == 1 else -1)
# Turn on LEDs in the specified segment with a trailing rainbow effect
for offset in range(length):
# Calculate hue based on direction
current_hue = (base_hue + offset * hue_increment) % 1 # Wrap around the hue
color = hsv_to_rgb(current_hue, 1, BRIGHTNESS)
idx = (position + offset) % NUM_LEDS # Handle LED index wrap-around
led_strip[idx] = color
led_strip.write()
async def set_leds(led_settings):
global ticked
"""
Set the LEDs based on a list of settings.
Each setting in the list should be in the format [LED, H, S, V, SleepTime].
"""
for setting in led_settings:
led_index, hue, saturation, value, sleep_time = setting
# Convert HSV to RGB
r, g, b = hsv_to_rgb(hue, saturation, value)
# Convert float RGB values (0 to 1 range) to integer RGB (0 to 255 range)
r, g, b = int(r * 255), int(g * 255), int(b * 255)
# Set the LED color
#r,g,b = normalize_color(r,g,b)
#print(f"set_leds {r},{g},{b}")
led_strip[led_index] = (r, g, b)
led_strip.write()
#if ticked:
#print("Cancel this run!")
#return False, led_settings # Return the remaining settings
return True, None
def hsv_to_rgb(h, s, v):
if s == 0.0: return (v, v, v)
i = int(h*6.)
f = (h*6.)-i
p,q,t = int(255*(v*(1.-s))), int(255*(v*(1.-s*f))), int(255*(v*(1.-s*(1.-f))))
v = int(255*v)
i %= 6
if i == 0: return (v, t, p)
if i == 1: return (q, v, p)
if i == 2: return (p, v, t)
if i == 3: return (p, q, v)
if i == 4: return (t, p, v)
if i == 5: return (v, p, q)
# Interrupt handler function
def toggle_direction(timer):
global direction_change, ticked, STANDALONE_MODE
toggle_direction = not direction_change
if STANDALONE_MODE:
ticked = True
# Setup timer interrupt
timer = machine.Timer(-1)
#timer.init(period=500, mode=machine.Timer.PERIODIC, callback=toggle_direction)
SEGMENT_LENGTH = NUM_LEDS//3 #NUM_LEDS // 3
seg_length = 0
MAX_COLOR_CYCLE = 360 # Maximum value for the color cycle
position = 0
async def chase():
global direction_change, loop_count, quit_animation, pause_animation, pause_timeout, position, ticked, segment_len
direction = -1 # Start with any direction, -1 or 1
cycle = 1
hue_increment = 60.0 / 360
while not quit_animation:
if pause_animation:
# print(f"Pausing chase for {pause_timeout} seconds")
await uasyncio.sleep(pause_timeout)
pause_timeout = 0
pause_animation = False
update_strip(position, segment_len, cycle, direction, hue_increment) # Pass direction to update_strip
# Update the position and cycle
position += direction
cycle += 1
if cycle > MAX_COLOR_CYCLE:
cycle = 1
# Check for direction change
if ticked:
direction *= -1
ticked = False
await uasyncio.sleep(.1) # Non-blocking sleep
# TODO: Move this to be by the other time stuff
def get_clock_hand_positions():
# Get the current time
t = utime.localtime()
hour = t.tm_hour % 12 # Convert to 12-hour format
minute = t.tm_min
second = t.tm_sec
# Calculate positions
hour_pos = int((hour / 12.0) * NUM_LEDS)
minute_pos = int((minute / 60.0) * NUM_LEDS)
second_pos = int((second / 60.0) * NUM_LEDS)
return hour_pos, minute_pos, second_pos
# TODO: Document how the rainbows work
SPEED = 5
UPDATES = 1000
async def rainbows(timeout_mod = 0):
global SPEED, UPDATES
offset = 0.0
run_on_timeout = False
if timeout_mod > 0:
run_on_timeout = True
timeout = 150
start_time = utime.time()
print(f"Making it all RAINBOWS up in here one sec...")
while True:
current_time = utime.time()
if current_time - start_time > timeout and run_on_timeout:
uasyncio.create_task(run_animation("chase"))
break
SPEED = min(255, max(1, SPEED))
offset += float(SPEED) / 500.0
pins_to_skip = set()
if not wifi_connected:
pins_to_skip.update(range(int(wifi_status_led_range[0]), int(wifi_status_led_range[1])))
if mqtt_connected:
hour_hand_positions = [(HOUR_HAND_POS + offset) % NUM_LEDS for offset in range(-2, 3)]
minute_hand_positions = [(MINUTE_HAND_POS + offset) % NUM_LEDS for offset in range(-1, 2)]
pins_to_skip.update(hour_hand_positions)
pins_to_skip.update(minute_hand_positions)
# Setting second hand on both rows
second_hand_positions = [SECOND_HAND_POS % NUM_LEDS]
second_hand_hue = (hue_offset(SECOND_HAND_POS, offset) + 0.5) % 1.0
complementary_hue = (hue_offset(0, offset) + 0.5) % 1.0
#hue = float(i) / (NUM_LEDS // 2)
rgb = hsv_to_rgb(complementary_hue, 1.0, BRIGHTNESS)
rgb_int = tuple(int(c * 255) for c in rgb)
for pos in second_hand_positions:
led_strip[pos] = rgb_int
#led_strip.set_hsv(pos, second_hand_hue, 1.0, 1.0)
pins_to_skip.add(pos)
complementary_hue = (hue_offset(pos, offset) + 0.5) % 1.0
#hue = float(i) / (NUM_LEDS // 2)
rgb = hsv_to_rgb(complementary_hue, 1.0, BRIGHTNESS)
rgb_int = tuple(int(c * 255) for c in rgb)
# Setting hour and minute hands with offset hue
for pos in hour_hand_positions + minute_hand_positions:
led_strip[pos] = rgb_int
#led_strip.set_hsv(pos, complementary_hue, 1.0, 1.0)
pins_to_skip.add(pos)
for i in range(NUM_LEDS):
if i in pins_to_skip:
continue
hue = hue_offset(i, offset)
rgb = hsv_to_rgb(hue + offset, 1.0, BRIGHTNESS)
#rgb_int = tuple(int(c * BRIGHTNESS * 255) for c in rgb)
r = rgb[0]
g = rgb[1]
b = rgb[2]
#print(f"Rainbows {r},{g},{b}")
led_strip[i] = rgb
#led_strip.set_hsv(i, hue, 1.0, BRIGHTNESS)
led_strip.write()
await uasyncio.sleep(0.01)
# TODO: Document how the rainbows work
SPEED = 5
UPDATES = 1000
async def movie_ticker(timeout_mod = 0):
border_led_position = [0,1,2,3,4,5,6,7,8,23,24,39,40,55,56,57,58,59,60,61,62,63,48,47,32,31,16,15]
global SPEED, UPDATES
offset = 0.0
run_on_timeout = False
if timeout_mod > 0:
run_on_timeout = True
timeout = 150
start_time = utime.time()
print(f"Making it SHOW TIME up in here one sec...")
while True:
current_time = utime.time()
if current_time - start_time > timeout and run_on_timeout:
uasyncio.create_task(run_animation("chase"))
break
SPEED = min(255, max(1, SPEED))
offset += float(SPEED) / 500.0
pins_to_skip = set()
for i in border_led_position:
if i not in border_led_position:
continue
hue = hue_offset(i, offset)
rgb = hsv_to_rgb(hue + offset, 1.0, BRIGHTNESS)
#rgb_int = tuple(int(c * BRIGHTNESS * 255) for c in rgb)
r = rgb[0]
g = rgb[1]
b = rgb[2]
#print(f"Rainbows {r},{g},{b}")
led_strip[i] = rgb
#led_strip.set_hsv(i, hue, 1.0, BRIGHTNESS)
led_strip.write()
await uasyncio.sleep(0.01)
async def i_dont_know_why_this_works(color=1):
global current_color, loop_count
hue_1, hue_2 = (100, 220) if color == "1" else (0, 45) if color == "2" else (150, 180)
# Assuming BRIGHTNESS is now a value between 0 and 255
brightness_scale = MAX_SOLID_BRIGHTNESS / 255.0 # Convert to 0-1 scale
while True:
for i in range(NUM_LEDS):
cycle = hue_1 if i % 2 == 0 else hue_2
update_strip(i, 1, cycle / MAX_COLOR_CYCLE) # Update each LED individually
led_strip.write()
await uasyncio.sleep(UPDATE_INTERVAL_BLINKIES)
for i in range(NUM_LEDS):
cycle = hue_2 if i % 2 == 0 else hue_1
update_strip(i, 1, cycle / MAX_COLOR_CYCLE) # Update each LED individually
led_strip.write()
await uasyncio.sleep(UPDATE_INTERVAL_BLINKIES)
# Transition to the rainbows animation
animation_task = uasyncio.create_task(rainbows())
await animation_task
async def alternating_blinkies(color="1"):
if color == "1":
hue_1, hue_2 = 50, 220
elif color == "2":
hue_1, hue_2 = 0, 30
elif color == "3":
hue_1, hue_2 = 220, 30
else:
hue_1, hue_2 = 50, 100
# Assuming BRIGHTNESS is now a value between 0 and 255
brightness_scale = BRIGHTNESS # Convert to 0-1 scale
start_time = utime.time()
timeout = 60
while True:
current_time = utime.time()
if current_time - start_time > timeout:
break
# First pattern
led_settings = []
for i in range(NUM_LEDS):
hue = hue_1 if i % 2 == 0 else hue_2
led_settings.append([i, hue / 360, 1.0, BRIGHTNESS, 0]) # Sleep time is set to 0
await set_all_leds_once(led_settings)
await uasyncio.sleep(UPDATE_INTERVAL_BLINKIES)
# Second pattern
led_settings = []
for i in range(NUM_LEDS):
hue = hue_2 if i % 2 == 0 else hue_1
led_settings.append([i, hue / 360, 1.0, BRIGHTNESS, 0]) # Sleep time is set to 0
await set_all_leds_once(led_settings)
await uasyncio.sleep(UPDATE_INTERVAL_BLINKIES)
uasyncio.create_task(run_animation(STARTING_ANIMATION))
async def set_all_leds_once(led_settings):
for setting in led_settings:
led_index, hue, saturation, value, _ = setting # Ignore sleep time here
r, g, b = hsv_to_rgb(hue, saturation, value)
r, g, b = int(r * 255), int(g * 255), int(b * 255)
#r,g,b = normalize_color(r,g,b)
#print(f"set_all_leds_at_once {r},{g},{b}")
led_strip[led_index] = (r, g, b)
led_strip.write()
async def run_animation(animation_name, color=1):
global quit_animation
global animation_task
#quit_animation = False
print(f"Running animation {animation_name}..")
if animation_task:
# quit_animation = True
animation_task.cancel()
if animation_name == "alternating_blinkies":
animation_task = uasyncio.get_event_loop().create_task(alternating_blinkies(color))
elif animation_name == "rainbows":
animation_task = uasyncio.get_event_loop().create_task(rainbows())
elif animation_name == "chase":
animation_task = uasyncio.get_event_loop().create_task(chase())
elif animation_name == "idk":
animation_task = uasyncio.get_event_loop().create_task(i_dont_know_why_this_works())
elif animation_name == "movie_ticker":
animation_task = uasyncio.get_event_loop().create_task(movie_ticker())
await animation_task
def update_file_from_mqtt_message(msg_string):
print(f"Starting update process for {msg_string}...")
update_file_replace(msg_string)
def sub_cb(topic, msg):
msg_string = msg.decode("UTF-8")
#print(f"Received message: '{msg_string}' on topic: '{topic}'") # Debugging output
if topic == b'color_change':
print("Changing LED color...") # Debugging output
make_leds_color(msg_string)
elif topic == b'scores':
print("Processing scores...") # Debugging output
data = msg_string.split(",")
game_outcome = data[1]
result_match = 0
team = data[0]
print(f"Team: {team}, Game Outcome: {game_outcome}") # Debugging output
if team == TEAM_ASSIGNED:
print("Running alternating blinkies animation...") # Debugging output
if game_outcome == "start":
uasyncio.create_task(run_animation("rainbows"))
result_match = "1"
elif game_outcome == "tie":
result_match = "3"
elif game_outcome == "loss":
result_match = "2"
elif game_outcome == "win":
result_match = "1"
uasyncio.create_task(run_animation("alternating_blinkies", result_match))
elif topic == b'animate':
print("Running custom animation...") # Debugging output
data = msg_string.split(",")
animation_string = data[0]
color_blinkies = data[1] if len(data) > 1 else None
print(f"Animation: {animation_string}, Color: {color_blinkies}") # Debugging output
uasyncio.create_task(run_animation(animation_string, color_blinkies))
elif topic == b'audio_reactive':
uasyncio.create_task(handle_audio_data(msg_string))
elif topic == b'time':
handle_time_message(msg_string)
elif topic == b'update':
print(f"Triggered msg string {msg_string}")
#update_file_from_mqtt_message(msg_string)
async def mqtt_task(client):
while True:
try:
client.check_msg()
await uasyncio.sleep(0.5)
except Exception as e:
print(f"Errors checking messages: {e}")
reset()
async def connect_to_wifi():
global wifi_connected
global wlan
wifi_connected = False
# TODO update this to look for all networks in the list WIFI_LIST=[["WhyFhy","WhyKnot42!"],["IoT","1234567890"]] and try to connect to the ones it finds
# set up wifi
connection_attempts=0
try:
#status_handler("Scanning for your wifi network one sec")
# This is being moved to setup so wifi scanning will still work in the background
# Setup should be done globally now oops
#wlan = network.WLAN(network.STA_IF)
#wlan.active(True)
nets = wlan.scan()
for net in nets:
print(f'Network seen: {net}')
for network_config in WIFI_LIST:
ssid_to_find = network_config[0]
if ssid_to_find == net[0].decode('utf-8'):
print(f'Network found! {ssid_to_find}')
print(f"Attempting to connect to SSID: {ssid_to_find}")
if len(network_config) == 0:
wlan.connect(ssid_to_find)
else:
wlan.connect(ssid_to_find, network_config[1])
while not wlan.isconnected():
#await status_handler(f"Waiting to connect to the network: {ssid_to_find}...")
connection_attempts += 1
#await uasyncio.sleep(1)
if connection_attempts > MAX_WIFI_CONNECT_TIMEOUT:
print("Exceeded MAX_WIFI_CONNECT_TIMEOUT!!!")
break
wifi_connected = True
print('WLAN connection succeeded!')
break
else:
print(f"Unable to find SSID: {ssid_to_find}")
if wifi_connected:
break
except Exception as e:
print(f"Setup failed: {e}")
# Status handler function
row_one = False
async def status_handler(message):
global wifi_connected, row_one
print(message)
if row_one:
print(f"Row one")
for i in range(NUM_LEDS//2):
led_strip[i] = (0, 0, 0)
await uasyncio.sleep(NUM_LEDS // 2 * 0.0005)
else:
print(f"Row Two!!")
for i in range(NUM_LEDS//2, NUM_LEDS):
led_strip[i] = (0, 0, 0)
await uasyncio.sleep(NUM_LEDS // 2 * 0.0005)
led_strip.write()
row_one = not row_one
if row_one:
for i in range(NUM_LEDS//2):
make_leds_color("008800")
await uasyncio.sleep(NUM_LEDS//2 * 0.001)
else:
for i in range(NUM_LEDS//2, NUM_LEDS):
led_strip[i] = (100, 100, 100)
await uasyncio.sleep(NUM_LEDS//2 * 0.001)
led_strip.write()
def connectMQTT():
global mqtt_connected
client = MQTTClient(
client_id=MQTT_CLIENT_ID,
server=MQTT_SERVER,
port=0,
user=MQTT_USERNAME,
password=MQTT_PASSWORD,
keepalive=0
)
client.set_callback(sub_cb)
try:
client.connect()
mqtt_connected = True
except Exception as e:
print('Error connecting to %s MQTT broker error: %s' % (MQTT_SERVER, e))
topics = [b'time',b'color_change', b'scores', b'animate', b'audio_reactive', b'chase', b'update']
if mqtt_connected:
for topic in topics:
try:
client.subscribe(topic)
print('Connected to {} MQTT broker, subscribed to {} topic'.format(MQTT_SERVER, topic.decode()))
except Exception as e:
print('Error subscribing to %s topic! Error: %s' % (topic.decode(), e))
return client
def publish_to_mqtt(topic, message):
try:
client.publish(topic, message)
#print('Message published to topic {}: {}'.format(topic, message))
except Exception as e:
print(f'Error publishing message to topic {topic}: {message}, {e}')
async def setup_wireless():
global wifi_connected
global mqtt_connected
global wlan
try:
if not wifi_connected:
for i in range(0, 3):
await connect_to_wifi()
if wifi_connected:
print('Wifi connection successful!')
wifi_status = "connected"
for _ in range(2): # Flash red green times
make_leds_color(color_hex="000900,0.25")
time.sleep(0.5)
make_leds_color(color_hex="000000,0.25")
time.sleep(0.5)
make_leds_color(color_hex="09000F,0.25")
break
else:
print(f'Wifi connection failed!')
wifi_status = "failed"
for _ in range(2): # Flash red three times
make_leds_color(color_hex="090000,0.25")
time.sleep(.5)
make_leds_color(color_hex="000000,0.25")
time.sleep(0.5)
except Exception as e:
print(f'Wifi connection failed! {e}')
wifi_status = "failed"
wifi_connected = False
for _ in range(4): # Flash red three times
make_leds_color(color_hex="090000,0.25")
time.sleep(.5)
make_leds_color(color_hex="000000,0.25")
time.sleep(0.5)
# if no wifi, then you get...
if wifi_connected:
set_time()
counter = 0
for i in range(0,5):
try:
await uasyncio.sleep(2)
print("Attempting to connect to MQTT broker...")
client = connectMQTT()
if mqtt_connected:
for _ in range(2): # Flash red green times
make_leds_color(color_hex="000F0F,0.25")
time.sleep(0.25)
make_leds_color(color_hex="000000,0.25")
time.sleep(0.25)
make_leds_color(color_hex="000F0F,0.25")
mqtt_connected = True
else:
print(f'MQTT connection failed!')
wifi_status = "failed"
for _ in range(2): # Flash red three times
make_leds_color(color_hex="080000,0.25")
time.sleep(.5)
make_leds_color(color_hex="000000,0.25")
time.sleep(0.5)
mqtt_connected = False
#make_leds_color(color_hex="005500,2")
return client
except Exception as e:
print("Failed to connect to MQTT: %s" % e)
#make_leds_color(color_hex="FF0000,2")
#
async def check_connections():
global wifi_connected, mqtt_connected
while True:
if not wlan.isconnected():
mqtt_connected = False
print('WiFi disconnected, attempting to reconnect WiFi and MQTT...')
await setup_wireless()
await uasyncio.sleep(10) # Check connection status every 10 seconds
def start_wifi_card():
global wlan
print("Kicking on the wifi card one sec..")
try:
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
print("WiFi card initialized")
except:
print("Errors initializing WiFi")
async def main():
global wifi_connected, mqtt_connected
global client, stand_alone
# Run our beginning animation
print(f"Turning on the starting animation: {STARTING_ANIMATION}")
uasyncio.create_task(run_animation(STARTING_ANIMATION))
# Start the Wifi Card
start_wifi_card()
uasyncio.create_task(handle_ticking())
# Enable if you want to scan for wifi
#uasyncio.create_task(wifi_scan())
# If we're not in standalone mode IE No WiFi mode go ahead and try to connect to a network
if not STANDALONE_MODE:
max_attempts = 3
for x in range(0,3):
if not wifi_connected or not mqtt_connected:
client = await setup_wireless()
else:
break
if wifi_connected:
# This is janky, need to make it work better
uasyncio.create_task(check_connections())
# This will reach out to the interwebs to grab the time remotely every hour
uasyncio.create_task(set_time())
# This is how you check in with my server for updates!
if mqtt_connected:
uasyncio.create_task(mqtt_task(client))
uasyncio.create_task(publish_list_of_mqtt_messages())
while True:
await uasyncio.sleep_ms(1)
uasyncio.run(main())