GGSheng's picture
feat: deploy Gemma 4 to hf space
3a5cf48 verified
import copy
from datetime import datetime
import typing
from mod.base import public_aap as public
import time
from mailModel.power_mta.maillog_stat import query_maillog_with_time_section
class overview:
def __init__(self):
import PluginLoader
self.__IS_PRO_MEMBER: bool = PluginLoader.get_auth_state() > 0
# 过滤与准备时间范围
def __filter_and_prepare_time_section(self, start_time: int = -1, end_time: int = -1) -> typing.Tuple[int, int]:
if start_time > 0 and end_time < 0:
end_time = int(time.time())
if start_time > end_time:
raise public.HintException(public.lang('end_time must greater than start_time'))
return start_time, end_time
# 创建统计数据查询基础query
def __build_base_query(self, campaign_id: int = -1, domain: typing.Optional[str] = None, start_time: int = -1, end_time: int = -1) -> public.S:
query = public.S('send_mails').alias('sm').prefix('')
query.left_join('senders s', 'sm.postfix_message_id=s.postfix_message_id')
query.where('s.postfix_message_id is not null')
if domain is not None:
query.where('s.sender like ?', '%@{}'.format(domain))
if start_time > 0:
query.where('sm.log_time > ?', start_time - 1)
if end_time > 0:
query.where('sm.log_time < ?', end_time + 1)
return query
# 概览图表
def overview(self, campaign_id: int = -1, domain: typing.Optional[str] = None, start_time: int = -1, end_time: int = -1) -> typing.Dict:
return {
'dashboard': self.overview_dashboard(campaign_id, domain, start_time, end_time),
'mail_providers': self.overview_providers(campaign_id, domain, start_time, end_time),
'send_mail_chart': self.chart_send_mail(campaign_id, domain, start_time, end_time),
'bounce_rate_chart': self.chart_bounce_rate(campaign_id, domain, start_time, end_time),
'open_rate_chart': self.chart_open_rate(campaign_id, domain, start_time, end_time),
'click_rate_chart': self.chart_click_rate(campaign_id, domain, start_time, end_time),
}
# 仪表盘数据总览
def overview_dashboard(self, campaign_id: int = -1, domain: typing.Optional[str] = None, start_time: int = -1, end_time: int = -1) -> typing.Dict:
start_time, end_time = self.__filter_and_prepare_time_section(start_time, end_time)
query = self.__build_base_query(campaign_id, domain, start_time, end_time)
query.left_join(public.S('opened').prefix('').field('distinct `postfix_message_id`').build_sql(True, 'o'), 'sm.postfix_message_id=o.postfix_message_id')
query.left_join(public.S('clicked').prefix('').field('distinct `postfix_message_id`').build_sql(True, 'c'), 'sm.postfix_message_id=c.postfix_message_id')
query.field('count(*) as `sends`')
query.field('ifnull(sum(`status`=\'sent\' and `dsn`=\'2.0.0\'), 0) as `delivered`')
if self.__IS_PRO_MEMBER:
query.field('count(o.postfix_message_id) as `opened`')
query.field('count(c.postfix_message_id) as `clicked`')
query.field('ifnull(sum(`status`=\'bounced\'), 0) as `bounced`')
ret = query_maillog_with_time_section(query, start_time, end_time)
aggregate = {
'sends': 0,
'delivered': 0,
'opened': 0,
'clicked': 0,
'bounced': 0,
}
for item in ret:
for k, v in item.items():
if k in aggregate:
aggregate[k] += int(v)
aggregate['delivery_rate'] = round(aggregate['delivered'] / aggregate['sends'] * 100, 2) if aggregate['sends'] > 0 else 0
if self.__IS_PRO_MEMBER:
aggregate['bounce_rate'] = round(aggregate['bounced'] / aggregate['sends'] * 100, 2) if aggregate['sends'] > 0 else 0
aggregate['open_rate'] = round(aggregate['opened'] / aggregate['delivered'] * 100, 2) if aggregate['delivered'] > 0 else 0
aggregate['click_rate'] = round(aggregate['clicked'] / aggregate['delivered'] * 100, 2) if aggregate['delivered'] > 0 else 0
else:
aggregate['opened'] = -1
aggregate['clicked'] = -1
aggregate['bounced'] = -1
aggregate['bounce_rate'] = -1
aggregate['open_rate'] = -1
aggregate['click_rate'] = -1
return aggregate
# 统计邮件服务商
def overview_providers(self, campaign_id: int = -1, domain: typing.Optional[str] = None, start_time: int = -1, end_time: int = -1) -> typing.List:
if not self.__IS_PRO_MEMBER:
return [
{
'mail_provider': 'google',
'sends': -1,
'delivered': -1,
'opened': -1,
'clicked': -1,
'bounce_rate': -1,
'delivery_rate': -1,
'open_rate': -1,
'click_rate': -1,
},
{
'mail_provider': 'outlook',
'sends': -1,
'delivered': -1,
'opened': -1,
'clicked': -1,
'bounce_rate': -1,
'delivery_rate': -1,
'open_rate': -1,
'click_rate': -1,
},
{
'mail_provider': 'yahoo',
'sends': -1,
'delivered': -1,
'opened': -1,
'clicked': -1,
'bounce_rate': -1,
'delivery_rate': -1,
'open_rate': -1,
'click_rate': -1,
},
{
'mail_provider': 'other',
'sends': -1,
'delivered': -1,
'opened': -1,
'clicked': -1,
'bounce_rate': -1,
'delivery_rate': -1,
'open_rate': -1,
'click_rate': -1,
}
]
start_time, end_time = self.__filter_and_prepare_time_section(start_time, end_time)
query = self.__build_base_query(campaign_id, domain, start_time, end_time)
query.left_join(public.S('opened').prefix('').field('distinct `postfix_message_id`').build_sql(True, 'o'),
'sm.postfix_message_id=o.postfix_message_id')
query.left_join(public.S('clicked').prefix('').field('distinct `postfix_message_id`').build_sql(True, 'c'),
'sm.postfix_message_id=c.postfix_message_id')
query.field('sm.mail_provider')
query.field('count(*) as `sends`')
query.field('ifnull(sum(`status`=\'sent\' and `dsn`=\'2.0.0\'), 0) as `delivered`')
query.field('count(o.postfix_message_id) as `opened`')
query.field('count(c.postfix_message_id) as `clicked`')
query.field('ifnull(sum(`status`=\'bounced\'), 0) as `bounced`')
query.group('sm.mail_provider')
ret = query_maillog_with_time_section(query, start_time, end_time)
aggregate = {
'sends': 0,
'delivered': 0,
'opened': 0,
'clicked': 0,
'bounced': 0,
}
m = {}
for item in ret:
if 'mail_provider' not in m:
m[item['mail_provider']] = copy.deepcopy(aggregate)
for k, v in item.items():
if k in aggregate:
m[item['mail_provider']][k] += int(v)
lst = []
for k, item in m.items():
item['mail_provider'] = k
item['delivery_rate'] = round(item['delivered'] / item['sends'] * 100, 2) if item['sends'] > 0 else 0
item['bounce_rate'] = round(item['bounced'] / item['sends'] * 100, 2) if item['sends'] > 0 else 0
item['open_rate'] = round(item['opened'] / item['delivered'] * 100, 2) if item['delivered'] > 0 else 0
item['click_rate'] = round(item['clicked'] / item['delivered'] * 100, 2) if item['delivered'] > 0 else 0
lst.append(item)
return sorted(lst, key=lambda x: x['sends'], reverse=True)
# 图表数据序列填充
def fill_chart_data(self, data: typing.List, fill_item: typing.Dict, fill_type: str = 'daily', fill_key: str = 'x', start_time: int = -1, end_time: int = -1) -> typing.List:
'''
@name 图表数据序列填充
@param data list 图表数据序列
@param fill_item dict 填充项
@param fill_type str 填充类型 daily|hourly|monthly
@param fill_key str 填充字段
@return list
'''
if fill_type == 'daily':
return self.fill_chart_data_daily(data, fill_item, fill_key, start_time, end_time)
elif fill_type == 'hourly':
return self.fill_chart_data_hourly(data, fill_item, fill_key)
elif fill_type == 'monthly':
return self.fill_chart_data_monthly(data, fill_item, fill_key, start_time, end_time)
else:
return data
# 图表数据序列小时填充
def fill_chart_data_hourly(self, data: typing.List, fill_item: typing.Dict, fill_key: str = 'x') -> typing.List:
for item in data:
item[fill_key] = int(item['x'])
if len(data) == 24:
return data
m = {}
for item in data:
m[int(item[fill_key])] = item
lst = []
for i in range(24):
if i in m:
lst.append(m[i])
continue
item = copy.deepcopy(fill_item)
item[fill_key] = i
lst.append(item)
return lst
# 图表数据序列日填充
def fill_chart_data_daily(self, data: typing.List, fill_item: typing.Dict, fill_key: str = 'x', start_time: int = -1, end_time: int = -1) -> typing.List:
if start_time < 0 or end_time < 0:
return data
if start_time > 0 and end_time < 0:
end_time = int(time.time())
if start_time > end_time:
return data
m = {}
for item in data:
m[item[fill_key]] = item
lst = []
for i in range(start_time, end_time + 1, 86400):
day_date_obj = datetime.fromtimestamp(i)
day_date = day_date_obj.strftime('%Y-%m-%d')
day_time = int(datetime(day_date_obj.year, day_date_obj.month, day_date_obj.day, 0, 0, 0).timestamp())
if day_date in m:
m[day_date][fill_key] = day_time
lst.append(m[day_date])
continue
item = copy.deepcopy(fill_item)
item[fill_key] = day_time
lst.append(item)
return lst
# 图表数据序列月填充
def fill_chart_data_monthly(self, data: typing.List, fill_item: typing.Dict, fill_key: str = 'x',
start_time: int = -1, end_time: int = -1) -> typing.List:
if start_time < 0 or end_time < 0:
return data
if start_time > 0 and end_time < 0:
end_time = int(time.time())
if start_time > end_time:
return data
m = {}
for item in data:
m[item[fill_key]] = item
lst = []
for i in range(start_time, end_time + 1, 86400):
day_date_obj = datetime.fromtimestamp(i)
day_date = day_date_obj.strftime('%Y-%m-%d')
day_time = int(datetime(day_date_obj.year, day_date_obj.month, day_date_obj.day, 0, 0, 0).timestamp())
if day_date in m:
m[day_date][fill_key] = day_time
lst.append(m[day_date])
continue
item = copy.deepcopy(fill_item)
item[fill_key] = day_time
lst.append(item)
return lst
# 统计邮件发送数据总览
def send_mail_dashboard(self, campaign_id: int = -1, domain: typing.Optional[str] = None, start_time: int = -1, end_time: int = -1) -> typing.Dict:
start_time, end_time = self.__filter_and_prepare_time_section(start_time, end_time)
query = self.__build_base_query(campaign_id, domain, start_time, end_time)
query.field('count(*) as `sends`')
query.field('ifnull(sum(`status`=\'sent\' and `dsn`=\'2.0.0\'), 0) as `delivered`')
query.field('count(*) - ifnull(sum(`status`=\'sent\' and `dsn`=\'2.0.0\'), 0) as `failed`')
ret = query_maillog_with_time_section(query, start_time, end_time)
aggregate = {
'sends': 0,
'delivered': 0,
'failed': 0,
}
for item in ret:
for k, v in item.items():
if k in aggregate:
aggregate[k] += int(v)
aggregate['delivery_rate'] = round(aggregate['delivered'] / aggregate['sends'] * 100, 2) if aggregate['sends'] > 0 else 0
aggregate['failure_rate'] = round(aggregate['failed'] / aggregate['sends'] * 100, 2) if aggregate['sends'] > 0 else 0
return aggregate
# 准备邮件发送数据图表
def __prepare_chart_data(self, start_time: int = -1, end_time: int = -1) -> typing.Tuple[str, str]:
column_type = 'daily'
datetime_format = '%Y-%m-%d'
secs = end_time - start_time
if secs < 86400:
column_type = 'hourly'
datetime_format = '%H'
x_axis_field = 'strftime(\'{}\', `sm`.`log_time`, \'unixepoch\', \'localtime\') as `x`'.format(datetime_format)
return column_type, x_axis_field
# 统计邮件发送数据图表
def chart_send_mail(self, campaign_id: int = -1, domain: typing.Optional[str] = None, start_time: int = -1, end_time: int = -1) -> typing.Dict:
start_time, end_time = self.__filter_and_prepare_time_section(start_time, end_time)
query = self.__build_base_query(campaign_id, domain, start_time, end_time)
column_type, x_axis_field = self.__prepare_chart_data(start_time, end_time)
query.field(x_axis_field)
query.field('count(*) as `sends`')
query.field('ifnull(sum(`status`=\'sent\' and `dsn`=\'2.0.0\'), 0) as `delivered`')
query.field('count(*) - ifnull(sum(`status`=\'sent\' and `dsn`=\'2.0.0\'), 0) as `failed`')
query.group('x')
query.order('x')
ret = query_maillog_with_time_section(query, start_time, end_time)
fill_item = {
'sends': 0,
'delivered': 0,
'failed': 0,
}
return {
'column_type': column_type,
'dashboard': self.send_mail_dashboard(campaign_id, domain, start_time, end_time),
'data': self.fill_chart_data(ret, fill_item, column_type, 'x', start_time, end_time)
}
# 统计邮件退件率数据图表
def chart_bounce_rate(self, campaign_id: int = -1, domain: typing.Optional[str] = None, start_time: int = -1, end_time: int = -1) -> typing.Dict:
if not self.__IS_PRO_MEMBER:
return {
'column_type': None,
'data': None,
}
start_time, end_time = self.__filter_and_prepare_time_section(start_time, end_time)
query = self.__build_base_query(campaign_id, domain, start_time, end_time)
column_type, x_axis_field = self.__prepare_chart_data(start_time, end_time)
query.field(x_axis_field)
query.field('case when count(*) > 0 then round(1.0 * ifnull(sum(`status`=\'bounced\'), 0) / count(*) * 100.0, 2) else 0.0 end as `bounce_rate`')
query.group('x')
query.order('x')
ret = query_maillog_with_time_section(query, start_time, end_time)
fill_item = {
'bounce_rate': 0.0,
}
return {
'column_type': column_type,
'data': self.fill_chart_data(ret, fill_item, column_type, 'x', start_time, end_time),
}
# 统计邮件打开率数据图表
def chart_open_rate(self, campaign_id: int = -1, domain: typing.Optional[str] = None, start_time: int = -1, end_time: int = -1) -> typing.Dict:
if not self.__IS_PRO_MEMBER:
return {
'column_type': None,
'data': None,
}
start_time, end_time = self.__filter_and_prepare_time_section(start_time, end_time)
query = self.__build_base_query(campaign_id, domain, start_time, end_time)
column_type, x_axis_field = self.__prepare_chart_data(start_time, end_time)
query.left_join(public.S('opened').prefix('').field('distinct `postfix_message_id`').build_sql(True, 'o'),
'sm.postfix_message_id=o.postfix_message_id')
query.field(x_axis_field)
query.field('case when ifnull(sum(`status`=\'sent\' and `dsn`=\'2.0.0\'), 0) > 0 then round(1.0 * count(o.postfix_message_id) / ifnull(sum(`status`=\'sent\' and `dsn`=\'2.0.0\'), 0) * 100, 2) else 0.0 end as `open_rate`')
query.group('x')
ret = query_maillog_with_time_section(query, start_time, end_time)
fill_item = {
'open_rate': 0.0,
}
return {
'column_type': column_type,
'data': self.fill_chart_data(ret, fill_item, column_type, 'x', start_time, end_time),
}
# 统计邮件点击率数据图表
def chart_click_rate(self, campaign_id: int = -1, domain: typing.Optional[str] = None, start_time: int = -1, end_time: int = -1) -> typing.Dict:
if not self.__IS_PRO_MEMBER:
return {
'column_type': None,
'data': None,
}
start_time, end_time = self.__filter_and_prepare_time_section(start_time, end_time)
query = self.__build_base_query(campaign_id, domain, start_time, end_time)
column_type, x_axis_field = self.__prepare_chart_data(start_time, end_time)
query.left_join(public.S('clicked').prefix('').field('distinct `postfix_message_id`').build_sql(True, 'c'),
'sm.postfix_message_id=c.postfix_message_id')
query.field(x_axis_field)
query.field('case when ifnull(sum(`status`=\'sent\' and `dsn`=\'2.0.0\'), 0) > 0 then round(1.0 * count(c.postfix_message_id) / ifnull(sum(`status`=\'sent\' and `dsn`=\'2.0.0\'), 0) * 100, 2) else 0.0 end as `click_rate`')
query.group('x')
ret = query_maillog_with_time_section(query, start_time, end_time)
fill_item = {
'click_rate': 0.0,
}
return {
'column_type': column_type,
'data': self.fill_chart_data(ret, fill_item, column_type, 'x', start_time, end_time),
}