import datetime
import os

import tweepy


def get_tweets_from_user_list_v2(user_list, client, tweets_per_user=10):
    """Return tweets posted during the last 24 hours by the given users (API v2).

    For each username the user ID is looked up with ``client.get_user`` and the
    timeline is fetched with ``client.get_users_tweets``. A failure for one
    user is printed and that user is skipped; the remaining users are still
    processed.

    Args:
        user_list (list): Twitter usernames (strings). ``None`` or an empty
            list yields an empty result.
        client (tweepy.Client): An authenticated Tweepy Client object.
        tweets_per_user (int): Maximum number of tweets to return per user.
            The timeline endpoint only accepts page sizes from 5 to 100, so
            the request is clamped to that range and the result is truncated
            to this value. Values above 100 are therefore capped at 100 (no
            pagination is performed); values below 1 return an empty list.

    Returns:
        list: Tweet objects exactly as returned by the client (they must
        support ``tweet['created_at']``), grouped per user in the order of
        ``user_list``, restricted to tweets created within the last 24 hours.
        Empty if nothing was found or every lookup failed.
    """
    # Page-size bounds accepted by the v2 user-timeline endpoint.
    api_min_results, api_max_results = 5, 100

    if not user_list or tweets_per_user < 1:
        return []

    page_size = max(api_min_results, min(tweets_per_user, api_max_results))
    cutoff = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=1)

    all_tweets = []
    for username in user_list:
        # Broad catch on purpose: this is a best-effort, per-user boundary, so
        # one failing account (rate limit, suspended user, network error) must
        # not abort the whole batch.
        try:
            user = client.get_user(username=username)
            if user.errors or user.data is None:
                print(f"Error getting user ID for {username}: {user.errors}")
                continue

            user_id = user.data['id']

            response = client.get_users_tweets(
                id=user_id,
                max_results=page_size,
                # Let the API drop old tweets server-side; the client-side
                # filter below still enforces the exact cutoff.
                start_time=cutoff,
                tweet_fields=["created_at", "text"],
            )
            if response.errors:
                print(f"Error fetching tweets for {username} (ID: {user_id}): {response.errors}")
                continue

            # Filter instead of stopping at the first old tweet, so the result
            # does not depend on the response being strictly newest-first.
            recent = [
                tweet
                for tweet in (response.data or [])
                if tweet['created_at'] >= cutoff
            ]
            # The request may have been rounded up to the API minimum of 5.
            all_tweets.extend(recent[:tweets_per_user])
        except Exception as e:
            print(f"Unexpected error fetching tweets for {username}: {e}")
            continue

    return all_tweets


def authenticate_v2():
    """Create a Tweepy API v2 client from the ``TWITTER_BEARER_TOKEN`` env var.

    The bearer token is read from the environment instead of being embedded in
    the source. No verification request is sent: a bearer token is app-only
    authentication, and ``Client.get_me()`` requires user-context credentials,
    so it cannot be used to validate this kind of token. An invalid or revoked
    token therefore surfaces as an error on the first real API request.

    Returns:
        tweepy.Client: A client configured with the bearer token, or None if
        the environment variable is missing/blank or the client could not be
        constructed.
    """
    # NOTE: a bearer token was previously committed in this file. Treat that
    # token as compromised and revoke it in the developer portal.
    bearer_token = os.environ.get("TWITTER_BEARER_TOKEN", "").strip()

    if not bearer_token:
        print("Error: Missing Twitter Bearer Token. Please set the TWITTER_BEARER_TOKEN environment variable.")
        return None

    try:
        client = tweepy.Client(bearer_token=bearer_token)
    except Exception as e:
        print(f"Authentication failed (API v2): {e}")
        return None

    print("Twitter API v2 client created (the token is verified on the first request).")
    return client


def main():
    """Fetch and print the last 24 hours of tweets for a fixed list of users.

    Returns:
        int: Process exit status. 1 when no API client could be created,
        0 otherwise (including when no tweets were found).
    """
    client = authenticate_v2()
    if client is None:
        print("Exiting due to authentication failure.")
        return 1

    user_list = ["dacefupan", "aiwangupiao"]
    tweets = get_tweets_from_user_list_v2(user_list, client, tweets_per_user=5)

    if not tweets:
        print("No tweets found in the last 24 hours or an error occurred.")
        return 0

    print(f"Found {len(tweets)} tweets from the last 24 hours:")
    for tweet in tweets:
        print(f"Text: {tweet['text']}\n")
        print(f"Tweeted at: {tweet['created_at']}\n")
    return 0


if __name__ == '__main__':
    # SystemExit instead of the site-provided exit() helper, which is meant
    # for interactive use and is absent when Python runs with -S; this also
    # propagates a non-zero status on failure.
    raise SystemExit(main())