| from flask import Flask, render_template, request, redirect, url_for, flash, jsonify |
| from flask_sqlalchemy import SQLAlchemy |
| from sqlalchemy import select, Column, exists, or_, Table, ForeignKey, Integer, String, DateTime, Text, and_, alias |
| from sqlalchemy.orm import aliased, relationship |
| from flask_login import UserMixin, LoginManager, login_user, login_required, logout_user, current_user |
| from flask_wtf.csrf import generate_csrf |
| from werkzeug.security import generate_password_hash, check_password_hash |
| from werkzeug.utils import secure_filename |
| from datetime import datetime |
| from flask_migrate import Migrate |
| import uuid |
| import os |
| from sqlalchemy import or_, func |
| from FastTelethonhelper import fast_download |
| from FastTelethonhelper import fast_upload |
| from telethon import TelegramClient, events,sync |
| import asyncio |
| import time |
|
|
|
|
|
|
| app_id = os.getenv("APP_ID") |
| api_hash = os.getenv("API_HASH") |
| btoken = os.getenv("BOT") |
| chnl = os.getenv("CHN") |
|
|
|
|
| api_id = int(app_id) |
| api_hash = str(api_hash) |
| bot_token = str(btoken) |
| channel= int(chnl) |
|
|
| print('dl sec') |
|
|
| |
|
|
| async def downdb(): |
| client = TelegramClient(None, api_id, api_hash) |
| await client.start(bot_token=bot_token) |
| |
| os.remove('instance/database.db') |
| @client.on(events.NewMessage) |
| async def handler(event): |
| last_message = event.message |
| await fast_download(client,last_message) |
| await asyncio.sleep(10) |
| await client.disconnect() |
| |
|
|
| return '2' |
|
|
|
|
|
|
|
|
| async def updb(): |
| client = TelegramClient(None, api_id, api_hash) |
| await client.start(bot_token=bot_token) |
| path = 'instance/database.db' |
| fle = await fast_upload(client, file_location=path) |
| entity = await client.get_entity(channel) |
|
|
| await client.send_file(entity,fle, force_document=True) |
| await client.disconnect() |
| return '2' |
|
|
| app = Flask("Simplz") |
| app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///database.db' |
| app.config['SECRET_KEY'] = 'your_secret_key' |
| db = SQLAlchemy(app) |
| migrate = Migrate(app, db) |
|
|
| app.config['UPLOAD_FOLDER'] = 'static/users/uploaded_images' |
| app.config['ALLOWED_EXTENSIONS'] = {'png', 'jpg', 'jpeg', 'gif'} |
|
|
| followers = db.Table( |
| 'followers', |
| db.Column('follower_id', db.Integer, db.ForeignKey('user.id'), primary_key=True), |
| db.Column('followed_id', db.Integer, db.ForeignKey('user.id'), primary_key=True), |
| extend_existing=True |
| ) |
|
|
| class Comment(db.Model): |
| id = db.Column(db.Integer, primary_key=True) |
| content = db.Column(db.Text, nullable=False) |
| timestamp = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) |
| user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) |
| post_id = db.Column(db.Integer, db.ForeignKey('post.id'), nullable=False) |
| author = db.relationship('User', backref='comments', lazy=True) |
|
|
|
|
| class Like(db.Model): |
| id = db.Column(db.Integer, primary_key=True) |
| user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) |
| post_id = db.Column(db.Integer, db.ForeignKey('post.id', ondelete='CASCADE'), nullable=False) |
| liked_at = db.Column(db.DateTime, default=datetime.utcnow) |
|
|
|
|
| class Follow(db.Model): |
| __tablename__ = 'followers' |
| follower_id = db.Column(db.Integer, db.ForeignKey('user.id'), primary_key=True) |
| followed_id = db.Column(db.Integer, db.ForeignKey('user.id'), primary_key=True) |
|
|
| follower = db.relationship('User', foreign_keys=[follower_id], backref='following') |
| followed = db.relationship('User', foreign_keys=[followed_id], backref='followers') |
|
|
| __table_args__ = {'extend_existing': True} |
|
|
| class Message(db.Model): |
| id = db.Column(db.Integer, primary_key=True) |
| sender_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) |
| recipient_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) |
| content = db.Column(db.Text, nullable=False) |
| timestamp = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) |
| sender = db.relationship('User', foreign_keys=[sender_id], backref='sent_messages') |
| recipient = db.relationship('User', foreign_keys=[recipient_id], backref='received_messages') |
|
|
|
|
|
|
| class User(UserMixin, db.Model): |
| id = db.Column(db.Integer, primary_key=True) |
| username = db.Column(db.String(100), unique=True, nullable=False) |
| email = db.Column(db.String(100), unique=True, nullable=False) |
| password = db.Column(db.String(100), nullable=False) |
| first_name = db.Column(db.String(50), default="Per") |
| last_name = db.Column(db.String(50), default="Son") |
| profession = db.Column(db.String(100), default="Person") |
| location = db.Column(db.String(100), default="Earth") |
| bio = db.Column(db.Text, default="Hello there...") |
| profile_pic = db.Column(db.String(100), default='default.jpg') |
| posts = db.relationship('Post', backref='author', lazy=True) |
|
|
| following_list = db.relationship( |
| 'User', |
| secondary=followers, |
| primaryjoin=(followers.c.follower_id == id), |
| secondaryjoin=(followers.c.followed_id == id), |
| backref=db.backref('followers_of', lazy='dynamic'), |
| lazy='dynamic', |
| overlaps="follower,followers_of,following,following_list" |
| ) |
|
|
| followers_list = db.relationship( |
| 'User', |
| secondary=followers, |
| primaryjoin=(followers.c.followed_id == id), |
| secondaryjoin=(followers.c.follower_id == id), |
| backref=db.backref('following_by', lazy='dynamic'), |
| lazy='dynamic', |
| overlaps="followed,followers,followers_of,following_list" |
| ) |
|
|
| def is_following(self, user): |
| return self.following_list.filter_by(id=user.id).first() is not None |
|
|
| def follow(self, user): |
| if not self.is_following(user) and self.id != user.id: |
| self.following_list.append(user) |
| db.session.commit() |
|
|
| def unfollow(self, user): |
| if self.is_following(user): |
| self.following_list.remove(user) |
| db.session.commit() |
|
|
| def like_post(self, post): |
| if not self.has_liked_post(post): |
| like = Like(user_id=self.id, post_id=post.id) |
| db.session.add(like) |
| db.session.commit() |
|
|
| def unlike_post(self, post): |
| like = Like.query.filter_by(user_id=self.id, post_id=post.id).first() |
| if like: |
| db.session.delete(like) |
| db.session.commit() |
|
|
| def has_liked_post(self, post): |
| return Like.query.filter_by(user_id=self.id, post_id=post.id).count() > 0 |
|
|
| def add_comment(self, post, content): |
| comment = Comment(user_id=self.id, post_id=post.id, content=content) |
| db.session.add(comment) |
| db.session.commit() |
|
|
| def delete_comment(self, comment): |
| if comment.user_id == self.id: |
| db.session.delete(comment) |
| db.session.commit() |
|
|
|
|
|
|
| class Post(db.Model): |
| id = db.Column(db.Integer, primary_key=True) |
| content = db.Column(db.Text, nullable=False) |
| created_at = db.Column(db.DateTime, default=datetime.utcnow) |
| user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) |
| filename = db.Column(db.String(100)) |
| comments = db.relationship('Comment', backref='post', lazy=True, cascade='all, delete-orphan') |
| likes = db.relationship('Like', backref='post', lazy=True) |
| @property |
| def likes_count(self): |
| return Like.query.filter_by(post_id=self.id).count() |
|
|
|
|
| login_manager = LoginManager(app) |
| login_manager.login_view = 'login' |
|
|
| def allowed_file(filename): |
| return '.' in filename and filename.rsplit('.', 1)[1].lower() in app.config['ALLOWED_EXTENSIONS'] |
|
|
| @login_manager.user_loader |
| def load_user(user_id): |
| return db.session.get(User, int(user_id)) |
|
|
|
|
| @app.route('/') |
| @login_required |
| def index(): |
| followed_user = aliased(User, name='followed_user') |
| |
| own_posts_query = Post.query.filter_by(user_id=current_user.id) |
| |
| followed_posts_query = Post.query.join( |
| Follow, |
| or_( |
| and_(Post.user_id == Follow.followed_id, Follow.follower_id == current_user.id), |
| and_(Post.user_id == Follow.follower_id, Follow.followed_id == current_user.id) |
| ) |
| ) |
|
|
| combined_posts_query = own_posts_query.union_all(followed_posts_query) |
| posts = combined_posts_query.order_by(Post.created_at.desc()).all() |
|
|
| csrf_token = request.environ.get('HTTP_X_CSRFTOKEN') |
|
|
| return render_template('index.html', posts=posts, csrf_token=csrf_token) |
|
|
|
|
|
|
| @app.route('/register', methods=['GET', 'POST']) |
| def register(): |
| if request.method == 'POST': |
| email = request.form['email'] |
| username = request.form['username'] |
| password = request.form['password'] |
| password2 = request.form['password2'] |
|
|
| if password != password2: |
| flash('Passwords do not match. Please try again.', 'error') |
| return render_template('register.html') |
|
|
| existing_user_with_username = User.query.filter_by(username=username).first() |
| existing_user_with_email = User.query.filter_by(email=email).first() |
|
|
| if existing_user_with_username: |
| flash('Username already exists. Please choose a different username.', 'error') |
| return render_template('register.html') |
|
|
| if existing_user_with_email: |
| flash('Email address already registered. Please use a different email.', 'error') |
| return render_template('register.html') |
|
|
| hashed_password = generate_password_hash(password, method='scrypt') |
| new_user = User(username=username, email=email, password=hashed_password) |
| db.session.add(new_user) |
| db.session.commit() |
|
|
| flash('Account created successfully! Please log in.', 'success') |
| return redirect(url_for('login')) |
|
|
| return render_template('register.html') |
|
|
| @app.route('/login', methods=['GET', 'POST']) |
| def login(): |
| if request.method == 'POST': |
| username = request.form['username'] |
| password = request.form['password'] |
| user = User.query.filter_by(username=username).first() |
| if user and check_password_hash(user.password, password): |
| login_user(user) |
| return redirect(url_for('index')) |
| else: |
| flash('Invalid username or password. Fields are case sensitive.', 'error') |
| return render_template('login.html') |
|
|
| @app.route('/logout') |
| @login_required |
| def logout(): |
| logout_user() |
| return redirect(url_for('index')) |
|
|
| @app.route('/create_post', methods=['POST']) |
| @login_required |
| def create_post(): |
| content = request.form['content'] |
| image = request.files['image'] |
|
|
| if image and allowed_file(image.filename): |
| filename = str(uuid.uuid4()) + secure_filename(image.filename) |
| image.save(os.path.join(app.config['UPLOAD_FOLDER'], filename)) |
| else: |
| filename = None |
|
|
| new_post = Post(content=content, author=current_user, filename=filename) |
| db.session.add(new_post) |
| db.session.commit() |
| return redirect(url_for('index')) |
|
|
| @app.route('/delete_post/<int:post_id>', methods=['POST']) |
| @login_required |
| def delete_post(post_id): |
| post = db.session.get(Post, post_id) |
| if post and post.author == current_user: |
| if post.filename: |
| try: |
| os.remove(os.path.join(app.config['UPLOAD_FOLDER'], post.filename)) |
| except Exception as e: |
| flash(f"Error deleting image file: {str(e)}", "error") |
| |
| |
| Like.query.filter_by(post_id=post_id).delete() |
| db.session.delete(post) |
| db.session.commit() |
| return redirect(url_for('index')) |
|
|
|
|
|
|
|
|
|
|
| @app.route('/follow/<int:user_id>', methods=['POST']) |
| @login_required |
| def follow(user_id): |
| |
| user_to_follow = User.query.get(user_id) |
| if user_to_follow is None: |
| flash('User not found.', 'error') |
| return redirect(url_for('explore')) |
|
|
| if current_user.is_following(user_to_follow): |
| flash('You are already following this user.', 'info') |
| return redirect(url_for('view_profile', user_id=user_id)) |
|
|
| try: |
| current_user.follow(user_to_follow) |
| except Exception as e: |
| flash('Failed to follow the user. Please try again.', 'error') |
| print(f"Error: {str(e)}") |
| return redirect(url_for('view_profile', user_id=user_id)) |
|
|
| flash(f"You are now following {user_to_follow.username}.", 'success') |
| return redirect(url_for('view_profile', user_id=user_id)) |
|
|
|
|
| @app.route('/unfollow/<int:user_id>', methods=['POST']) |
| @login_required |
| def unfollow(user_id, page): |
| user_to_unfollow = User.query.get(user_id) |
| if user_to_unfollow is None: |
| flash('User not found.', 'danger') |
| return redirect(url_for('index')) |
|
|
| if current_user.is_following(user_to_unfollow): |
| current_user.unfollow(user_to_unfollow) |
| flash('You have unfollowed {}.'.format(user_to_unfollow.username), 'success') |
| else: |
| flash('You are not following this user.', 'info') |
| if page == 'profile': |
| return redirect(url_for('view_profile', user_id=user_id)) |
| elif page == 'search': |
| return redirect(url_for('search_results', query=request.form['query'])) |
| else: |
| return redirect(url_for('view_profile', user_id=user_id)) |
|
|
|
|
| @app.route('/account', methods=['GET', 'POST']) |
| @login_required |
| def account(): |
| if request.method == 'POST': |
| current_user.username = request.form['username'] |
| current_user.email = request.form['email'] |
| current_user.first_name = request.form['first_name'] |
| current_user.last_name = request.form['last_name'] |
| current_user.bio = request.form['bio'] |
| current_user.location = request.form['location'] |
| current_user.profession = request.form['profession'] |
|
|
| profile_picture = request.files['profile_picture'] |
| if profile_picture and allowed_file(profile_picture.filename): |
| filename = str(uuid.uuid4()) + secure_filename(profile_picture.filename) |
| profile_picture.save(os.path.join(app.config['UPLOAD_FOLDER'], filename)) |
|
|
| if current_user.profile_pic is not None and current_user.profile_pic != 'default.jpg': |
| old_profile_picture_path = os.path.join(app.config['UPLOAD_FOLDER'], current_user.profile_pic) |
| os.remove(old_profile_picture_path) |
|
|
| current_user.profile_pic = filename |
|
|
| new_password = request.form['new_password'] |
| if new_password: |
| current_user.password = generate_password_hash(new_password, method='scrypt') |
|
|
| db.session.commit() |
| flash('Profile updated successfully!', 'success') |
| return redirect(url_for('account')) |
|
|
| return render_template('account.html') |
|
|
| @app.route('/search_user', methods=['GET']) |
| def search_user(): |
| search_query = request.args.get('search_query', '') |
| users = User.query.filter( |
| |
| or_( |
| User.username.ilike(f'%{search_query}%'), |
| User.first_name.ilike(f'%{search_query}%'), |
| User.last_name.ilike(f'%{search_query}%'), |
| (User.first_name + User.last_name).like(f'%{search_query}%') |
| ) |
| ).all() |
|
|
|
|
| |
| csrf_token = generate_csrf() |
| return render_template('search_results.html', users=users, csrf_token=csrf_token, searchq=search_query ) |
|
|
| @app.route('/user/<int:user_id>') |
| @login_required |
| def view_profile(user_id): |
| user = User.query.get(user_id) |
| if not user: |
| flash('User not found.', 'error') |
| return redirect(url_for('search_user')) |
|
|
| posts = Post.query.filter_by(user_id=user.id).order_by(Post.created_at.desc()).all() |
| csrf_token = generate_csrf() |
| return render_template('user_profile.html', user=user, posts=posts, csrf_token=csrf_token, profile_user_id=user_id) |
|
|
| @app.route('/add_comment/<int:post_id>', methods=['POST']) |
| @login_required |
| def add_comment(post_id): |
| content = request.form['content'] |
| post = Post.query.get(post_id) |
| if not post: |
| flash('Post not found.', 'error') |
| return redirect(url_for('index')) |
| current_user.add_comment(post, content) |
|
|
| flash('Comment added successfully.', 'success') |
| return redirect(url_for('index')) |
|
|
| @login_required |
| @app.route('/like_post/<int:post_id>', methods=['POST']) |
| def like_post(post_id): |
| post = Post.query.get(post_id) |
| if not post: |
| flash('Post not found.', 'error') |
| return redirect(url_for('index')) |
|
|
| if current_user.has_liked_post(post): |
| current_user.unlike_post(post) |
| db.session.commit() |
| liked = False |
| else: |
| current_user.like_post(post) |
| db.session.commit() |
| liked = True |
|
|
| likes_count = post.likes_count |
| response_data = {'likes_count': likes_count, 'liked': liked} |
| return jsonify(response_data) |
|
|
|
|
| @app.route('/send_message', methods=['POST']) |
| @login_required |
| def send_message(): |
| recipient_username = request.form['recipient_username'] |
| message_content = request.form['message_content'] |
|
|
| recipient = User.query.filter_by(username=recipient_username).first() |
| if not recipient: |
| flash('Recipient not found.', 'error') |
| return redirect(url_for('conversations')) |
|
|
| new_message = Message(sender_id=current_user.id, recipient_id=recipient.id, content=message_content) |
| db.session.add(new_message) |
| db.session.commit() |
|
|
| flash('Message sent successfully!', 'success') |
| return redirect(url_for('conversation', recipient_id=recipient.id)) |
|
|
|
|
| @app.route('/conversations') |
| @login_required |
| def conversations(): |
| conversations = db.session.query(User).join( |
| Message, or_( |
| and_(Message.sender_id == current_user.id, Message.recipient_id == User.id), |
| and_(Message.sender_id == User.id, Message.recipient_id == current_user.id) |
| ) |
| ).distinct(User.id).all() |
|
|
| return render_template('conversations.html', conversations=conversations) |
|
|
| @app.route('/new_conversation', methods=['GET']) |
| @login_required |
| def new_conversation(): |
| return render_template('new_conversation.html') |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| @app.route('/updb', methods=['GET']) |
| @login_required |
| def updbur(): |
| |
| loop = asyncio.new_event_loop() |
| asyncio.set_event_loop(loop) |
| |
| |
| session.close() |
| rp = loop.run_until_complete(updb()) |
| loop.close() |
| |
| if rp == '2': |
| return '200' |
| return 'no' |
|
|
|
|
|
|
|
|
| @app.route('/conversation/<int:recipient_id>', methods=['GET', 'POST']) |
| @login_required |
| def conversation(recipient_id): |
| recipient = User.query.get(recipient_id) |
| if not recipient: |
| flash('Recipient not found.', 'error') |
| return redirect(url_for('conversations')) |
|
|
| if request.method == 'POST': |
| message_content = request.form['message_content'] |
|
|
| new_message = Message(sender_id=current_user.id, recipient_id=recipient.id, content=message_content) |
| db.session.add(new_message) |
| db.session.commit() |
|
|
| flash('Message sent successfully!', 'success') |
| return redirect(url_for('conversation', recipient_id=recipient_id)) |
|
|
| messages = Message.query.filter(or_( |
| and_(Message.sender_id == current_user.id, Message.recipient_id == recipient_id), |
| and_(Message.sender_id == recipient_id, Message.recipient_id == current_user.id) |
| )).order_by(Message.timestamp).all() |
|
|
| return render_template('conversation.html', recipient=recipient, messages=messages) |
|
|
|
|
|
|
| def start_flask_app(): |
| with app.app_context(): |
| db.create_all() |
| app.run(debug=True, use_reloader=False, host="0.0.0.0", port=7860) |
|
|
| if __name__ == '__main__': |
| start_flask_app() |