LAMBDA / kernel.py
maryxz933's picture
Upload folder using huggingface_hub
5a88bac verified
import queue
import re
import base64
from io import BytesIO
import re
import os
from typing import Tuple, Any
import jupyter_client
from PIL import Image
from subprocess import PIPE
from pprint import pprint
import nbformat
from nbformat import v4 as nbf
import time
import ansi2html
from utils.utils import check_install_kernel
IPYKERNEL = os.environ.get('IPYKERNEL', 'lambda')
class CodeKernel(object):
def __init__(self,
kernel_name='kernel',
kernel_id=None,
kernel_config_path="",
python_path=None,
ipython_path=None,
init_file_path="./startup.py",
session_cache_path="",
max_exe_time=18000,
verbose=1):
self.kernel_name = kernel_name
self.kernel_id = kernel_id
self.kernel_config_path = kernel_config_path
self.python_path = python_path
self.ipython_path = ipython_path
self.init_file_path = init_file_path
self.session_cache_path = session_cache_path
# self.executed_cells = []
self.max_exe_time = max_exe_time
self.nb = nbf.new_notebook()
self.nb_path = os.path.join(session_cache_path, 'notebook.ipynb')
self.verbose = verbose
self.interrupt_signal = False
if python_path is None and ipython_path is None:
env = None
else:
env = {"PATH": self.python_path + ":$PATH", "PYTHONPATH": self.python_path}
check_install_kernel('lambda')
self.kernel_manager = jupyter_client.KernelManager(
kernel_name=IPYKERNEL,
connection_file=self.kernel_config_path,
exec_files=[self.init_file_path],
env=env)
# if getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS'):
# self.kernel_manager = jupyter_client.KernelManager(
# connection_file=self.kernel_config_path,
# exec_files=[self.init_file_path],
# env=env)
# else:
# self.kernel_manager = jupyter_client.KernelManager(
# kernel_name=IPYKERNEL,
# connection_file=self.kernel_config_path,
# exec_files=[self.init_file_path],
# env=env)
if self.kernel_config_path:
self.kernel_manager.load_connection_file()
self.kernel_manager.start_kernel(stdout=PIPE, stderr=PIPE)
print("Backend kernel started with the configuration: {}".format(
self.kernel_config_path))
else:
self.kernel_manager.start_kernel(stdout=PIPE, stderr=PIPE)
print("Backend kernel started with the configuration: {}".format(
self.kernel_manager.connection_file))
if verbose:
pprint(self.kernel_manager.get_connection_info())
self.kernel = self.kernel_manager.blocking_client()
self.kernel.start_channels()
print("Code kernel started.")
def execute_code_(self, code):
msg_id = self.kernel.execute(code)
# Get the output of the code
msg_list = []
while True:
try:
iopub_msg = self.kernel.get_iopub_msg(timeout=self.max_exe_time)
msg_list.append(iopub_msg)
if iopub_msg['msg_type'] == 'status' and iopub_msg['content'].get('execution_state') == 'idle':
break
except:
if self.interrupt_signal:
self.kernel_manager.interrupt_kernel()
self.interrupt_signal = False
continue
all_output = []
# sign = None
for iopub_msg in msg_list:
if iopub_msg['msg_type'] == 'stream':
if iopub_msg['content'].get('name') == 'stdout':
output = iopub_msg['content']['text']
all_output.append(('stdout', output))
elif iopub_msg['msg_type'] == 'execute_result':
if 'data' in iopub_msg['content']:
if 'text/plain' in iopub_msg['content']['data']:
output = iopub_msg['content']['data']['text/plain']
all_output.append(('execute_result_text', output))
if 'text/html' in iopub_msg['content']['data']:
output = iopub_msg['content']['data']['text/html']
all_output.append(('execute_result_html', output))
if 'image/png' in iopub_msg['content']['data']:
output = iopub_msg['content']['data']['image/png']
all_output.append(('execute_result_png', output))
save_b64_2_img(output, self.session_cache_path)
if 'image/jpeg' in iopub_msg['content']['data']:
output = iopub_msg['content']['data']['image/jpeg']
all_output.append(('execute_result_jpeg', output))
save_b64_2_img(output, self.session_cache_path)
elif iopub_msg['msg_type'] == 'display_data':
if 'data' in iopub_msg['content']:
if 'text/plain' in iopub_msg['content']['data']:
output = iopub_msg['content']['data']['text/plain']
all_output.append(('display_text', output))
if 'text/html' in iopub_msg['content']['data']:
output = iopub_msg['content']['data']['text/html']
all_output.append(('display_html', output))
if 'image/png' in iopub_msg['content']['data']:
output = iopub_msg['content']['data']['image/png']
all_output.append(('display_png', output))
save_b64_2_img(output, self.session_cache_path)
if 'image/jpeg' in iopub_msg['content']['data']:
output = iopub_msg['content']['data']['image/jpeg']
all_output.append(('display_jpeg', output))
save_b64_2_img(output, self.session_cache_path)
elif iopub_msg['msg_type'] == 'error':
if 'traceback' in iopub_msg['content']:
output = '\n'.join(iopub_msg['content']['traceback'])
all_output.append(('error', output))
# print("len of console messages: " + str(len(all_output)))
return all_output
def execute_code(self, code) -> Tuple[
list, str, str]: # list[list, list, list]: # Return: 1. sginal of resut, eg: text, error. 2. test to LLM. 3. The content to display.
text_to_llm = ["Summary of console output:\n"]
sign = list()
content_to_display = []
images = []
result = self.execute_code_(code)
self.add_code_cell_to_notebook(code)
# print("Console output: " ,content_to_display)
for mark, out_str in result:
if mark in ('stdout', 'execute_result_text', 'display_text'):
sign.append('text') # sign.append(mark)
text_to_llm.append(out_str)
content_to_display.append(out_str)
self.add_code_cell_output_to_notebook(out_str)
elif mark in ('execute_result_png', 'execute_result_jpeg', 'display_png', 'display_jpeg'):
sign.append("image")
text_to_llm.append(f'Generated an image file in {self.session_cache_path}.')
# content_to_display.append(out_str)
if 'png' in mark:
images.append(('png', out_str))
self.add_image_to_notebook(out_str, 'image/png')
else:
images.append(('jpg', out_str))
self.add_image_to_notebook(out_str, 'image/jpeg')
elif mark == 'error':
text_to_llm.append(delete_color_control_char(out_str)) # the error msg gave to LLM should be clean
sign.append('error')
self.add_code_cell_error_to_notebook(out_str)
return sign, '\n'.join(text_to_llm), '\n'.join(content_to_display) # '\n'.join(text_to_gpt), content_to_display
# return sign, '\n'.join(text_to_llm), '\n'.join(content_to_display)
# def export(self, file_path):
# # nb = nbf.v4.new_notebook()
# # nb.cells = self.executed_cells
# # print("cell: ", nb.cells)
# with open(file_path, 'w', encoding='utf-8') as f:
# nbf.write(nb, f)
# print(f"Notebook exported to {file_path}")
# def export(self, file_path):
# # nb = nbf.v4.new_notebook()
# # nb.cells = self.executed_cells
# # print("cell: ", nb.cells)
# with open(file_path, 'w', encoding='utf-8') as f:
# nbf.write(nb, f)
# print(f"Notebook exported to {file_path}")
def execute_interactive(self, code, verbose=False):
shell_msg = self.kernel.execute_interactive(code)
if shell_msg is queue.Empty:
if verbose:
print("Timeout waiting for shell message.")
self.check_msg(shell_msg, verbose=verbose)
return shell_msg
def inspect(self, code, verbose=False):
msg_id = self.kernel.inspect(code)
shell_msg = self.kernel.get_shell_msg(timeout=30)
if shell_msg is queue.Empty:
if verbose:
print("Timeout waiting for shell message.")
self.check_msg(shell_msg, verbose=verbose)
return shell_msg
def get_error_msg(self, msg, verbose=False):
if msg['content']['status'] == 'error':
try:
error_msg = msg['content']['traceback']
except:
try:
error_msg = msg['content']['traceback'][-1].strip()
except:
error_msg = "Traceback Error"
if verbose:
print("Error: ", error_msg)
return error_msg
return None
def check_msg(self, msg, verbose=False):
status = msg['content']['status']
if status == 'ok':
if verbose:
print("Execution succeeded.")
elif status == 'error':
for line in msg['content']['traceback']:
if verbose:
print(line)
def shutdown(self):
# Shutdown the backend kernel
self.kernel_manager.shutdown_kernel(now=True)
print("Backend kernel shutdown.")
# Shutdown the code kernel
self.kernel.shutdown()
print("Code kernel shutdown.")
def restart(self):
# Restart the backend kernel
self.kernel_manager.restart_kernel()
print("Backend kernel restarted.")
def start(self):
# Initialize the code kernel
self.kernel = self.kernel_manager.blocking_client()
# self.kernel.load_connection_file()
self.kernel.start_channels()
print("Code kernel started.")
def interrupt(self):
# Interrupt the backend kernel
self.kernel_manager.interrupt_kernel()
print("Backend kernel interrupted.")
def is_alive(self):
return self.kernel.is_alive()
def add_code_cell_to_notebook(self, code):
code_cell = nbf.new_code_cell(source=code)
self.nb['cells'].append(code_cell)
def add_code_cell_output_to_notebook(self, output):
html_content = ansi_to_html(output)
cell_output = nbf.new_output(output_type='display_data', data={'text/html': html_content})
self.nb['cells'][-1]['outputs'].append(cell_output)
def add_code_cell_error_to_notebook(self, error):
nbf_error_output = nbf.new_output(
output_type='error',
ename='Error',
evalue='Error message',
traceback=[error]
)
self.nb['cells'][-1]['outputs'].append(nbf_error_output)
def add_image_to_notebook(self, image, mime_type):
image_output = nbf.new_output(output_type='display_data', data={mime_type: image})
self.nb['cells'][-1]['outputs'].append(image_output)
def add_markdown_to_notebook(self, content, title=None):
if title:
content = "##### " + title + ":\n" + content
markdown_cell = nbf.new_markdown_cell(content)
self.nb['cells'].append(markdown_cell)
def write_to_notebook(self, notebook_path):
with open(notebook_path, 'w', encoding='utf-8') as f:
nbformat.write(self.nb, f)
print(f"Notebook exported to {notebook_path}")
def ansi_to_html(ansi_text):
converter = ansi2html.Ansi2HTMLConverter()
html_text = converter.convert(ansi_text)
return html_text
def delete_color_control_char(string):
ansi_escape = re.compile(r'(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]')
return ansi_escape.sub('', string)
def save_b64_2_img(data, path):
bs64_img = base64.b64decode(data)
img_path = os.path.join(path, f"{hash(time.time())}.png")
with open(img_path, 'wb') as f:
f.write(bs64_img)
print(f"Executing: Image saved in {img_path}")
return f"Image saved in {img_path}" # Image.open(buff)
def clean_ansi_codes(input_string):
ansi_escape = re.compile(r'(\x9B|\x1B\[|\u001b\[)[0-?]*[ -/]*[@-~]')
return ansi_escape.sub('', input_string)
def execute(code, kernel: CodeKernel):
msg = kernel.execute_code(code)
return msg
# @st.cache_resource
def get_kernel():
kernel = CodeKernel()
return kernel
if __name__ == '__main__':
kernel = CodeKernel(session_cache_path="./cache/cache_test/")
text_code = "print('Hello world!')"
print(text_code, kernel)
table_code = """
import pandas as pd
data = pd.read_csv('data/wine.csv')
data.head()
"""
img_code = """
import matplotlib.pyplot as plt
x = [1, 2, 3, 4, 5]
y = [2, 3, 5, 7, 6]
plt.plot(x, y)
plt.title('Simple Line Plot')
plt.xlabel('X-axis')
plt.ylabel('Y-axis')
plt.show()
"""
file_code = """
text_content = "This is file test."
with open('example.txt', 'w', encoding='utf-8') as file:
file.write(text_content)
print("saved example.txt")
"""
error_code = "print(ss)"
none_code = "a = None"
# res_type, res = execute(img_code, kernel)
# print(res_type,res)
all_test_code = [text_code, table_code, img_code, file_code, error_code, none_code]
for i in all_test_code:
msg = execute(i, kernel)
print(msg)
kernel.write_to_notebook("./cache/cache_test/my_notebook2.ipynb")