Twitter API (+ Tweepy)

Author

A demo by Breanna E. Green // Powered by Quarto

#Imports and Installs

import json
from   collections import defaultdict
from   glob import glob
import pandas as pd
import numpy as np
import os
import string
import sys
import tweepy 

try:
    import lzma
except ImportError:
    from backports import lzma

You will need your own keys here!

#Tweepy
# Exmaple code from: (https://www.geeksforgeeks.org/python-api-lookup_users-in-tweepy/)

# assign the values accordingly 
consumer_key = "LETTERSANDNUMBERS" 
consumer_secret = "LETTERSANDNUMBERS" 
access_token = "LETTERSANDNUMBERS" 
access_token_secret = "LETTERSANDNUMBERS" 
  
# authorization of consumer key and consumer secret 
auth = tweepy.OAuthHandler(consumer_key, consumer_secret) 
  
# set access to user's access key and access secret  
auth.set_access_token(access_token, access_token_secret) 
  
# calling the api  
api = tweepy.API(auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True) 
## Test that this worked! (It does, keep it commented out)

print("We can find users by userid: \n")
# list of user_ids 
user_ids = [1188495786801278981, 17369110,  254791849]
  
# getting the users by user_ids 
users = api.lookup_users(user_ids) 
  
# printing the user details 
for user in users: 
    print("The id by twitter id is : " + str(user.id)) 
    print("The screen name by twitter id is : " + user.screen_name, end = "\n\n")


print('\n', '___'*25, '\n') 
    
    
print("Or we can find users by user handle: \n")
# list of screen_names 
handles = ["BreannaEGreen", "Cornell", "SciPyTip"] 
  
# getting the users by screen_names 
users = api.lookup_users(screen_names = handles) 
  
# printing the user details 
for user in users: 
    print("The id by handle is : " + str(user.id)) 
    print("The screen name by handle is : " + user.screen_name, end = "\n\n") 

Ok, so we know that the above code chunk works. Nice!

Let’s work on somegthing a bit more detailed. The below code requires a list of Twitter IDs that you’re interesting in pulling tweets from – similar to the code chunk above:

# list of user_ids 
user_ids = [1188495786801278981, 17369110,  254791849]

The below code assumes you’ve received more advanced API access – researcher access. You might need to make some changes to the code depending on the level of access you have. I’ve commented notes within this code indcating what I’m doing and why.

Happy Coding!

### Full Archive Search

### Pick an endpoint you are pulling code from... Here we search all tweets 
archive_search_url = "https://api.twitter.com/2/tweets/search/all"

### This is when Twitter began! March 25, 2006. I want to pull as early as I possibly can, so why not start near the day the website launched?
twitter_start = datetime(2006, 3, 25).isoformat()+'Z'
now = datetime.now()

### Define some useful functions

### You bearer token can be found where you pulled you tokens

bearer_token="LETTERSANDNUMBERS" ### put your bearer token here!

def bearer_oauth(r):
    """
    Method required by bearer token authentication.
    """

    r.headers["Authorization"] = f"Bearer {bearer_token}"
    r.headers["User-Agent"] = "v2FullArchiveSearchPython"
    return r

### connect to your endpoint using parameters you set and return a json snippet
def connect_to_endpoint(url, params):
    response = requests.request("GET", archive_search_url, auth=bearer_oauth, params=params)
#     print(response.status_code)
    if response.status_code != 200:
        print(response.headers)
        raise Exception(response.status_code, response.text)
    return response.json()


### main function that uses the above functions, as well as the paramters you want
def main(id_):

    fileDir = os.path.dirname(os.path.realpath('__file__')) ### the current path that you're working in
    store = fileDir.replace("_Notebooks", "_Data")+'/TLs_subset_1500/' ### name of folder where you want to store these results

    
    json_file_name = store + now.strftime("%Y_%m_%d") + '_' + str(id_) + '.json' ### json file name as DATE_USERID_.json
    log_name = store + "log_TLs_1500"+now.strftime("%Y_%m_%d")+".text" ### creating a log of what was pulled in case somethign breaks (which happens!)
    
    search_archive_done = False
        
    # Loop for pull
    while (search_archive_done == False):
        with open(json_file_name, "w") as file:
            time.sleep(5)
            query_params = {'query': '(from:{} OR to:{}) lang:en -is:retweet'.format(id_, id_),
            'tweet.fields': 'created_at,public_metrics,'
                                    'attachments,'
                                    'conversation_id,context_annotations,'
                                    'entities,geo,id,'
                                    'in_reply_to_user_id,lang,'
                                    'possibly_sensitive,reply_settings,'
                                    'referenced_tweets,'
                                    'source,text,withheld',
            'start_time': twitter_start,
            "user.fields": 'created_at,description,entities,id,location,name,' 
                                'profile_image_url,protected,public_metrics,url,username,verified,withheld',
            "place.fields":'contained_within,country,country_code,full_name,geo,id,name,place_type',
            'expansions':'author_id,referenced_tweets.id,in_reply_to_user_id,'
                                'attachments.media_keys,attachments.poll_ids,geo.place_id,'
                                'entities.mentions.username,referenced_tweets.id.author_id', 
            'max_results': 100,
            'next_token': None
            }
            json_response_ids = connect_to_endpoint(archive_search_url, query_params)
            if "result_count" in json_response_ids.get("meta", {}).keys():
                result_count = json_response_ids["meta"]["result_count"]
                if result_count > 0:
                    json.dump(json_response_ids, file, sort_keys=True,indent=2, separators=(',', ': '))
                    while "next_token" in json_response_ids.get("meta", {}).keys():
                        file.write(",\n")
                        next_token = json_response_ids["meta"]["next_token"]
                        new_param = {"next_token": next_token}
                        query_params.update(new_param)
                        time.sleep(4.2)
                        json_response_ids = connect_to_endpoint(archive_search_url, query_params)
                        json.dump(json_response_ids, file, sort_keys=True,indent=2, separators=(',', ': '))
                    else:
                        search_archive_done = True
                        file.write(",\n")
#                         print('{} tweets finished pull at {}'.format(id_, datetime.now()))
                        pass
                else:
                    search_archive_done = True
                    print('{} has no tweets, is private, or is suspended. finished pull at {}'.format(id_, datetime.now()))
                    break
            else:
                search_archive_done = True
                print('{} is likely private or suspended.'.format(id_))
                break
       
    file.close()
    search_archive_done = True
    print('{} tweets finished pull at {}'.format(id_, datetime.now()))
    time.sleep(3)
    
    with open(log_name, "a") as log:
        log.write('{} finished pull at {}.\n'.format(id_, datetime.now()))
    log.close()
                

if __name__ == "__main__":
    for idx in userid_list: ### userid_list is the list of userids you're interested in pulling tweets from!
        main(str(idx))

Home | Return to Twitter Scraping Demo Home Page | << Set Up Twitter API | Requests and BeautifulSoup >>