'''
Written by: Amelia Wietting
Date: 20240124
For: FTC Team 19415

This update was pushed OTA. Do not change it unless you know what you are doing
'''
from CONFIG.WIFI_CONFIG import COUNTRY, MAX_WIFI_CONNECT_TIMEOUT, WIFI_LIST
from CONFIG.MQTT_CONFIG import MQTT_USERNAME, MQTT_PASSWORD, MQTT_SERVER, MQTT_CLIENT_ID
from CONFIG.FTC_TEAM_CONFIG import TEAM_ASSIGNED
from CONFIG.CLOCK_CONFIG import NTP_SERVER, TIMEZONE_OFFSET, DAYLIGHT_SAVING
from CONFIG.LED_MANAGER import NUM_LEDS, LED_PIN, BRIGHTNESS, STARTING_ANIMATION
from CONFIG.OTA_CONFIG import OTA_HOST, PROJECT_NAME, FILENAMES
from updates import update_file_replace
from helper import hsv_to_rgb
import micropython
import uasyncio
import time
import ntptime
import utime
import urequests
from machine import Pin, reset
import network
from umqtt.simple import MQTTClient
import sys
import json
import neopixel
import uos
import machine
import gc
# from micropyGPS.micropyGPS import MicropyGPS
from machine import Pin, UART

#led = Pin(25, Pin.OUT)

# Seconds to wait between polls of the GPS UART.
GPS_SCAN_TIMEOUT_SECONDS = 5

# TODO: Old variable, should clean this up
current_color = "AA0000"

# TODO: Make it more clear what this does
UPDATE_INTERVAL_BLINKIES = 0.0001  # refresh interval for blinkies in seconds

# Per-LED [r, g, b] buffers (current and target values).
current_leds = [[0] * 3 for _ in range(NUM_LEDS)]
target_leds = [[0] * 3 for _ in range(NUM_LEDS)]

# Set up our neopixel LED strip
led_strip = neopixel.NeoPixel(Pin(LED_PIN), NUM_LEDS)

# Asynchronous tasks management
animation_task = None
quit_animation = False

# Connection state flags shared between the tasks.
wifi_connected = False
mqtt_connected = False
STANDALONE_MODE = True  ## TRINA-TODO UPDATE

# Set up the clock stuffs
SECOND_HAND_POS = 0  # Starting position of the second hand
MINUTE_HAND_POS = 0  # Starting position of the minute hand
HOUR_HAND_POS = 0  # Starting position of the hour hand
LAST_UPDATE = utime.time()  # Time of the last update
last_drawn_hand = 0
LEDS_PER_CIRCLE = NUM_LEDS // 2
timezone_offset_sync = 0
network_list = None

# Always bind timezone_offset_mod; previously it only existed when
# DAYLIGHT_SAVING was truthy, so any later read would raise NameError.
timezone_offset_mod = TIMEZONE_OFFSET + (1 if DAYLIGHT_SAVING else 0)

# Raw GPS sentences collected by gps_scan() until they are published.
gps_loc = []
GPS_SCAN_TIMEOUT_SECONDS = 5
WIFI_SCAN_TIMEOUT_SECONDS = 10
segment_len = NUM_LEDS // 3 * 2
network_list = None


async def wifi_scan():
    """Periodically scan for WiFi networks and collect them in ``network_list``.

    Every ``WIFI_SCAN_TIMEOUT_SECONDS`` the visible networks are appended to
    the global ``network_list`` (duplicates are skipped) and the global
    ``segment_len`` is set to the number of networks seen in the last scan.
    Runs forever.
    """
    global network_list, segment_len
    while True:
        networks = wlan.scan()
        if networks is not None:
            print("WiFi Network Count:", len(networks))
            if not network_list:
                network_list = []
            segment_len = len(networks)
            for network in networks:
                if network not in network_list:
                    network_list.append(network)
        await uasyncio.sleep(WIFI_SCAN_TIMEOUT_SECONDS)


# Setup timer interrupt
#wifi_scan_timer = machine.Timer(-1)
#wifi_scan_timer.init(period=10000, mode=machine.Timer.PERIODIC, callback=wifi_scan)


def convert_coordinates(sections):
    """Convert a (degrees, minutes, hemisphere) triple to a decimal string.

    Args:
        sections: sequence where [0] is degrees, [1] is minutes and [2] is
            one of 'E', 'W', 'N', 'S'.

    Returns:
        The signed decimal-degree value formatted with 6 decimal places
        (negative for 'S' and 'W'), or None when the degrees field is 0.
    """
    if sections[0] == 0:
        return None
    data = sections[0] + (sections[1] / 60.0)
    if sections[2] in ('S', 'W'):
        data = -data
    return '{0:.6f}'.format(data)


## GPS scan task
async def gps_scan():
    """Poll the GPS UART, record raw data in ``gps_loc`` and print the fix.

    Runs forever, sleeping ``GPS_SCAN_TIMEOUT_SECONDS`` between polls.
    """
    global gps_loc
    gps_module = UART(1, baudrate=9600, tx=Pin(4), rx=Pin(5))
    time_zone = -3
    # NOTE(review): MicropyGPS must be in scope for this task to run; the
    # micropyGPS import near the top of the file is commented out - confirm.
    gps = MicropyGPS(time_zone)
    while True:
        if gps_module.any() > 0:
            data = gps_module.read()
            if data:  # read() may return None if the bytes vanished meanwhile
                gps_loc.append(f"{data}\n")
                print(data)
                for byte in data:
                    gps.update(chr(byte))
                latitude = convert_coordinates(gps.latitude)
                longitude = convert_coordinates(gps.longitude)
                # No `continue` here: skipping the sleep below would let this
                # loop spin without ever yielding to the other tasks.
                if latitude is not None and longitude is not None:
                    print('Lat: ' + latitude)
                    print('Lon: ' + longitude)
        await uasyncio.sleep(GPS_SCAN_TIMEOUT_SECONDS)


# TODO: Explain better how MQTT messages are batched up before being shipped
# off (for power saving).
# TODO: See if we can save a local file to pick up on restart
async def publish_list_of_mqtt_messages():
    """Every 10 seconds publish the queued WiFi and GPS records over MQTT.

    Each entry of ``network_list`` goes to topic ``wifi_data`` and each entry
    of ``gps_loc`` to topic ``gps_data``; both queues are reset afterwards.
    Runs forever.
    """
    global network_list, gps_loc
    while True:
        if network_list is not None:
            for network in network_list:
                publish_to_mqtt("wifi_data", f"{network},{MQTT_CLIENT_ID}")
            network_list = None
        if gps_loc is not None:
            for message in gps_loc:
                message_str = str(message)
                publish_to_mqtt("gps_data", f"{message_str},{MQTT_CLIENT_ID}")
            gps_loc = []
        await uasyncio.sleep(10)


def get_time():
    """Return the current local time tuple from ``utime.localtime()``."""
    return utime.localtime()


async def set_time():
    """Synchronise the RTC from ``NTP_SERVER`` once an hour, forever.

    ``timezone_offset_sync`` is set to the (negative) number of seconds the
    clock jumped forward during the sync when that jump exceeds one second,
    otherwise to 0.
    """
    global timezone_offset_sync
    ntptime.host = NTP_SERVER
    while True:
        try:
            print("Local time before synchronization: %s" % str(get_time()))
            # Compare epoch seconds. The previous code compared index 6 of
            # the localtime tuple, which is the weekday, not the seconds.
            before = utime.time()
            # Make sure to have internet connection
            ntptime.settime()
            after = utime.time()
            if after - before > 1:
                # we got ahead, need to go back
                timezone_offset_sync = before - after
            else:
                timezone_offset_sync = 0
            print("Local time after synchronization: %s" % str(get_time()))
        except Exception as e:
            print("Error syncing time:", e)
        await uasyncio.sleep(3600)


ticked = False
tick_number = 0


def _update_hand_positions(hours, minutes, seconds):
    """Recompute the three clock-hand LED indices and flag a tick.

    The second and minute hands land in the second half of the strip
    (offset by ``LEDS_PER_CIRCLE``); the hour hand stays in the first half.
    """
    global SECOND_HAND_POS, MINUTE_HAND_POS, HOUR_HAND_POS, LAST_UPDATE, ticked
    minute_pos = (minutes * LEDS_PER_CIRCLE // 60) % LEDS_PER_CIRCLE
    hour_pos = ((hours % 12) * LEDS_PER_CIRCLE // 12 + minutes // 12) % LEDS_PER_CIRCLE
    second_pos = (seconds * LEDS_PER_CIRCLE // 60) % LEDS_PER_CIRCLE

    # The original guarded these shifts with `pos < NUM_LEDS` (always true
    # after the modulo) and guarded an hour-hand shift with
    # `pos > LEDS_PER_CIRCLE` (never true), so the result is exactly this:
    SECOND_HAND_POS = second_pos + LEDS_PER_CIRCLE
    MINUTE_HAND_POS = minute_pos + LEDS_PER_CIRCLE
    HOUR_HAND_POS = hour_pos

    LAST_UPDATE = utime.time()
    ticked = True


def handle_time_message(msg):
    """Update the clock hands from a ``"<tick>,<HH>:<MM>:<SS>"`` message.

    Malformed messages are reported with a print and otherwise ignored.
    """
    global tick_number
    try:
        tick_number_str, time_str = msg.split(',')
        tick_number = int(tick_number_str.strip())
        time_parts = time_str.split(':')
        if len(time_parts) != 3:
            print("Unexpected time format:", time_str)
            return
        hours, minutes, seconds = [int(part.strip()) for part in time_parts]
        _update_hand_positions(hours, minutes, seconds)
    except Exception as e:
        print("Error in handle_time_message:", str(e))


async def handle_ticking():
    """Once a second, move the clock hands to the local RTC time. Runs forever."""
    while True:
        # Fields 3..5 of the localtime tuple are hour, minute, second.
        hours, minutes, seconds = utime.localtime()[3:6]
        print(f"Ticked Manually: {hours}, {minutes}, {seconds}")
        _update_hand_positions(hours, minutes, seconds)
        await uasyncio.sleep_ms(1000)


# TODO: Move these to a better place
pause_animation = False
pause_timeout = 0


def hex_to_rgb(hex_str):
    """Convert ``"RRGGBB"`` (optionally prefixed with ``#``) to an (r, g, b) tuple."""
    hex_str = hex_str.lstrip('#')
    return tuple(int(hex_str[i:i+2], 16) for i in (0, 2, 4))


def normalize_color(r, g, b, max_value=BRIGHTNESS):
    """Scale (r, g, b) down so that its largest channel is at most ``max_value``.

    Colours already within the limit are returned unchanged.
    """
    max_current = max(r, g, b)
    if max_current <= max_value:
        return r, g, b  # No need to normalize
    scale_factor = max_value / max_current
    return round(r * scale_factor), round(g * scale_factor), round(b * scale_factor)


def make_leds_color(color_hex="770000,4"):
    """Fill the whole strip with one colour and request an animation pause.

    Args:
        color_hex: ``"RRGGBB,<seconds>"``. The ``,<seconds>`` part is
            optional; when it is missing or empty the pause is 0 seconds
            (previously a payload without a comma raised IndexError).

    Raises:
        ValueError: if the colour or the seconds field cannot be parsed.
    """
    global current_color, pause_animation, pause_timeout
    data = color_hex.split(",")
    # Parse everything before touching the globals so that a bad payload
    # leaves the current state untouched.
    r, g, b = hex_to_rgb(data[0])
    timeout_text = data[1].strip() if len(data) > 1 else ""
    timeout = float(timeout_text) if timeout_text else 0.0

    pause_animation = True
    pause_timeout = timeout
    current_color = data[0]
    for i in range(NUM_LEDS):
        led_strip[i] = (r, g, b)
    led_strip.write()  # Update the strip


## TODO: Better document and move this section which is for telling you Wifi isn't connected
# Set the range of pins we want
width_of_wifi_status_leds = 2

# Figure out where our center LED for the wifi indicator will be
wifi_led_center = NUM_LEDS/2 - NUM_LEDS/4
wifi_status_cur_led = wifi_led_center
wifi_status_led_range = [wifi_led_center-width_of_wifi_status_leds, wifi_led_center+width_of_wifi_status_leds]

# Start by using the range from the middle of the wifi status indicator to the edge.
# It'll be 5 wide and pulse back and forth with no more than three on at a time.
#
def hue_offset(index, offset, divisor=2):
    """Return the hue (0..1) for LED ``index``, shifted by ``offset``.

    The hue wheel is spread over ``NUM_LEDS // divisor`` LEDs.
    """
    return (float(index) / (NUM_LEDS // divisor) + offset) % 1.0


# Global variable for direction change
direction_change = False
loop_count = 0
hue_cache = {}


def update_strip(position, length, cycle, direction, hue_increment):
    """Blank the strip, then light a rainbow segment and write it out.

    Args:
        position: index of the first LED of the segment (wraps around).
        length: number of LEDs in the segment.
        cycle: colour-cycle step; the base hue is ``cycle / MAX_COLOR_CYCLE``.
        direction: 1 keeps the hue gradient as-is, anything else reverses it.
        hue_increment: hue step between neighbouring LEDs of the segment.
    """
    # Turn off all LEDs
    for i in range(NUM_LEDS):
        led_strip[i] = (0, 0, 0)

    # Base hue adjusted by the cycle for dynamic coloring
    base_hue = (cycle / MAX_COLOR_CYCLE) % 1

    # Adjust the increment based on the direction of the chase
    hue_increment = hue_increment * (1 if direction == 1 else -1)

    # Turn on LEDs in the specified segment with a trailing rainbow effect
    for offset in range(length):
        current_hue = (base_hue + offset * hue_increment) % 1
        idx = (position + offset) % NUM_LEDS  # Handle LED index wrap-around
        led_strip[idx] = hsv_to_rgb(current_hue, 1, BRIGHTNESS)

    led_strip.write()


async def set_leds(led_settings):
    """Set LEDs one at a time from a list of settings.

    Each setting is ``[LED, H, S, V, SleepTime]``; SleepTime is currently
    unused. The strip is written after every LED.

    Returns:
        Always ``(True, None)``.
    """
    for setting in led_settings:
        led_index, hue, saturation, value, sleep_time = setting
        # hsv_to_rgb() already returns 0-255 integers, so the result is used
        # directly (it used to be multiplied by 255 a second time).
        led_strip[led_index] = hsv_to_rgb(hue, saturation, value)
        led_strip.write()
    return True, None


def hsv_to_rgb(h, s, v):
    """Convert HSV (each 0..1) to an (r, g, b) tuple of 0-255 integers.

    This module-level definition replaces the ``hsv_to_rgb`` imported from
    ``helper``.
    """
    if s == 0.0:
        # Grey: scale like every other branch (it used to return the raw v).
        grey = int(255 * v)
        return (grey, grey, grey)
    i = int(h * 6.)
    f = (h * 6.) - i
    p = int(255 * (v * (1. - s)))
    q = int(255 * (v * (1. - s * f)))
    t = int(255 * (v * (1. - s * (1. - f))))
    v = int(255 * v)
    i %= 6
    if i == 0:
        return (v, t, p)
    if i == 1:
        return (q, v, p)
    if i == 2:
        return (p, v, t)
    if i == 3:
        return (p, q, v)
    if i == 4:
        return (t, p, v)
    return (v, p, q)


# Interrupt handler function
def toggle_direction(timer):
    """Timer callback: flip ``direction_change`` and, in standalone mode, tick."""
    global direction_change, ticked, STANDALONE_MODE
    # Previously this assigned to a local named `toggle_direction`, so the
    # global flag never changed.
    direction_change = not direction_change
    if STANDALONE_MODE:
        ticked = True


# Setup timer interrupt
timer = machine.Timer(-1)
#timer.init(period=500, mode=machine.Timer.PERIODIC, callback=toggle_direction)

SEGMENT_LENGTH = NUM_LEDS // 3
seg_length = 0
MAX_COLOR_CYCLE = 360  # Maximum value for the color cycle
position = 0


async def chase():
    """Run a rainbow segment around the strip until ``quit_animation`` is set.

    The segment reverses direction whenever ``ticked`` is set, and the
    animation sleeps for ``pause_timeout`` seconds when ``pause_animation``
    is set (see make_leds_color()).
    """
    global direction_change, loop_count, quit_animation, pause_animation, pause_timeout, position, ticked, segment_len
    direction = -1  # Start with any direction, -1 or 1
    cycle = 1
    hue_increment = 60.0 / 360

    while not quit_animation:
        if pause_animation:
            await uasyncio.sleep(pause_timeout)
            pause_timeout = 0
            pause_animation = False

        update_strip(position, segment_len, cycle, direction, hue_increment)

        # Update the position and cycle
        position += direction
        cycle += 1
        if cycle > MAX_COLOR_CYCLE:
            cycle = 1

        # Check for direction change
        if ticked:
            direction *= -1
            ticked = False

        await uasyncio.sleep(.1)  # Non-blocking sleep


# TODO: Move this to be by the other time stuff
def get_clock_hand_positions():
    """Return ``(hour_pos, minute_pos, second_pos)`` LED indices for now.

    Each hand is mapped proportionally onto the full ``NUM_LEDS`` range.
    """
    # localtime() is read by index (hour=3, minute=4, second=5); the
    # previous tm_* attribute access fails on a plain tuple.
    t = utime.localtime()
    hour = t[3] % 12  # Convert to 12-hour format
    minute = t[4]
    second = t[5]

    hour_pos = int((hour / 12.0) * NUM_LEDS)
    minute_pos = int((minute / 60.0) * NUM_LEDS)
    second_pos = int((second / 60.0) * NUM_LEDS)
    return hour_pos, minute_pos, second_pos


SPEED = 5
UPDATES = 1000


async def rainbows(timeout_mod=0):
    """Scroll a rainbow over the strip, overlaying the clock hands.

    Every frame each LED gets the hue ``hue_offset(i, offset) + offset`` and
    ``offset`` advances by ``SPEED / 500``. While WiFi is down the LEDs in
    ``wifi_status_led_range`` are left untouched. While MQTT is connected
    the hour hand (5 LEDs), minute hand (3 LEDs) and second hand (1 LED) are
    drawn in the complementary hue instead of the rainbow.

    Args:
        timeout_mod: when > 0 the animation hands over to "chase" after
            150 seconds; otherwise it runs until cancelled.
    """
    global SPEED, UPDATES
    offset = 0.0
    run_on_timeout = timeout_mod > 0
    timeout = 150
    start_time = utime.time()

    print(f"Making it all RAINBOWS up in here one sec...")
    while True:
        if run_on_timeout and utime.time() - start_time > timeout:
            uasyncio.create_task(run_animation("chase"))
            break

        SPEED = min(255, max(1, SPEED))
        # Hues only depend on the fractional part of the offset, so keep it
        # wrapped instead of letting the float grow (and lose precision).
        offset = (offset + float(SPEED) / 500.0) % 1.0

        pins_to_skip = set()
        if not wifi_connected:
            pins_to_skip.update(range(int(wifi_status_led_range[0]), int(wifi_status_led_range[1])))

        if mqtt_connected:
            hour_hand_positions = [(HOUR_HAND_POS + delta) % NUM_LEDS for delta in range(-2, 3)]
            minute_hand_positions = [(MINUTE_HAND_POS + delta) % NUM_LEDS for delta in range(-1, 2)]
            pins_to_skip.update(hour_hand_positions)
            pins_to_skip.update(minute_hand_positions)

            # Second hand. hsv_to_rgb() already yields 0-255 integers, so it
            # is no longer multiplied by 255 again.
            second_pos = SECOND_HAND_POS % NUM_LEDS
            led_strip[second_pos] = hsv_to_rgb((hue_offset(0, offset) + 0.5) % 1.0, 1.0, BRIGHTNESS)
            pins_to_skip.add(second_pos)

            # Hour and minute hands share one colour derived from the
            # second-hand position.
            hand_rgb = hsv_to_rgb((hue_offset(second_pos, offset) + 0.5) % 1.0, 1.0, BRIGHTNESS)
            for pos in hour_hand_positions + minute_hand_positions:
                led_strip[pos] = hand_rgb

        for i in range(NUM_LEDS):
            if i in pins_to_skip:
                continue
            hue = hue_offset(i, offset)
            led_strip[i] = hsv_to_rgb(hue + offset, 1.0, BRIGHTNESS)

        led_strip.write()
        await uasyncio.sleep(0.01)


async def movie_ticker(timeout_mod=0):
    """Scroll a rainbow along the hard-coded border LEDs only.

    Args:
        timeout_mod: when > 0 the animation hands over to "chase" after
            150 seconds; otherwise it runs until cancelled.
    """
    border_led_position = [0,1,2,3,4,5,6,7,8,23,24,39,40,55,56,57,58,59,60,61,62,63,48,47,32,31,16,15]
    global SPEED, UPDATES
    offset = 0.0
    run_on_timeout = timeout_mod > 0
    timeout = 150
    start_time = utime.time()

    print(f"Making it SHOW TIME up in here one sec...")
    while True:
        if run_on_timeout and utime.time() - start_time > timeout:
            uasyncio.create_task(run_animation("chase"))
            break

        SPEED = min(255, max(1, SPEED))
        offset = (offset + float(SPEED) / 500.0) % 1.0

        for i in border_led_position:
            if i >= NUM_LEDS:
                # The border map has indices up to 63; skip the ones this
                # strip does not have instead of raising IndexError.
                continue
            hue = hue_offset(i, offset)
            led_strip[i] = hsv_to_rgb(hue + offset, 1.0, BRIGHTNESS)

        led_strip.write()
        await uasyncio.sleep(0.01)


async def i_dont_know_why_this_works(color=1):
    """Alternate two hues across even/odd LEDs and swap them every frame.

    Runs until cancelled. The hue pair is chosen by ``color`` ("1", "2" or
    anything else). The previous version referenced an undefined
    ``MAX_SOLID_BRIGHTNESS`` and called update_strip() with too few
    arguments, so it could not run.
    """
    hue_1, hue_2 = (100, 220) if color == "1" else (0, 45) if color == "2" else (150, 180)
    color_1 = hsv_to_rgb(hue_1 / MAX_COLOR_CYCLE, 1, BRIGHTNESS)
    color_2 = hsv_to_rgb(hue_2 / MAX_COLOR_CYCLE, 1, BRIGHTNESS)

    while True:
        for even_color, odd_color in ((color_1, color_2), (color_2, color_1)):
            for i in range(NUM_LEDS):
                led_strip[i] = even_color if i % 2 == 0 else odd_color
            led_strip.write()
            await uasyncio.sleep(UPDATE_INTERVAL_BLINKIES)


async def alternating_blinkies(color="1"):
    """Blink two alternating hues for 60 seconds, then restart STARTING_ANIMATION.

    Args:
        color: "1", "2" or "3" select a hue pair; anything else uses (50, 100).
    """
    if color == "1":
        hue_1, hue_2 = 50, 220
    elif color == "2":
        hue_1, hue_2 = 0, 30
    elif color == "3":
        hue_1, hue_2 = 220, 30
    else:
        hue_1, hue_2 = 50, 100

    # Both patterns are loop-invariant, so build them once.
    first_pattern = [[i, (hue_1 if i % 2 == 0 else hue_2) / 360, 1.0, BRIGHTNESS, 0] for i in range(NUM_LEDS)]
    second_pattern = [[i, (hue_2 if i % 2 == 0 else hue_1) / 360, 1.0, BRIGHTNESS, 0] for i in range(NUM_LEDS)]

    start_time = utime.time()
    timeout = 60
    while utime.time() - start_time <= timeout:
        await set_all_leds_once(first_pattern)
        await uasyncio.sleep(UPDATE_INTERVAL_BLINKIES)
        await set_all_leds_once(second_pattern)
        await uasyncio.sleep(UPDATE_INTERVAL_BLINKIES)

    uasyncio.create_task(run_animation(STARTING_ANIMATION))


async def set_all_leds_once(led_settings):
    """Apply every ``[LED, H, S, V, SleepTime]`` setting, then write the strip once."""
    for setting in led_settings:
        led_index, hue, saturation, value, _ = setting  # Ignore sleep time here
        # hsv_to_rgb() already returns 0-255 integers (no second * 255).
        led_strip[led_index] = hsv_to_rgb(hue, saturation, value)
    led_strip.write()


async def run_animation(animation_name, color=1):
    """Cancel the running animation and run ``animation_name`` in its place.

    Args:
        animation_name: "alternating_blinkies", "rainbows", "chase", "idk"
            or "movie_ticker". Unknown names are reported and ignored; the
            running animation is then left alone.
        color: forwarded to alternating_blinkies() only.
    """
    global quit_animation
    global animation_task

    print(f"Running animation {animation_name}..")
    if animation_name == "alternating_blinkies":
        coro = alternating_blinkies(color)
    elif animation_name == "rainbows":
        coro = rainbows()
    elif animation_name == "chase":
        coro = chase()
    elif animation_name == "idk":
        coro = i_dont_know_why_this_works()
    elif animation_name == "movie_ticker":
        coro = movie_ticker()
    else:
        # Previously an unknown name cancelled the current animation and then
        # awaited a stale (or None) task.
        print(f"Unknown animation: {animation_name}")
        return

    if animation_task:
        animation_task.cancel()
    animation_task = uasyncio.get_event_loop().create_task(coro)
    try:
        await animation_task
    except uasyncio.CancelledError:
        # A later run_animation() cancelled this animation; nothing to do.
        pass


def update_file_from_mqtt_message(msg_string):
    """Start the OTA file replacement described by ``msg_string``."""
    print(f"Starting update process for {msg_string}...")
    update_file_replace(msg_string)


def sub_cb(topic, msg):
    """MQTT subscription callback: dispatch a message by topic.

    Errors are printed instead of raised: an exception escaping from here
    would surface in mqtt_task(), which resets the device.
    """
    try:
        msg_string = msg.decode("UTF-8")

        if topic == b'color_change':
            print("Changing LED color...")  # Debugging output
            make_leds_color(msg_string)
        elif topic == b'scores':
            print("Processing scores...")  # Debugging output
            data = msg_string.split(",")
            if len(data) < 2:
                print(f"Malformed scores message: {msg_string}")
                return
            game_outcome = data[1]
            result_match = 0
            team = data[0]
            print(f"Team: {team}, Game Outcome: {game_outcome}")  # Debugging output
            if team == TEAM_ASSIGNED:
                print("Running alternating blinkies animation...")  # Debugging output
                if game_outcome == "start":
                    uasyncio.create_task(run_animation("rainbows"))
                    result_match = "1"
                elif game_outcome == "tie":
                    result_match = "3"
                elif game_outcome == "loss":
                    result_match = "2"
                elif game_outcome == "win":
                    result_match = "1"
                uasyncio.create_task(run_animation("alternating_blinkies", result_match))
        elif topic == b'animate':
            print("Running custom animation...")  # Debugging output
            data = msg_string.split(",")
            animation_string = data[0]
            color_blinkies = data[1] if len(data) > 1 else None
            print(f"Animation: {animation_string}, Color: {color_blinkies}")  # Debugging output
            uasyncio.create_task(run_animation(animation_string, color_blinkies))
        elif topic == b'audio_reactive':
            # NOTE(review): handle_audio_data is not defined in the part of
            # the file visible here - confirm it exists.
            uasyncio.create_task(handle_audio_data(msg_string))
        elif topic == b'time':
            handle_time_message(msg_string)
        elif topic == b'update':
            print(f"Triggered msg string {msg_string}")
            #update_file_from_mqtt_message(msg_string)
    except Exception as e:
        print(f"Error handling message on topic {topic}: {e}")


async def mqtt_task(client):
    """Poll the MQTT client for messages twice a second; reset the board on error."""
    while True:
        try:
            client.check_msg()
            await uasyncio.sleep(0.5)
        except Exception as e:
            print(f"Errors checking messages: {e}")
            reset()


async def connect_to_wifi():
    """Scan for networks and connect to the first one listed in ``WIFI_LIST``.

    ``WIFI_LIST`` entries are ``[ssid]`` (open network) or ``[ssid, password]``.
    After requesting a connection this waits up to
    ``MAX_WIFI_CONNECT_TIMEOUT`` one-second polls for the link to come up.
    ``wifi_connected`` reflects the real link state afterwards; it used to be
    set to True even when the wait timed out.
    """
    global wifi_connected
    global wlan
    wifi_connected = False

    # TODO: update this to look for all networks in WIFI_LIST and try to
    # connect to the ones it finds.
    try:
        # The WLAN interface is created in start_wifi_card() so that wifi
        # scanning keeps working in the background.
        nets = wlan.scan()
        for net in nets:
            print(f'Network seen: {net}')
            for network_config in WIFI_LIST:
                ssid_to_find = network_config[0]
                # Compare as bytes so an SSID that is not valid UTF-8 cannot
                # abort the whole scan with a decode error.
                if net[0] != ssid_to_find.encode('utf-8'):
                    print(f"Unable to find SSID: {ssid_to_find}")
                    continue

                print(f'Network found! {ssid_to_find}')
                print(f"Attempting to connect to SSID: {ssid_to_find}")
                if len(network_config) < 2:
                    wlan.connect(ssid_to_find)  # open network, no password
                else:
                    wlan.connect(ssid_to_find, network_config[1])

                connection_attempts = 0
                while not wlan.isconnected():
                    connection_attempts += 1
                    if connection_attempts > MAX_WIFI_CONNECT_TIMEOUT:
                        print("Exceeded MAX_WIFI_CONNECT_TIMEOUT!!!")
                        break
                    # Give the radio time to associate and let other tasks run.
                    await uasyncio.sleep(1)

                wifi_connected = wlan.isconnected()
                if wifi_connected:
                    print('WLAN connection succeeded!')
                    break
            if wifi_connected:
                break
    except Exception as e:
        print(f"Setup failed: {e}")


# Status handler function
row_one = False


async def status_handler(message):
    """Print ``message`` and animate one half of the strip as a status blink.

    Alternates between the first and second half of the strip on each call.
    """
    global wifi_connected, row_one
    print(message)
    half = NUM_LEDS // 2
    if row_one:
        print(f"Row one")
        for i in range(half):
            led_strip[i] = (0, 0, 0)
            await uasyncio.sleep(half * 0.0005)
    else:
        print(f"Row Two!!")
        for i in range(half, NUM_LEDS):
            led_strip[i] = (0, 0, 0)
            await uasyncio.sleep(half * 0.0005)
    led_strip.write()

    row_one = not row_one
    if row_one:
        for i in range(half):
            # Explicit ",0" pause field so the payload always parses.
            make_leds_color("008800,0")
            await uasyncio.sleep(half * 0.001)
    else:
        for i in range(half, NUM_LEDS):
            led_strip[i] = (100, 100, 100)
            await uasyncio.sleep(half * 0.001)
    led_strip.write()


def connectMQTT():
    """Create the MQTT client, connect and subscribe to all topics.

    Returns:
        The client object, even when the connection failed; check the global
        ``mqtt_connected`` to find out whether it is usable.
    """
    global mqtt_connected
    client = MQTTClient(
        client_id=MQTT_CLIENT_ID,
        server=MQTT_SERVER,
        port=0,
        user=MQTT_USERNAME,
        password=MQTT_PASSWORD,
        keepalive=0
    )
    client.set_callback(sub_cb)
    try:
        client.connect()
        mqtt_connected = True
    except Exception as e:
        print('Error connecting to %s MQTT broker error: %s' % (MQTT_SERVER, e))

    topics = [b'time', b'color_change', b'scores', b'animate', b'audio_reactive', b'chase', b'update']
    if mqtt_connected:
        for topic in topics:
            try:
                client.subscribe(topic)
                print('Connected to {} MQTT broker, subscribed to {} topic'.format(MQTT_SERVER, topic.decode()))
            except Exception as e:
                print('Error subscribing to %s topic! Error: %s' % (topic.decode(), e))
    return client


def publish_to_mqtt(topic, message):
    """Publish ``message`` on ``topic`` with the global client; errors are printed."""
    try:
        client.publish(topic, message)
    except Exception as e:
        print(f'Error publishing message to topic {topic}: {message}, {e}')


def _flash_leds(color_hex, times, on_seconds, off_seconds):
    """Blink the whole strip ``times`` times in ``color_hex`` ("RRGGBB,pause").

    Uses the blocking time.sleep(), as the original code did.
    NOTE(review): this stalls the event loop while flashing; a side effect is
    that no animation task can repaint the strip during the flash - confirm
    before switching to uasyncio.sleep().
    """
    for _ in range(times):
        make_leds_color(color_hex=color_hex)
        time.sleep(on_seconds)
        make_leds_color(color_hex="000000,0.25")
        time.sleep(off_seconds)


async def setup_wireless():
    """Connect to WiFi (up to 3 tries), then to the MQTT broker.

    The strip flashes to report the outcome of each step.

    Returns:
        The MQTT client returned by connectMQTT() (check ``mqtt_connected``),
        or None when every MQTT attempt raised.
    """
    global wifi_connected
    global mqtt_connected
    global wlan

    try:
        if not wifi_connected:
            for _ in range(3):
                await connect_to_wifi()
                if wifi_connected:
                    print('Wifi connection successful!')
                    _flash_leds("000900,0.25", 2, 0.5, 0.5)
                    make_leds_color(color_hex="09000F,0.25")
                    break
                print(f'Wifi connection failed!')
                _flash_leds("090000,0.25", 2, 0.5, 0.5)
    except Exception as e:
        print(f'Wifi connection failed! {e}')
        wifi_connected = False
        _flash_leds("090000,0.25", 4, 0.5, 0.5)

    # The original called set_time() here without awaiting or scheduling it,
    # which never ran. main() schedules the hourly NTP task, so the dead call
    # is dropped.

    for _ in range(5):
        try:
            await uasyncio.sleep(2)
            print("Attempting to connect to MQTT broker...")
            client = connectMQTT()
            if mqtt_connected:
                _flash_leds("000F0F,0.25", 2, 0.25, 0.25)
                make_leds_color(color_hex="000F0F,0.25")
            else:
                print(f'MQTT connection failed!')
                _flash_leds("080000,0.25", 2, 0.5, 0.5)
            return client
        except Exception as e:
            print("Failed to connect to MQTT: %s" % e)
    return None


async def check_connections():
    """Every 10 seconds, re-run the wireless setup if the WiFi link dropped."""
    global wifi_connected, mqtt_connected, client
    while True:
        if not wlan.isconnected():
            mqtt_connected = False
            # connect_to_wifi() also resets this, but only when it is called.
            wifi_connected = False
            print('WiFi disconnected, attempting to reconnect WiFi and MQTT...')
            new_client = await setup_wireless()
            if new_client is not None:
                # Keep publish_to_mqtt() pointed at the fresh connection.
                client = new_client
        await uasyncio.sleep(10)  # Check connection status every 10 seconds


def start_wifi_card():
    """Create and activate the station-mode WLAN interface (global ``wlan``)."""
    global wlan
    print("Kicking on the wifi card one sec..")
    try:
        wlan = network.WLAN(network.STA_IF)
        wlan.active(True)
        print("WiFi card initialized")
    except Exception as e:
        print("Errors initializing WiFi")
        print(e)


async def main():
    """Entry point: start the animation and clock tasks, then the network tasks.

    In standalone mode no WiFi/MQTT connection is attempted.
    """
    global wifi_connected, mqtt_connected
    global client

    # Run our beginning animation
    print(f"Turning on the starting animation: {STARTING_ANIMATION}")
    uasyncio.create_task(run_animation(STARTING_ANIMATION))

    # Start the Wifi Card
    start_wifi_card()

    uasyncio.create_task(handle_ticking())

    # Enable if you want to scan for wifi
    #uasyncio.create_task(wifi_scan())

    # If we're not in standalone mode (i.e. no-WiFi mode), try to connect
    if not STANDALONE_MODE:
        for _ in range(3):
            if not wifi_connected or not mqtt_connected:
                client = await setup_wireless()
            else:
                break

    if wifi_connected:
        # This is janky, need to make it work better
        uasyncio.create_task(check_connections())
        # This will reach out to the interwebs to grab the time remotely every hour
        uasyncio.create_task(set_time())

    # This is how you check in with my server for updates!
    if mqtt_connected:
        uasyncio.create_task(mqtt_task(client))
        uasyncio.create_task(publish_list_of_mqtt_messages())

    # Keep main() alive so the background tasks keep running. Waking once a
    # second is enough; the previous 1 ms sleep only burned CPU.
    while True:
        await uasyncio.sleep(1)


uasyncio.run(main())