| def show(): |
| import random |
| import time |
|
|
| |
| from os import environ |
| import requests |
| import streamlit as st |
|
|
| MONGODB_URI = environ["MONGODB_URI"] |
|
|
| HIVEMQ_BASE_URL = environ["HIVEMQ_BASE_URL"] |
| HIVEMQ_API_TOKEN = environ["HIVEMQ_API_TOKEN"] |
|
|
| from pymongo.mongo_client import MongoClient |
|
|
| |
| |
| |
| microscope = "microscope2" |
| access_time = 180 |
| database_name = "openflexure-microscope" |
| collection_name = "Cluster0" |
| microscopes = [ |
| "microscope", |
| "microscope2", |
| "deltastagetransmission", |
| "deltastagereflection", |
| ] |
|
|
| client = MongoClient(MONGODB_URI) |
| db = client[database_name] |
| collection = db[collection_name] |
|
|
| try: |
| client.admin.command("ping") |
| print("Pinged your deployment. You successfully connected to MongoDB!") |
| except Exception as e: |
| st.write(e) |
|
|
| def check_variable(variable_name): |
| try: |
|
|
| document = collection.find_one({"variable_name": variable_name}) |
| if document: |
| return document.get("value", "Variable not found.") |
| else: |
| return "Variable not found in the collection." |
| except Exception as e: |
| return f"An error occurred: {e}" |
|
|
| def create_user(username, password): |
| api_url = HIVEMQ_BASE_URL + "/mqtt/credentials" |
| headers = { |
| "Authorization": f"Bearer {HIVEMQ_API_TOKEN}", |
| "Content-Type": "application/json", |
| } |
|
|
| new_user = {"credentials": {"username": username, "password": password}} |
|
|
| requests.post(api_url, json=new_user, headers=headers) |
|
|
| def delete_user(username): |
| headers = { |
| "Authorization": f"Bearer {HIVEMQ_API_TOKEN}", |
| "Content-Type": "application/json", |
| } |
|
|
| api_url = HIVEMQ_BASE_URL + "/mqtt/credentials/username/" + username |
| requests.delete(api_url, headers=headers) |
|
|
| def role_user(username, role): |
| headers = { |
| "Authorization": f"Bearer {HIVEMQ_API_TOKEN}", |
| "Content-Type": "application/json", |
| } |
| api_url = HIVEMQ_BASE_URL + "/user/" + username + "/roles/" + role + "/attach" |
| requests.put(api_url, headers=headers) |
|
|
| def update_variable(variable_name, new_value): |
| try: |
| result = collection.update_one( |
| {"variable_name": variable_name}, |
| {"$set": {"value": new_value}}, |
| upsert=True, |
| ) |
| if result.matched_count > 0: |
| return "Variable updated successfully." |
| else: |
| return "Variable created and updated successfully." |
| except Exception as e: |
| return f"An error occurred: {e}" |
|
|
| def update_variable_test(): |
| update_variable(microscope, random.randint(1, 10)) |
|
|
| def check_variable_test(): |
| st.write(check_variable(microscope)) |
|
|
| def get_current_time(): |
| |
| |
| |
| |
| |
| |
| |
| |
| unix_time = int(time.time()) |
| return unix_time |
|
|
| def button(): |
| st.session_state.button_clicked = True |
|
|
| if "button_clicked" not in st.session_state: |
| st.session_state.button_clicked = False |
| if "previous_selected_value" not in st.session_state: |
| st.session_state.previous_selected_value = microscopes[1] |
|
|
| st.write(f"Keys will last {900/60} minutes before being overridable") |
| st.write("Usernames:") |
| st.code( |
| """ |
| microscope -> microscopeclientuser |
| microscope2 -> microscope2clientuser |
| deltastagereflection -> deltastagereflectionclientuser |
| deltastagetransmission -> deltastagetransmissionclientuser |
| """ |
| ) |
|
|
| microscope = st.selectbox( |
| "Choose a microscope:", microscopes, index=microscopes.index("microscope2") |
| ) |
| if microscope != st.session_state.get("previous_selected_value", microscope): |
|
|
| st.session_state.button_clicked = False |
|
|
| st.session_state["previous_selected_value"] = microscope |
|
|
| st.button( |
| "Request temporary access", |
| help="If somebody is using the microscope, you will need to wait", |
| on_click=button, |
| ) |
|
|
| if 'last_key' not in st.session_state: |
| st.session_state.last_key = "No generated keys!" |
| st.success("Last key you generated (may not still be valid): "+st.session_state.last_key) |
| |
| if st.session_state.button_clicked: |
| display_text = st.empty() |
| ctime = get_current_time() |
| var = check_variable(microscope) |
| if ctime >= var + access_time: |
|
|
| access_key = "Microscope" + str(random.randint(10000000, 99999999)) |
| delete_user(microscope + "clientuser") |
| create_user(microscope + "clientuser", access_key) |
| if microscope == "microscope2": |
| role_user(microscope + "clientuser", "3") |
| elif microscope == "microscope": |
| role_user(microscope + "clientuser", "4") |
| elif microscope == "deltastagereflection": |
| role_user(microscope + "clientuser", "5") |
| elif microscope == "deltastagetransmission": |
| role_user(microscope + "clientuser", "6") |
|
|
| display_text.success( |
| "Access key: " + access_key |
| ) |
| st.session_state.last_key = access_key |
| update_variable(microscope, ctime) |
|
|
| else: |
| while True: |
| if access_time - ctime + var <= 0: |
| display_text.success("Access key ready!") |
| break |
| if (access_time - ctime + var) % 60 < 10: |
| seconds = "0" + str((access_time - ctime + var) % 60) |
| else: |
| seconds = str((access_time - ctime + var) % 60) |
| display_text.error( |
| "Please wait " |
| + str( |
| int( |
| ( |
| access_time |
| - ctime |
| + var |
| - (access_time - ctime + var) % 60 |
| ) |
| / 60 |
| ) |
| ) |
| + ":" |
| + seconds |
| ) |
|
|
| ctime = ctime + 1 |
| if ctime % 15 == 0: |
| ctime = get_current_time() + 1 |
| time.sleep(1) |
| while True: |
| time.sleep(5) |
| cutime = get_current_time() |
| var = check_variable(microscope) |
| if cutime <= var + access_time: |
| display_text.error("The access key was taken!") |
| break |
| time.sleep(10) |
|
|