| |
|
|
| import imaplib |
| import poplib |
| import email |
| import sys |
| import re |
| try: |
| from HTMLParser import HTMLParser |
| except: |
| from html.parser import HTMLParser |
|
|
| if sys.version_info[0] == 2: |
| reload(sys) |
| sys.setdefaultencoding('utf-8') |
| else: |
| from email.header import decode_header |
| from email.utils import parsedate_tz, mktime_tz, parseaddr |
|
|
| sys.path.append("class/") |
| import public |
|
|
|
|
| class XssHtml(HTMLParser): |
| allow_tags = ['a', 'img', 'br', 'strong', 'b', 'code', 'pre', |
| 'p', 'div', 'em', 'span', 'h1', 'h2', 'h3', 'h4', |
| 'h5', 'h6', 'blockquote', 'ul', 'ol', 'tr', 'th', 'td', |
| 'hr', 'li', 'u', 'embed', 's', 'table', 'thead', 'tbody', |
| 'caption', 'small', 'q', 'sup', 'sub'] |
| common_attrs = ["style", "class", "name"] |
| nonend_tags = ["img", "hr", "br", "embed"] |
| tags_own_attrs = { |
| "img": ["src", "width", "height", "alt", "align"], |
| "a": ["href", "target", "rel", "title"], |
| "embed": ["src", "width", "height", "type", "allowfullscreen", "loop", "play", "wmode", "menu"], |
| "table": ["border", "cellpadding", "cellspacing"], |
| } |
|
|
| _regex_url = re.compile(r'^(http|https|ftp)://.*', re.I | re.S) |
| _regex_style_1 = re.compile(r'(\\|&#|/\*|\*/)', re.I) |
| _regex_style_2 = re.compile(r'e.*x.*p.*r.*e.*s.*s.*i.*o.*n', re.I | re.S) |
|
|
| def __init__(self, allows=[]): |
| HTMLParser.__init__(self) |
| self.allow_tags = allows if allows else self.allow_tags |
| self.result = [] |
| self.start = [] |
| self.data = [] |
|
|
| def getHtml(self): |
| """ |
| Get the safe html code |
| """ |
| for i in range(0, len(self.result)): |
| self.data.append(self.result[i]) |
| return ''.join(self.data) |
|
|
| def handle_startendtag(self, tag, attrs): |
| self.handle_starttag(tag, attrs) |
|
|
| def handle_starttag(self, tag, attrs): |
| if tag not in self.allow_tags: |
| return |
| end_diagonal = ' /' if tag in self.nonend_tags else '' |
| if not end_diagonal: |
| self.start.append(tag) |
| attdict = {} |
| for attr in attrs: |
| attdict[attr[0]] = attr[1] |
|
|
| attdict = self._wash_attr(attdict, tag) |
| if hasattr(self, "node_%s" % tag): |
| attdict = getattr(self, "node_%s" % tag)(attdict) |
| else: |
| attdict = self.node_default(attdict) |
|
|
| attrs = [] |
| for (key, value) in attdict.items(): |
| attrs.append('%s="%s"' % (key, self._htmlspecialchars(value))) |
| attrs = (' ' + ' '.join(attrs)) if attrs else '' |
| self.result.append('<' + tag + attrs + end_diagonal + '>') |
|
|
| def handle_endtag(self, tag): |
| if self.start and tag == self.start[len(self.start) - 1]: |
| self.result.append('</' + tag + '>') |
| self.start.pop() |
|
|
| def handle_data(self, data): |
| self.result.append(self._htmlspecialchars(data)) |
|
|
| def handle_entityref(self, name): |
| if name.isalpha(): |
| self.result.append("&%s;" % name) |
|
|
| def handle_charref(self, name): |
| if name.isdigit(): |
| self.result.append("&#%s;" % name) |
|
|
| def node_default(self, attrs): |
| attrs = self._common_attr(attrs) |
| return attrs |
|
|
| def node_a(self, attrs): |
| attrs = self._common_attr(attrs) |
| attrs = self._get_link(attrs, "href") |
| attrs = self._set_attr_default(attrs, "target", "_blank") |
| attrs = self._limit_attr(attrs, { |
| "target": ["_blank", "_self"] |
| }) |
| return attrs |
|
|
| def node_embed(self, attrs): |
| attrs = self._common_attr(attrs) |
| attrs = self._get_link(attrs, "src") |
| attrs = self._limit_attr(attrs, { |
| "type": ["application/x-shockwave-flash"], |
| "wmode": ["transparent", "window", "opaque"], |
| "play": ["true", "false"], |
| "loop": ["true", "false"], |
| "menu": ["true", "false"], |
| "allowfullscreen": ["true", "false"] |
| }) |
| attrs["allowscriptaccess"] = "never" |
| attrs["allownetworking"] = "none" |
| return attrs |
|
|
| def _true_url(self, url): |
| if self._regex_url.match(url): |
| return url |
| else: |
| return "http://%s" % url |
|
|
| def _true_style(self, style): |
| if style: |
| style = self._regex_style_1.sub('_', style) |
| style = self._regex_style_2.sub('_', style) |
| return style |
|
|
| def _get_style(self, attrs): |
| if "style" in attrs: |
| attrs["style"] = self._true_style(attrs.get("style")) |
| return attrs |
|
|
| def _get_link(self, attrs, name): |
| if name in attrs: |
| attrs[name] = self._true_url(attrs[name]) |
| return attrs |
|
|
| def _wash_attr(self, attrs, tag): |
| if tag in self.tags_own_attrs: |
| other = self.tags_own_attrs.get(tag) |
| else: |
| other = [] |
|
|
| _attrs = {} |
| if attrs: |
| for (key, value) in attrs.items(): |
| if key in self.common_attrs + other: |
| _attrs[key] = value |
| return _attrs |
|
|
| def _common_attr(self, attrs): |
| attrs = self._get_style(attrs) |
| return attrs |
|
|
| def _set_attr_default(self, attrs, name, default=''): |
| if name not in attrs: |
| attrs[name] = default |
| return attrs |
|
|
| def _limit_attr(self, attrs, limit={}): |
| for (key, value) in limit.items(): |
| if key in attrs and attrs[key] not in value: |
| del attrs[key] |
| return attrs |
|
|
| def _htmlspecialchars(self, html): |
| return html.replace("<", "<")\ |
| .replace(">", ">")\ |
| .replace('"', """)\ |
| .replace("'", "'") |
|
|
|
|
| class ReceiveMail(object): |
|
|
| def xss_encode(self, text): |
| parser = XssHtml() |
| parser.feed(text) |
| parser.close() |
| return parser.getHtml() |
|
|
| |
| def getTime(self, msg): |
| if not msg['date']: return 0 |
| if sys.version_info[0] == 2: |
| deContent = email.Header.decode_header(msg['date'])[0] |
| else: |
| deContent = decode_header(msg['date'])[0] |
| if deContent[1] is not None: |
| if sys.version_info[0] == 2: |
| date_str = unicode(deContent[0], deContent[1]) |
| else: |
| date_str = str(deContent[0], deContent[1]) |
| else: |
| date_str = deContent[0] |
| if sys.version_info[0] == 2: |
| date_tuple = email.Utils.parsedate_tz(date_str) |
| time_stamp = email.Utils.mktime_tz(date_tuple) |
| else: |
| date_tuple = parsedate_tz(date_str) |
| time_stamp = mktime_tz(date_tuple) |
| return time_stamp |
|
|
| |
| def getSenderInfo(self, msg): |
| if sys.version_info[0] == 2: |
| address = email.Utils.parseaddr(msg["from"])[1] |
| name = email.Utils.parseaddr(msg["from"])[0] |
| deName = email.Header.decode_header(name) |
| else: |
| address = parseaddr(msg["from"])[1] |
| name = parseaddr(msg["from"])[0] |
| deName = decode_header(name) |
| s = "" |
| for content in deName: |
| if type(content[0]) == str: |
| s += content[0] |
| continue |
| if content[1] is not None: |
| s += content[0].decode(content[1]) |
| else: |
| s += content[0].decode('utf-8') |
| name = s.strip() |
| return '{0} <{1}>'.format(name, address).strip() |
|
|
| |
| def getReceiverInfo(self, msg): |
| to_list = msg["to"].split(', ') |
| to_str_list = list() |
| for to_addr in to_list: |
| if sys.version_info[0] == 2: |
| address = email.Utils.parseaddr(to_addr)[1] |
| name = email.Utils.parseaddr(to_addr)[0] |
| deName = email.Header.decode_header(name)[0] |
| else: |
| address = parseaddr(to_addr)[1] |
| name = parseaddr(to_addr)[0] |
| deName = decode_header(name)[0] |
| if deName[1] is not None: |
| if sys.version_info[0] == 2: |
| name = unicode(deName[0], deName[1]) |
| else: |
| name = str(deName[0], deName[1]) |
| to_str_list.append('{0} <{1}>'.format(name, address).strip()) |
| return ';'.join(to_str_list) |
|
|
| |
| def getSubjectContent(self, msg): |
| if not msg['subject']: |
| return '' |
| if sys.version_info[0] == 2: |
| deContent = email.Header.decode_header(msg['subject']) |
| else: |
| deContent = decode_header(msg['subject']) |
| s = "" |
| for content in deContent: |
| if type(content[0]) == str: |
| s += content[0] |
| continue |
| if content[1] is not None: |
| s += content[0].decode(content[1]) |
| else: |
| s += content[0].decode('utf-8') |
| return s.strip() |
|
|
| def parse_attachment(self, message_part): |
| ''' |
| 判断是否有附件,并解析 |
| ''' |
| content_disposition = message_part.get("Content-Disposition", None) |
| if content_disposition: |
| dispositions = content_disposition.strip().split(";") |
| if bool(content_disposition and dispositions[0].lower() == "attachment"): |
|
|
| file_data = message_part.get_payload(decode=True) |
| attachment = dict() |
| attachment["content_type"] = message_part.get_content_type() |
| attachment["size"] = len(file_data) |
| if sys.version_info[0] == 2: |
| deName = email.Header.decode_header(message_part.get_filename())[0] |
| else: |
| deName = decode_header(message_part.get_filename())[0] |
| name = deName[0] |
| if deName[1] is not None: |
| if sys.version_info[0] == 2: |
| name = unicode(deName[0], deName[1]) |
| else: |
| name = str(deName[0], deName[1]) |
| attachment["name"] = name |
| |
| |
| |
| |
| |
| return attachment |
| return None |
|
|
| |
| def guess_charset(self, msg): |
| charset = msg.get_charset() |
| if charset is None: |
| content_type = msg.get('Content-Type', '').lower() |
| if 'charset' in content_type: |
| charset = content_type.split('charset=')[1].strip() |
| if ';' in charset: |
| charset = charset.split(';')[0] |
| return charset |
| return charset |
|
|
| def getMailInfo(self, msg): |
| from email import policy |
| from email.parser import BytesParser |
| from email.utils import parsedate_to_datetime |
| import time |
| import base64 |
|
|
| msg = BytesParser(policy=policy.default).parsebytes(msg.encode('utf-8')) |
|
|
| |
| date_str = msg["date"] |
| if date_str: |
| dt = parsedate_to_datetime(date_str) |
| timestamp = int(time.mktime(dt.timetuple())) |
| else: |
| timestamp = None |
|
|
| headers = { |
| "from": msg["from"], |
| "to": msg["to"], |
| "subject": msg["subject"], |
| "time": timestamp, |
| } |
|
|
| |
| body = {"body": "", "html": ""} |
| if msg.is_multipart(): |
| for part in msg.iter_parts(): |
| content_type = part.get_content_type() |
| content_disposition = str(part.get("Content-Disposition")) |
|
|
| if part.get_content_maintype() == "multipart" and part.get_content_subtype() == "alternative": |
| |
| for subpart in part.iter_parts(): |
| sub_type = subpart.get_content_type() |
| if sub_type == "text/html": |
| body["html"] = subpart.get_content() |
| elif sub_type == "text/plain" and not body["html"]: |
| body["body"] = subpart.get_content() |
| continue |
|
|
| |
| if content_type == "text/plain" and "attachment" not in content_disposition: |
| body["body"] = part.get_content() |
| elif content_type == "text/html" and "attachment" not in content_disposition: |
| body["html"] = part.get_content() |
|
|
| |
| elif "attachment" in content_disposition: |
| filename = part.get_filename() |
| content = part.get_payload(decode=True) |
| body.setdefault("attachments", []).append({"filename": filename, "content": base64.b64encode(content).decode("utf-8")}) |
| else: |
| |
| content_type = msg.get_content_type() |
| if content_type == "text/plain": |
| body["body"] = msg.get_content() |
| elif content_type == "text/html": |
| body["html"] = msg.get_content() |
|
|
| headers.update(body) |
|
|
| return headers |
|
|
|
|
| class ImapReceiveMail(ReceiveMail): |
|
|
| def __init__(self, username, password, server, is_ssl=False): |
| if is_ssl: |
| self.mail = imaplib.IMAP4_SSL(server) |
| else: |
| self.mail = imaplib.IMAP4(server) |
| self.mail.login(username, password) |
| self.select("INBOX") |
|
|
| |
| def showFolders(self): |
| return self.mail.list() |
|
|
| |
| def select(self, selector): |
| return self.mail.select(selector) |
|
|
| |
| def search(self, charset, *criteria): |
| try: |
| return self.mail.search(charset, *criteria) |
| except : |
| self.select("INBOX") |
| return self.mail.search(charset, *criteria) |
|
|
| |
| def getUnread(self): |
| return self.search(None, "Unseen") |
|
|
| |
| def getAll(self): |
| return self.search(None, "All")[1][0].split() |
|
|
| |
| def getEmailFormat(self, num): |
| data = self.mail.fetch(num, 'RFC822') |
| if data[0] == 'OK': |
| return email.message_from_string(data[1][0][1]) |
| else: |
| return "fetch error" |
|
|
| |
| def getEmailUid(self, num): |
| data = self.mail.fetch(num, 'UID') |
| if data[0] == 'OK': |
| return data[1][0].split()[2].rstrip(')') |
| else: |
| return "get uid error" |
|
|
|
|
| class PopReceiveMail(ReceiveMail): |
|
|
| def __init__(self, username, password, server, is_ssl=False): |
| if is_ssl: |
| self.mail = poplib.POP3_SSL(server) |
| else: |
| self.mail = poplib.POP3(server) |
| self.mail.user(username) |
| self.mail.pass_(password) |
|
|
| def getAll(self): |
| return range(1, self.mail.stat()[0] + 1) |
|
|
| def getEmailFormat(self, num): |
| response, message, octets = self.mail.retr(num) |
| if 'OK' in response: |
| return email.message_from_string('\n'.join(message)) |
| else: |
| return "get email error" |
|
|
| def getEmailUid(self, num): |
| response, _, uid = self.mail.uidl(num).split() |
| if 'OK' in response: |
| return uid |
| else: |
| return "get uid error" |
|
|