| import streamlit as st |
| import re |
| import lark_oapi as lark |
| from lark_oapi.api.bitable.v1 import * |
| import time |
| import openpyxl |
| import os |
| from dotenv import load_dotenv |
| import logging |
| from tenacity import retry, stop_after_attempt, wait_exponential |
|
|
| |
| logging.basicConfig(level=logging.INFO) |
| logger = logging.getLogger(__name__) |
|
|
| |
| load_dotenv() |
|
|
| def parse_bitable_url(url): |
| """ |
| Extracts app_token and table_id from a Bitable URL. |
| Expected format: .../base/<app_token>?table=<table_id>... |
| """ |
| try: |
| |
| app_token_match = re.search(r'/base/([^/?]+)', url) |
| if not app_token_match: |
| return None, None |
| app_token = app_token_match.group(1) |
|
|
| |
| table_id_match = re.search(r'table=([^&]+)', url) |
| if not table_id_match: |
| return None, None |
| table_id = table_id_match.group(1) |
| |
| return app_token, table_id |
| except Exception: |
| return None, None |
|
|
| def get_bitable_client(app_id, app_secret): |
| return lark.Client.builder().app_id(app_id).app_secret(app_secret).build() |
|
|
| def get_table_fields(client, app_token, table_id): |
| logger.info(f"Fetching fields for app_token={app_token}, table_id={table_id}") |
| fields = [] |
| page_token = None |
| has_more = True |
| |
| while has_more: |
| req = ListAppTableFieldRequest.builder() \ |
| .app_token(app_token) \ |
| .table_id(table_id) \ |
| .page_size(100) |
| |
| if page_token: |
| req.page_token(page_token) |
| |
| req = req.build() |
| |
| try: |
| resp = client.bitable.v1.app_table_field.list(req) |
| except Exception as e: |
| logger.error(f"API call failed: {e}", exc_info=True) |
| raise |
|
|
| if not resp.success(): |
| logger.error(f"Failed to list fields: code={resp.code}, msg={resp.msg}, log_id={resp.get_log_id()}") |
| raise Exception(f"Failed to list fields: {resp.msg}, log_id: {resp.get_log_id()}") |
| |
| if resp.data and resp.data.items: |
| fields.extend(resp.data.items) |
| logger.info(f"Fetched {len(resp.data.items)} fields. Current total: {len(fields)}") |
| logger.info(f"Page token: {page_token} -> {resp.data.page_token}, Has more: {resp.data.has_more}") |
| |
| |
| |
| |
| |
| |
| items_count = len(resp.data.items) if (resp.data and resp.data.items) else 0 |
| |
| if items_count < 100: |
| has_more = False |
| else: |
| has_more = resp.data.has_more |
| |
| |
| new_page_token = resp.data.page_token |
| |
| if has_more and new_page_token == page_token: |
| logger.warning("Infinite loop detected: page_token did not update. Breaking.") |
| has_more = False |
| |
| page_token = new_page_token |
| |
| logger.info(f"Total fields fetched: {len(fields)}") |
| return fields |
|
|
| def validate_primary_keys(excel_columns, target_fields): |
| |
| |
| |
| |
| |
| target_pk = next((f for f in target_fields if f.is_primary), None) |
| |
| if not target_pk: |
| return False, "Target table has no primary key." |
| |
| if target_pk.field_name not in excel_columns: |
| return False, f"Target Primary Key '{target_pk.field_name}' not found in Excel columns." |
| |
| |
| |
| return True, f"Primary key '{target_pk.field_name}' found in Excel." |
|
|
| @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) |
| def _send_batch(client, app_token, table_id, records): |
| req = BatchCreateAppTableRecordRequest.builder() \ |
| .app_token(app_token) \ |
| .table_id(table_id) \ |
| .request_body(BatchCreateAppTableRecordRequestBody.builder().records(records).build()) \ |
| .build() |
| |
| resp = client.bitable.v1.app_table_record.batch_create(req) |
| if not resp.success(): |
| logger.error(f"Batch create failed: code={resp.code}, msg={resp.msg}, log_id={resp.get_log_id()}") |
| if resp.error: |
| logger.error(f"Error details: {resp.error}") |
| raise Exception(f"Batch create failed: {resp.msg}, log_id: {resp.get_log_id()}") |
|
|
| def sync_data(client, excel_rows, excel_columns, special_col_name, manual_source_value, excel_file_name, target_app_token, target_table_id, target_fields): |
| target_field_names = {f.field_name: f for f in target_fields} |
| |
| batch_records = [] |
| total_records = len(excel_rows) |
| |
| if total_records == 0: |
| st.warning("No records to sync.") |
| return |
|
|
| progress_bar = st.progress(0) |
| status_text = st.empty() |
| processed_count = 0 |
| |
| for row in excel_rows: |
| new_fields = {} |
| |
| |
| row_dict = dict(zip(excel_columns, row)) |
| |
| |
| for col_name, val in row_dict.items(): |
| if col_name in target_field_names and val is not None: |
| |
| |
| |
| |
| |
| |
| target_type = target_field_names[col_name].type |
| |
| if target_type == 5: |
| if hasattr(val, 'timestamp'): |
| new_fields[col_name] = int(val.timestamp() * 1000) |
| else: |
| new_fields[col_name] = val |
| elif target_type == 4: |
| |
| if isinstance(val, str): |
| new_fields[col_name] = [v.strip() for v in val.split(',')] |
| else: |
| new_fields[col_name] = val |
| elif target_type == 11: |
| |
| |
| |
| |
| pass |
| else: |
| new_fields[col_name] = val |
| |
| |
| if special_col_name and special_col_name in target_field_names: |
| target_col_def = target_field_names[special_col_name] |
| |
| |
| final_val = manual_source_value if manual_source_value else f"Imported from {excel_file_name}" |
| |
| if target_col_def.type == 15: |
| |
| |
| link_url = str(final_val) if final_val else "" |
| if not link_url.startswith("http"): |
| link_url = "" |
| |
| new_fields[special_col_name] = { |
| "link": link_url, |
| "text": str(final_val) |
| } |
| else: |
| new_fields[special_col_name] = final_val |
|
|
| batch_records.append(AppTableRecord.builder().fields(new_fields).build()) |
| |
| if len(batch_records) >= 100: |
| _send_batch(client, target_app_token, target_table_id, batch_records) |
| processed_count += len(batch_records) |
| progress_bar.progress(processed_count / total_records) |
| status_text.text(f"Synced {processed_count}/{total_records} records") |
| batch_records = [] |
| |
| if batch_records: |
| _send_batch(client, target_app_token, target_table_id, batch_records) |
| processed_count += len(batch_records) |
| progress_bar.progress(1.0) |
| status_text.text(f"Synced {processed_count}/{total_records} records") |
|
|
| def main(): |
| st.set_page_config(page_title="Bitbcpy - 多维表格合并工具", layout="wide") |
| st.title("Bitbcpy - 多维表格合并脚本") |
| st.markdown("将本地 Excel 的行抄到目标表,同名列合并,忽略目标表不存在的列。") |
|
|
| if 'target_fields' not in st.session_state: |
| st.session_state.target_fields = [] |
| if 'target_fields_loaded' not in st.session_state: |
| st.session_state.target_fields_loaded = False |
|
|
| with st.sidebar: |
| st.header("配置") |
| |
| default_app_id = os.getenv("LARK_OAPI_APPID", "") |
| default_app_secret = os.getenv("LARK_OAPI_SECRET", "") |
| |
| app_id = st.text_input("App ID", value=default_app_id) |
| app_secret = st.text_input("App Secret", type="password", value=default_app_secret) |
| |
| target_url = st.text_input("目标文档链接") |
| |
| if st.button("读取目标表字段"): |
| if not app_id or not app_secret: |
| st.error("请填写 App ID 和 App Secret") |
| elif not target_url: |
| st.error("请填写目标文档链接") |
| else: |
| t_token, t_table = parse_bitable_url(target_url) |
| if not (t_token and t_table): |
| st.error("目标链接解析失败") |
| else: |
| client = get_bitable_client(app_id, app_secret) |
| try: |
| with st.spinner("Fetching schema..."): |
| fields = get_table_fields(client, t_token, t_table) |
| st.session_state.target_fields = fields |
| st.session_state.target_fields_loaded = True |
| st.success("字段加载成功") |
| except Exception as e: |
| st.error(f"Error: {str(e)}") |
| |
| source_col = None |
| manual_source_val = None |
| |
| if st.session_state.target_fields_loaded: |
| field_names = [f.field_name for f in st.session_state.target_fields] |
| source_col = st.selectbox("哪一列记录来源", options=field_names) |
| manual_source_val = st.text_input("该列统一填写的来源信息", help="例如:2023年Q1销售数据") |
|
|
| uploaded_file = st.file_uploader("上传来源 Excel (xlsx)", type=['xlsx']) |
|
|
| if st.button("开始合并"): |
| if not app_id or not app_secret: |
| st.error("请在左侧侧边栏填写 App ID 和 App Secret") |
| return |
| |
| if not uploaded_file: |
| st.error("请上传来源 Excel 文件") |
| return |
| |
| t_token, t_table = parse_bitable_url(target_url) |
| |
| if not (t_token and t_table): |
| st.error("目标链接解析失败,请检查链接格式") |
| return |
| |
| client = get_bitable_client(app_id, app_secret) |
| |
| try: |
| |
| wb = openpyxl.load_workbook(uploaded_file, data_only=True) |
| sheet = wb.active |
| |
| |
| rows = list(sheet.iter_rows(values_only=True)) |
| if not rows: |
| st.error("Excel 文件为空") |
| return |
| |
| header = rows[0] |
| data_rows = rows[1:] |
| |
| |
| excel_columns = [str(h) for h in header if h is not None] |
| |
| |
| t_fields = st.session_state.target_fields |
| if not t_fields: |
| with st.spinner("Fetching target table schema..."): |
| t_fields = get_table_fields(client, t_token, t_table) |
| |
| valid, msg = validate_primary_keys(excel_columns, t_fields) |
| if not valid: |
| st.error(f"主键校验失败: {msg}") |
| return |
| else: |
| st.success("主键校验通过") |
| |
| st.info(f"Found {len(data_rows)} records in Excel.") |
| |
| sync_data(client, data_rows, excel_columns, source_col, manual_source_val, uploaded_file.name, t_token, t_table, t_fields) |
| st.success("合并完成!") |
| |
| except Exception as e: |
| logger.error(f"Error in main loop: {e}", exc_info=True) |
| st.error(f"Error: {str(e)}") |
|
|
| if __name__ == "__main__": |
| main() |
|
|