# 2023-11-03 11:16:11 +00:00
import os
import yt_dlp
from collections import OrderedDict
import shelve
import time
import asyncio
import multiprocessing
import logging
import re
from dl_formats import get_format , get_opts , AUDIO_FORMATS
# 2023-11-16 13:16:00 +00:00
from datetime import datetime
# 2023-11-03 11:16:11 +00:00
# Module-level logger used by the download queue code in this file.
log = logging . getLogger ( ' ytdl ' )
class DownloadQueueNotifier:
    """Callback interface for download-queue events.

    Every hook is a coroutine that raises ``NotImplementedError`` here;
    concrete notifiers are expected to override all of them.
    """

    async def added(self, dl):
        """Called with the download info of a newly added item."""
        raise NotImplementedError

    async def updated(self, dl):
        """Called with the download info whenever its state changes."""
        raise NotImplementedError

    async def completed(self, dl):
        """Called with the download info once an item has been processed."""
        raise NotImplementedError

    async def canceled(self, id):
        """Called with the key of an item that was canceled."""
        raise NotImplementedError

    async def cleared(self, id):
        """Called with the key of an item removed from the done list."""
        raise NotImplementedError
class DownloadInfo:
    """Plain record describing one download and its current progress.

    Instances are what the queue persists and what notifiers receive.
    """

    def __init__(self, id, title, url, quality, format, folder, custom_name_prefix, error):
        """Build the record.

        Args:
            id: Identifier of the media item.
            title: Human-readable title.
            url: URL handed to the downloader.
            quality: Requested quality selector.
            format: Requested output format.
            folder: Optional sub-folder for the download.
            custom_name_prefix: Optional prefix; when non-empty it is
                prepended (dot-separated) to both ``id`` and ``title``.
                ``None`` is treated like an empty prefix.
            error: Error/notice text known at creation time, or ``None``.
        """
        # Treat None like "no prefix" instead of crashing on len(None).
        prefix = custom_name_prefix or ''
        self.id = f'{prefix}.{id}' if prefix else id
        self.title = f'{prefix}.{title}' if prefix else title
        self.url = url
        self.quality = quality
        self.format = format
        self.folder = folder
        self.custom_name_prefix = prefix
        # Progress fields are filled in later while the download runs.
        self.msg = self.percent = self.speed = self.eta = None
        self.status = "pending"
        # Creation time in nanoseconds; used to order persisted items.
        self.timestamp = time.time_ns()
        self.error = error
# 2023-11-03 11:16:11 +00:00
class Download:
    """Run one yt-dlp download in a child process and relay its progress.

    The child process pushes status dicts onto a manager queue; the parent
    drains that queue in ``update_status`` and forwards every change to the
    notifier.
    """

    # Lazily created multiprocessing.Manager shared by all downloads; it
    # supplies the status queues.
    manager = None

    # Keys copied from yt-dlp progress-hook dicts onto the status queue.
    _STATUS_KEYS = (
        'tmpfilename',
        'filename',
        'status',
        'msg',
        'total_bytes',
        'total_bytes_estimate',
        'downloaded_bytes',
        'speed',
        'eta',
    )

    def __init__(self, download_dir, temp_dir, output_template, output_template_chapter, quality, format, ytdl_opts, info):
        """Store the download parameters; nothing is started yet.

        Args:
            download_dir: yt-dlp "home" path.
            temp_dir: yt-dlp "temp" path.
            output_template: Default output template.
            output_template_chapter: Output template for chapters.
            quality: Quality selector passed to ``get_format``/``get_opts``.
            format: Format selector passed to ``get_format``/``get_opts``.
            ytdl_opts: Extra yt-dlp options merged via ``get_opts``.
            info: The ``DownloadInfo``-like record updated as progress arrives.
        """
        self.download_dir = download_dir
        self.temp_dir = temp_dir
        self.output_template = output_template
        self.output_template_chapter = output_template_chapter
        self.format = get_format(format, quality)
        self.ytdl_opts = get_opts(format, quality, ytdl_opts)
        self.info = info
        self.canceled = False
        self.tmpfilename = None
        self.status_queue = None
        self.proc = None
        self.loop = None
        self.notifier = None
        # Strong reference to the status pump task (set in start()).
        self._status_task = None

    def _download(self):
        """Child-process entry point: run yt-dlp and report via the queue."""
        try:
            def put_status(st):
                # Forward only the whitelisted keys of the progress dict.
                self.status_queue.put({k: v for k, v in st.items() if k in self._STATUS_KEYS})

            def put_status_postprocessor(d):
                # Once MoveFiles has finished, report the final file location.
                if d['postprocessor'] == 'MoveFiles' and d['status'] == 'finished':
                    if '__finaldir' in d['info_dict']:
                        filename = os.path.join(d['info_dict']['__finaldir'], os.path.basename(d['info_dict']['filepath']))
                    else:
                        filename = d['info_dict']['filepath']
                    self.status_queue.put({'status': 'finished', 'filename': filename})

            ret = yt_dlp.YoutubeDL(params={
                'quiet': True,
                'no_color': True,
                #'skip_download': True,
                'paths': {"home": self.download_dir, "temp": self.temp_dir},
                'outtmpl': {"default": self.output_template, "chapter": self.output_template_chapter},
                'format': self.format,
                'socket_timeout': 30,
                'ignore_no_formats_error': True,
                'progress_hooks': [put_status],
                'postprocessor_hooks': [put_status_postprocessor],
                **self.ytdl_opts,
            }).download([self.info.url])
            self.status_queue.put({'status': 'finished' if ret == 0 else 'error'})
        except yt_dlp.utils.YoutubeDLError as exc:
            self.status_queue.put({'status': 'error', 'msg': str(exc)})

    async def start(self, notifier):
        """Spawn the child process and wait until it exits.

        Args:
            notifier: Object whose ``updated`` coroutine receives ``self.info``
                after every status change.

        Returns:
            The result of joining the child process.
        """
        if Download.manager is None:
            Download.manager = multiprocessing.Manager()
        self.status_queue = Download.manager.Queue()
        self.proc = multiprocessing.Process(target=self._download)
        self.proc.start()
        # loop/notifier/task are assigned only after the process has been
        # started, exactly as before, so they are never part of the state
        # handed to the child.
        self.loop = asyncio.get_running_loop()
        self.notifier = notifier
        self.info.status = 'preparing'
        await self.notifier.updated(self.info)
        # Keep a reference: the event loop only holds weak references to
        # tasks, so an unreferenced task may be collected before it finishes.
        self._status_task = asyncio.create_task(self.update_status())
        return await self.loop.run_in_executor(None, self.proc.join)

    def cancel(self):
        """Kill the child process if it is alive and flag the download as canceled."""
        if self.running():
            self.proc.kill()
        self.canceled = True

    def close(self):
        """Release the process object and stop the status pump.

        Does nothing when the download was never started.
        """
        if self.started():
            self.proc.close()
            # None is the sentinel that makes update_status() return.
            self.status_queue.put(None)

    def running(self):
        """Return True while the child process exists and is alive."""
        try:
            return self.proc is not None and self.proc.is_alive()
        except ValueError:
            # is_alive() raises ValueError on a closed process object.
            return False

    def started(self):
        """Return True once a child process has been created."""
        return self.proc is not None

    async def update_status(self):
        """Drain the status queue, updating ``self.info`` and notifying.

        Runs until the ``None`` sentinel is received.
        """
        while True:
            # Queue.get blocks, so it is run in the default executor.
            status = await self.loop.run_in_executor(None, self.status_queue.get)
            if status is None:
                return
            self.tmpfilename = status.get('tmpfilename')
            if 'filename' in status:
                self.info.filename = os.path.relpath(status.get('filename'), self.download_dir)
                # Set correct file extension for thumbnails
                if self.info.format == 'thumbnail':
                    self.info.filename = re.sub(r'\.webm$', '.jpg', self.info.filename)
            self.info.status = status['status']
            self.info.msg = status.get('msg')
            if 'downloaded_bytes' in status:
                total = status.get('total_bytes') or status.get('total_bytes_estimate')
                if total:
                    self.info.percent = status['downloaded_bytes'] / total * 100
            self.info.speed = status.get('speed')
            self.info.eta = status.get('eta')
            await self.notifier.updated(self.info)
class PersistentQueue:
    """Insertion-ordered in-memory queue mirrored into a ``shelve`` file.

    The in-memory ``OrderedDict`` maps a key (the item's ``info.url``) to the
    full item, while the shelf stores only the item's ``info`` object.
    """

    def __init__(self, path):
        """Create the backing shelf at ``path`` (and its directory) if missing.

        Args:
            path: Filesystem path of the shelf.
        """
        pdir = os.path.dirname(path)
        # makedirs copes with missing parents and with a directory that
        # already exists; a bare filename has no directory to create.
        if pdir:
            os.makedirs(pdir, exist_ok=True)
        # Opening with 'c' creates the shelf so later 'r'/'w' opens succeed.
        with shelve.open(path, 'c'):
            pass
        self.path = path
        self.dict = OrderedDict()

    def load(self):
        """Rebuild the in-memory queue from the shelf, oldest first."""
        for k, v in self.saved_items():
            # Only the info is persisted; the Download wrapper is a stub.
            self.dict[k] = Download(None, None, None, None, None, None, {}, v)

    def exists(self, key):
        """Return True if ``key`` is in the in-memory queue."""
        return key in self.dict

    def get(self, key):
        """Return the item stored under ``key`` (raises ``KeyError`` if absent)."""
        return self.dict[key]

    def items(self):
        """Return the in-memory ``(key, item)`` view in insertion order."""
        return self.dict.items()

    def saved_items(self):
        """Return persisted ``(key, info)`` pairs sorted by ``info.timestamp``."""
        with shelve.open(self.path, 'r') as shelf:
            return sorted(shelf.items(), key=lambda item: item[1].timestamp)

    def put(self, value):
        """Insert or replace ``value`` under ``value.info.url`` and persist its info."""
        key = value.info.url
        self.dict[key] = value
        with shelve.open(self.path, 'w') as shelf:
            shelf[key] = value.info

    def delete(self, key):
        """Remove ``key`` from both the in-memory queue and the shelf."""
        del self.dict[key]
        with shelve.open(self.path, 'w') as shelf:
            shelf.pop(key)

    def next(self):
        """Return the first ``(key, item)`` pair without removing it.

        Raises:
            StopIteration: if the queue is empty.
        """
        return next(iter(self.dict.items()))

    def empty(self):
        """Return True when the in-memory queue holds no items."""
        return not self.dict
class DownloadQueue:
    """Coordinates the active, pending and completed download queues.

    Items are keyed by their download URL (the key ``PersistentQueue.put``
    uses). A single background task processes the active queue one item at
    a time and reports every change through the notifier.
    """

    def __init__(self, config, notifier):
        """Open the three persistent queues under ``config.STATE_DIR``.

        Args:
            config: Configuration object (STATE_DIR, DOWNLOAD_DIR, ...).
            notifier: Object exposing the notifier coroutines.
        """
        self.config = config
        self.notifier = notifier
        self.queue = PersistentQueue(self.config.STATE_DIR + '/queue')
        self.done = PersistentQueue(self.config.STATE_DIR + '/completed')
        self.pending = PersistentQueue(self.config.STATE_DIR + '/pending')
        self.done.load()

    async def __import_queue(self):
        """Re-add every item persisted in the active queue."""
        for k, v in self.queue.saved_items():
            await self.add(v.url, v.quality, v.format, v.folder, v.custom_name_prefix)

    async def initialize(self):
        """Create the wake-up event and start the background tasks."""
        self.event = asyncio.Event()
        # Keep strong references: the loop only holds weak references to
        # tasks, so unreferenced ones may be garbage-collected mid-run.
        self._background_tasks = [
            asyncio.create_task(self.__download()),
            asyncio.create_task(self.__import_queue()),
        ]

    def __extract_info(self, url):
        """Return yt-dlp's metadata for ``url`` without downloading."""
        return yt_dlp.YoutubeDL(params={
            'quiet': True,
            'no_color': True,
            'extract_flat': True,
            'ignore_no_formats_error': True,
            'paths': {"home": self.config.DOWNLOAD_DIR, "temp": self.config.TEMP_DIR},
            **self.config.YTDL_OPTIONS,
        }).extract_info(url, download=False)

    @staticmethod
    def _is_within(base, candidate):
        """Return True if ``candidate`` is ``base`` or located below it.

        Both arguments must already be resolved (``os.path.realpath``).
        """
        try:
            return os.path.commonpath((base, candidate)) == base
        except ValueError:
            # Raised e.g. for paths on different drives: not inside.
            return False

    def __calc_download_path(self, quality, format, folder):
        """Calculates download path from quality, format and folder attributes.

        Returns:
            Tuple dldirectory, error_message both of which might be None (but not at the same time)
        """
        # Keep consistent with frontend
        base_directory = self.config.DOWNLOAD_DIR if (quality != 'audio' and format not in AUDIO_FORMATS) else self.config.AUDIO_DOWNLOAD_DIR
        if not folder:
            return base_directory, None

        if not self.config.CUSTOM_DIRS:
            return None, {'status': 'error', 'msg': f'A folder for the download was specified but CUSTOM_DIRS is not true in the configuration.'}
        dldirectory = os.path.realpath(os.path.join(base_directory, folder))
        real_base_directory = os.path.realpath(base_directory)
        # Compare whole path components: a plain prefix test would accept a
        # sibling such as "<base>-evil" reached through "..".
        if not self._is_within(real_base_directory, dldirectory):
            return None, {'status': 'error', 'msg': f'Folder "{folder}" must resolve inside the base download directory "{real_base_directory}"'}
        if not os.path.isdir(dldirectory):
            if not self.config.CREATE_CUSTOM_DIRS:
                return None, {'status': 'error', 'msg': f'Folder "{folder}" for download does not exist inside base directory "{real_base_directory}", and CREATE_CUSTOM_DIRS is not true in the configuration.'}
            os.makedirs(dldirectory, exist_ok=True)
        return dldirectory, None

    async def __add_entry(self, entry, quality, format, folder, custom_name_prefix, auto_start, already):
        """Queue one extracted entry (video, playlist or URL redirect).

        Returns:
            A dict with ``status`` 'ok' or 'error' (plus ``msg`` on error).
        """
        if not entry:
            return {'status': 'error', 'msg': "Invalid/empty data was given."}

        error = None
        release_timestamp = entry.get("release_timestamp")
        # A present-but-None timestamp cannot be formatted; fall through to
        # the generic message handling in that case.
        if entry.get("live_status") == "is_upcoming" and release_timestamp is not None:
            dt_ts = datetime.fromtimestamp(release_timestamp).strftime('%Y-%m-%d %H:%M:%S %z')
            error = f"Live stream is scheduled to start at {dt_ts}"
        elif "msg" in entry:
            error = entry["msg"]

        etype = entry.get('_type') or 'video'
        if etype == 'playlist':
            entries = entry['entries']
            log.info(f'playlist detected with {len(entries)} entries')
            playlist_index_digits = len(str(len(entries)))
            results = []
            for index, etr in enumerate(entries, start=1):
                etr["playlist"] = entry["id"]
                # Zero-pad the index to the width of the playlist length.
                etr["playlist_index"] = f'{index:0{playlist_index_digits}d}'
                for prop in ("id", "title", "uploader", "uploader_id"):
                    if prop in entry:
                        etr[f"playlist_{prop}"] = entry[prop]
                results.append(await self.__add_entry(etr, quality, format, folder, custom_name_prefix, auto_start, already))
            if any(res['status'] == 'error' for res in results):
                return {'status': 'error', 'msg': ', '.join(res['msg'] for res in results if res['status'] == 'error' and 'msg' in res)}
            return {'status': 'ok'}
        elif etype == 'video' or (etype.startswith('url') and 'id' in entry and 'title' in entry):
            # The queues are keyed by URL (see PersistentQueue.put), so the
            # duplicate check has to use the URL rather than the entry id.
            key = entry.get('webpage_url') or entry['url']
            if not self.queue.exists(key):
                dl = DownloadInfo(entry['id'], entry['title'], key, quality, format, folder, custom_name_prefix, error)
                dldirectory, error_message = self.__calc_download_path(quality, format, folder)
                if error_message is not None:
                    return error_message
                output = self.config.OUTPUT_TEMPLATE if not custom_name_prefix else f'{custom_name_prefix}.{self.config.OUTPUT_TEMPLATE}'
                output_chapter = self.config.OUTPUT_TEMPLATE_CHAPTER
                # Resolve playlist placeholders now; the single-entry
                # download no longer knows about its playlist.
                for prop, value in entry.items():
                    if prop.startswith("playlist"):
                        output = output.replace(f"%({prop})s", str(value))
                download = Download(dldirectory, self.config.TEMP_DIR, output, output_chapter, quality, format, self.config.YTDL_OPTIONS, dl)
                if auto_start is True:
                    self.queue.put(download)
                    self.event.set()
                else:
                    self.pending.put(download)
                await self.notifier.added(dl)
            return {'status': 'ok'}
        elif etype.startswith('url'):
            return await self.add(entry['url'], quality, format, folder, custom_name_prefix, auto_start, already)
        return {'status': 'error', 'msg': f'Unsupported resource "{etype}"'}

    async def add(self, url, quality, format, folder, custom_name_prefix, auto_start=True, already=None):
        """Extract ``url`` and queue everything it resolves to.

        Args:
            url: Resource to add.
            quality, format, folder, custom_name_prefix: Download settings.
            auto_start: Queue for download when True, otherwise park the
                item in the pending queue.
            already: Set of URLs seen during this call chain (recursion guard).

        Returns:
            A dict with ``status`` 'ok' or 'error' (plus ``msg`` on error).
        """
        log.info(f'adding {url}: {quality=} {format=} {already=} {folder=} {custom_name_prefix=}')
        already = set() if already is None else already
        if url in already:
            log.info('recursion detected, skipping')
            return {'status': 'ok'}
        already.add(url)
        try:
            # Extraction blocks, so it runs in the default executor.
            entry = await asyncio.get_running_loop().run_in_executor(None, self.__extract_info, url)
        except yt_dlp.utils.YoutubeDLError as exc:
            return {'status': 'error', 'msg': str(exc)}
        return await self.__add_entry(entry, quality, format, folder, custom_name_prefix, auto_start, already)

    async def start_pending(self, ids):
        """Move the given pending items into the active queue."""
        for id in ids:
            if not self.pending.exists(id):
                log.warning(f'requested start for non-existent download {id}')
                continue
            dl = self.pending.get(id)
            self.queue.put(dl)
            self.pending.delete(id)
            self.event.set()
        return {'status': 'ok'}

    async def cancel(self, ids):
        """Cancel the given pending, queued or running downloads."""
        for id in ids:
            # Pending items are listed by get(), so they must be cancelable.
            if self.pending.exists(id):
                self.pending.delete(id)
                await self.notifier.canceled(id)
                continue
            if not self.queue.exists(id):
                log.warning(f'requested cancel for non-existent download {id}')
                continue
            download = self.queue.get(id)
            if download.started():
                # A running item is removed by the download loop once its
                # process has exited.
                download.cancel()
            else:
                self.queue.delete(id)
                await self.notifier.canceled(id)
        return {'status': 'ok'}

    async def clear(self, ids):
        """Remove the given items from the done list.

        When ``config.DELETE_FILE_ON_TRASHCAN`` is set, the downloaded file
        is deleted as well (best effort).
        """
        for id in ids:
            if not self.done.exists(id):
                log.warning(f'requested delete for non-existent download {id}')
                continue
            if self.config.DELETE_FILE_ON_TRASHCAN:
                dl = self.done.get(id)
                try:
                    dldirectory, _ = self.__calc_download_path(dl.info.quality, dl.info.format, dl.info.folder)
                    os.remove(os.path.join(dldirectory, dl.info.filename))
                except Exception as e:
                    # Deliberately broad: a failed file removal must not
                    # prevent clearing the entry itself.
                    log.warning(f'deleting file for download {id} failed with error message {e!r}')
            self.done.delete(id)
            await self.notifier.cleared(id)
        return {'status': 'ok'}

    def get(self):
        """Return ``(active + pending, done)`` as lists of ``(key, info)`` pairs."""
        waiting = [(k, v.info) for k, v in self.queue.items()]
        waiting += [(k, v.info) for k, v in self.pending.items()]
        finished = [(k, v.info) for k, v in self.done.items()]
        return waiting, finished

    async def __download(self):
        """Background loop: process the active queue one item at a time."""
        while True:
            while self.queue.empty():
                log.info('waiting for item to download')
                await self.event.wait()
                self.event.clear()
            id, entry = self.queue.next()
            log.info(f'downloading {entry.info.title}')
            await entry.start(self.notifier)
            if entry.info.status != 'finished':
                # Best-effort cleanup of the partial file.
                if entry.tmpfilename and os.path.isfile(entry.tmpfilename):
                    try:
                        os.remove(entry.tmpfilename)
                    except OSError:
                        pass
                entry.info.status = 'error'
            entry.close()
            if self.queue.exists(id):
                self.queue.delete(id)
                if entry.canceled:
                    await self.notifier.canceled(id)
                else:
                    self.done.put(entry)
                    await self.notifier.completed(entry.info)