#!/usr/bin/python3
"""Helpers for saving the media attached to a tweet under the data/ directory."""
import shutil

from twitter.twitter_utils import parse_media_file


def archive_media(media_url):
    """
    Parse the media file from the URL using Twitter API's parse_media_file
    method, then write the resulting temporary file to data/[filename].

    Arguments:
        media_url {str} -- String noting the URL of the uploaded media file

    Returns:
        str -- Resulting filename
    """
    # parse_media_file also returns the size and media type; neither is used.
    temp_media_file, filename, _size, _media_type = parse_media_file(media_url)
    try:
        with open(f'data/{filename}', 'wb') as archive_file:
            # Stream in chunks instead of loading the whole file in memory.
            shutil.copyfileobj(temp_media_file, archive_file)
    finally:
        # Release the temporary file even when the destination write fails.
        temp_media_file.close()
    return filename


def filter_out_bad_content_type(video_variant_list):
    """
    Helper method to filter out unusable video variants from the list

    Only variants whose 'content_type' is 'video/mp4' are kept; a variant
    without a 'content_type' key is treated as unusable.

    Arguments:
        video_variant_list {list} -- List of video variant definitions from
            the tweet

    Returns:
        list -- Resulting filtered video variant list
    """
    return [
        video for video in video_variant_list
        if video.get('content_type') == 'video/mp4'
    ]


def select_video_variant(video_variant_list):
    """
    Iterate the video variants and pick the highest quality variant

    Arguments:
        video_variant_list {list} -- List of video variant definitions from
            the tweet

    Returns:
        str -- Resulting URL for the highest quality video variant

    Raises:
        IndexError -- If the list contains no 'video/mp4' variant
    """
    filtered_variant_list = filter_out_bad_content_type(video_variant_list)
    if not filtered_variant_list:
        raise IndexError('no video/mp4 variant available to select from')
    # max() keeps the first variant on ties, like a strict "<" scan would.
    # A missing or null bitrate is ranked as 0 instead of raising.
    highest_bitrate_variant = max(
        filtered_variant_list,
        key=lambda video: video.get('bitrate') or 0,
    )
    return highest_bitrate_variant['url']


def _archive_media_entries(media_list):
    """Archive each photo/video entry of a media list; return the filenames."""
    archive_filenames = []
    for media_dict in media_list:
        media_type = media_dict['type']
        if media_type == 'photo':
            archive_filenames.append(archive_media(media_dict['media_url']))
        elif media_type == 'video':
            variants = media_dict['video_info']['variants']
            archive_filenames.append(
                archive_media(select_video_variant(variants)))
        # Any other media type is skipped.
    return archive_filenames


def archive_media_status(status):
    """
    Locally archive any media from the given Tweepy API "Status" object

    Retweets and replies are ignored. When the status carries an
    'extended_tweet', only its 'extended_entities' are consulted; otherwise
    the top-level 'extended_entities' are used.

    Arguments:
        status {obj} -- Tweepy API "Status" object

    Returns:
        list -- Filenames of the media archived locally, when the status has
            a media list
        str -- Empty string when nothing was archived (ineligible status, no
            media, or an error, which is printed)
    """
    try:
        tweet_json = status._json
        is_original_tweet = (
            'retweeted_status' not in tweet_json
            and not status.in_reply_to_status_id
            and not status.in_reply_to_user_id
        )
        if is_original_tweet:
            if 'extended_tweet' in tweet_json:
                entities = status.extended_tweet.get('extended_entities', {})
            elif 'extended_entities' in tweet_json:
                entities = status.extended_entities
            else:
                entities = {}
            if 'media' in entities:
                return _archive_media_entries(entities['media'])
    except Exception as e:
        # Best-effort: report the failure and fall through to the fallback.
        print(e)
    # NOTE(review): the file's original layout was lost, so it is unclear
    # whether '' was meant only for the error path; it is used here for every
    # "nothing archived" path -- confirm against callers.
    return ''