mirror of
https://github.com/krateng/maloja.git
synced 2025-05-20 00:31:20 +03:00
Unified range generation and moved all time-related methods to appropriate module
This commit is contained in:
parent
0c42531218
commit
2468b8d706
252
database.py
252
database.py
@ -179,14 +179,14 @@ def get_scrobbles(**keys):
|
||||
|
||||
|
||||
|
||||
# DEPRECATED
|
||||
# UNUSED
|
||||
@dbserver.route("/amounts")
|
||||
def get_amounts_external():
|
||||
return get_amounts() #really now
|
||||
|
||||
def get_amounts():
|
||||
return {"scrobbles":len(SCROBBLES),"tracks":len(TRACKS),"artists":len(ARTISTS)}
|
||||
# UNUSED
|
||||
#@dbserver.route("/amounts")
|
||||
#def get_amounts_external():
|
||||
# return get_amounts() #really now
|
||||
#
|
||||
#def get_amounts():
|
||||
# return {"scrobbles":len(SCROBBLES),"tracks":len(TRACKS),"artists":len(ARTISTS)}
|
||||
|
||||
|
||||
@dbserver.route("/numscrobbles")
|
||||
@ -204,16 +204,16 @@ def get_scrobbles_num(**keys):
|
||||
r = db_query(**{k:keys[k] for k in keys if k in ["artist","track","artists","title","since","to","within","associated"]})
|
||||
return len(r)
|
||||
|
||||
# DEPRECATED
|
||||
# UNUSED
|
||||
@dbserver.route("/charts")
|
||||
def get_charts_external():
|
||||
keys = FormsDict.decode(request.query)
|
||||
ckeys = {}
|
||||
ckeys["since"], ckeys["to"], ckeys["within"] = keys.get("since"), keys.get("to"), keys.get("in")
|
||||
|
||||
result = get_scrobbles_num(**ckeys)
|
||||
return {"number":result}
|
||||
# UNUSED
|
||||
#@dbserver.route("/charts")
|
||||
#def get_charts_external():
|
||||
# keys = FormsDict.decode(request.query)
|
||||
# ckeys = {}
|
||||
# ckeys["since"], ckeys["to"], ckeys["within"] = keys.get("since"), keys.get("to"), keys.get("in")
|
||||
#
|
||||
# result = get_scrobbles_num(**ckeys)
|
||||
# return {"number":result}
|
||||
|
||||
#def get_charts(**keys):
|
||||
# return db_aggregate(**{k:keys[k] for k in keys if k in ["since","to","within"]})
|
||||
@ -315,26 +315,14 @@ def get_pulse_external():
|
||||
results = get_pulse(**ckeys)
|
||||
return {"list":results}
|
||||
|
||||
def get_pulse(step="month",stepn=1,trail=1,**keys):
|
||||
|
||||
(ts_start,ts_end) = time_stamps(**{k:keys[k] for k in keys if k in ["since","to","within"]})
|
||||
d_start = getStartOf(ts_start,step)
|
||||
d_end = getStartOf(ts_end,step)
|
||||
d_start = getNext(d_start,step,stepn) # first range should end right after the first active scrobbling week / month / whatever relevant step
|
||||
d_start = getNext(d_start,step,stepn * trail * -1) # go one range back to begin
|
||||
def get_pulse(**keys):
|
||||
|
||||
rngs = ranges(**{k:keys[k] for k in keys if k in ["since","to","within","step","stepn","trail"]})
|
||||
results = []
|
||||
|
||||
d_current = d_start
|
||||
while True:
|
||||
#d_current_end = getNext(d_current,step,stepn * trail)
|
||||
d_current_end = getEnd(d_current,step,stepn * trail)
|
||||
#res = db_aggregate(since=d_current,to=d_current_end)
|
||||
res = len(db_query(since=d_current,to=d_current_end,**{k:keys[k] for k in keys if k in ["artists","artist","track","title","associated"]}))
|
||||
results.append({"from":d_current,"to":d_current_end,"scrobbles":res})
|
||||
d_current = getNext(d_current,step,stepn)
|
||||
if isPast(d_current,d_end):
|
||||
break
|
||||
for (a,b) in rngs:
|
||||
res = len(db_query(since=a,to=b,**{k:keys[k] for k in keys if k in ["artists","artist","track","title","associated"]}))
|
||||
results.append({"from":a,"to":b,"scrobbles":res})
|
||||
|
||||
return results
|
||||
|
||||
@ -359,27 +347,17 @@ def get_top_artists_external():
|
||||
results = get_top_artists(**ckeys)
|
||||
return {"list":results}
|
||||
|
||||
def get_top_artists(step="month",stepn=1,trail=3,**keys):
|
||||
|
||||
(ts_start,ts_end) = time_stamps(**{k:keys[k] for k in keys if k in ["since","to","within"]})
|
||||
d_start = getStartOf(ts_start,step)
|
||||
d_end = getStartOf(ts_end,step)
|
||||
d_start = getNext(d_start,step,stepn) # first range should end right after the first active scrobbling week / month / whatever relevant step
|
||||
d_start = getNext(d_start,step,stepn * trail * -1) # go one range back to begin
|
||||
def get_top_artists(**keys):
|
||||
|
||||
rngs = ranges(**{k:keys[k] for k in keys if k in ["since","to","within","step","stepn","trail"]})
|
||||
results = []
|
||||
|
||||
d_current = d_start
|
||||
while True:
|
||||
d_current_end = getNext(d_current,step,stepn * trail)
|
||||
for (a,b) in rngs:
|
||||
try:
|
||||
res = db_aggregate(since=d_current,to=d_current_end,by="ARTIST")[0]
|
||||
results.append({"from":d_current,"to":d_current_end,"artist":res["artist"],"scrobbles":res["scrobbles"]})
|
||||
res = db_aggregate(since=a,to=b,by="ARTIST")[0]
|
||||
results.append({"from":a,"to":b,"artist":res["artist"],"scrobbles":res["scrobbles"]})
|
||||
except:
|
||||
results.append({"from":d_current,"to":d_current_end,"artist":None,"scrobbles":0})
|
||||
d_current = getNext(d_current,step,stepn)
|
||||
if isPast(d_current,d_end):
|
||||
break
|
||||
results.append({"from":a,"to":b,"artist":None,"scrobbles":0})
|
||||
|
||||
return results
|
||||
|
||||
@ -405,28 +383,17 @@ def get_top_tracks_external():
|
||||
results = get_top_tracks(**ckeys)
|
||||
return {"list":results}
|
||||
|
||||
def get_top_tracks(step="month",stepn=1,trail=3,**keys):
|
||||
|
||||
(ts_start,ts_end) = time_stamps(**{k:keys[k] for k in keys if k in ["since","to","within"]})
|
||||
d_start = getStartOf(ts_start,step)
|
||||
d_end = getStartOf(ts_end,step)
|
||||
d_start = getNext(d_start,step,stepn) # first range should end right after the first active scrobbling week / month / whatever relevant step
|
||||
d_start = getNext(d_start,step,stepn * trail * -1) # go one range back to begin
|
||||
|
||||
def get_top_tracks(**keys):
|
||||
|
||||
rngs = ranges(**{k:keys[k] for k in keys if k in ["since","to","within","step","stepn","trail"]})
|
||||
results = []
|
||||
|
||||
d_current = d_start
|
||||
while True:
|
||||
d_current_end = getNext(d_current,step,stepn * trail)
|
||||
for (a,b) in rngs:
|
||||
try:
|
||||
res = db_aggregate(since=d_current,to=d_current_end,by="TRACK")[0]
|
||||
results.append({"from":d_current,"to":d_current_end,"track":res["track"],"scrobbles":res["scrobbles"]})
|
||||
res = db_aggregate(since=a,to=b,by="TRACK")[0]
|
||||
results.append({"from":a,"to":b,"track":res["track"],"scrobbles":res["scrobbles"]})
|
||||
except:
|
||||
results.append({"from":d_current,"to":d_current_end,"track":None,"scrobbles":0})
|
||||
d_current = getNext(d_current,step,stepn)
|
||||
if isPast(d_current,d_end):
|
||||
break
|
||||
results.append({"from":a,"to":b,"track":None,"scrobbles":0})
|
||||
|
||||
return results
|
||||
|
||||
@ -440,71 +407,6 @@ def get_top_tracks(step="month",stepn=1,trail=3,**keys):
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def getStartOf(timestamp,unit):
|
||||
date = datetime.datetime.utcfromtimestamp(timestamp)
|
||||
if unit == "year":
|
||||
#return [date.year,1,1]
|
||||
return [date.year]
|
||||
elif unit == "month":
|
||||
#return [date.year,date.month,1]
|
||||
return [date.year,date.month]
|
||||
elif unit == "day":
|
||||
return [date.year,date.month,date.day]
|
||||
elif unit == "week":
|
||||
change = (date.weekday() + 1) % 7
|
||||
d = datetime.timedelta(days=change)
|
||||
newdate = date - d
|
||||
return [newdate.year,newdate.month,newdate.day]
|
||||
|
||||
def getNext(time,unit="auto",step=1):
|
||||
result = time[:]
|
||||
if unit == "auto":
|
||||
# see how long the list is, increment by the last specified unit
|
||||
unit = [None,"year","month","day"][len(time)]
|
||||
#while len(time) < 3:
|
||||
# time.append(1)
|
||||
|
||||
if unit == "year":
|
||||
#return [time[0] + step,time[1],time[2]]
|
||||
result[0] += step
|
||||
return result
|
||||
elif unit == "month":
|
||||
#result = [time[0],time[1] + step,time[2]]
|
||||
result[1] += step
|
||||
while result[1] > 12:
|
||||
result[1] -= 12
|
||||
result[0] += 1
|
||||
while result[1] < 1:
|
||||
result[1] += 12
|
||||
result[0] -= 1
|
||||
return result
|
||||
elif unit == "day":
|
||||
dt = datetime.datetime(time[0],time[1],time[2])
|
||||
d = datetime.timedelta(days=step)
|
||||
newdate = dt + d
|
||||
return [newdate.year,newdate.month,newdate.day]
|
||||
#eugh
|
||||
elif unit == "week":
|
||||
return getNext(time,"day",step * 7)
|
||||
|
||||
# like getNext(), but gets the last INCLUDED day / month whatever
|
||||
def getEnd(time,unit="auto",step=1):
|
||||
if step == 1:
|
||||
if unit == "auto": return time[:]
|
||||
if unit == "year" and len(time) == 1: return time[:]
|
||||
if unit == "month" and len(time) == 2: return time[:]
|
||||
if unit == "day" and len(time) == 3: return time[:]
|
||||
exc = getNext(time,unit,step)
|
||||
inc = getNext(exc,"auto",-1)
|
||||
return inc
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@dbserver.route("/artistinfo")
|
||||
def artistInfo_external():
|
||||
keys = FormsDict.decode(request.query)
|
||||
@ -550,18 +452,6 @@ def trackInfo(artists,title):
|
||||
|
||||
|
||||
|
||||
def isPast(date,limit):
|
||||
date_, limit_ = date[:], limit[:]
|
||||
while len(date_) != 3: date_.append(1)
|
||||
while len(limit_) != 3: limit_.append(1)
|
||||
if not date_[0] == limit_[0]:
|
||||
return date_[0] > limit_[0]
|
||||
if not date_[1] == limit_[1]:
|
||||
return date_[1] > limit_[1]
|
||||
return (date_[2] > limit_[2])
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@ -849,13 +739,6 @@ def sync():
|
||||
|
||||
# Queries the database
|
||||
def db_query(artist=None,artists=None,title=None,track=None,since=None,to=None,within=None,associated=False):
|
||||
# print(artists)
|
||||
# print(title)
|
||||
# print(track)
|
||||
# print(since)
|
||||
# print(to)
|
||||
# print(within)
|
||||
# print(associated)
|
||||
|
||||
(since, to) = time_stamps(since,to,within)
|
||||
|
||||
@ -960,75 +843,6 @@ def db_search(query,type=None):
|
||||
####
|
||||
|
||||
|
||||
# Takes user inputs like YYYY/MM and returns the timestamps. Returns timestamp if timestamp was already given.
|
||||
# to dates are interpreted differently (from 2010 and to 2010 both include all of 2010)
|
||||
# NOW DONE IN THE MALOJATIME MODULE
|
||||
#def getTimestamps(since=None,to=None,within=None):
|
||||
#
|
||||
# f,t,i = since,to,within
|
||||
#
|
||||
# if i is not None:
|
||||
# f = i
|
||||
# t = i
|
||||
#
|
||||
# if isinstance(f, str) and f.lower() == "today":
|
||||
# tod = datetime.datetime.utcnow()
|
||||
# f = [tod.year,tod.month,tod.day]
|
||||
# if isinstance(t, str) and t.lower() == "today":
|
||||
# tod = datetime.datetime.utcnow()
|
||||
# t = [tod.year,tod.month,tod.day]
|
||||
#
|
||||
#
|
||||
# if isinstance(f, str) and f.lower() == "month":
|
||||
# tod = datetime.datetime.utcnow()
|
||||
# f = [tod.year,tod.month]
|
||||
# if isinstance(t, str) and t.lower() == "month":
|
||||
# tod = datetime.datetime.utcnow()
|
||||
# t = [tod.year,tod.month]
|
||||
#
|
||||
#
|
||||
# if isinstance(f, str) and f.lower() == "year":
|
||||
# tod = datetime.datetime.utcnow()
|
||||
# f = [tod.year]
|
||||
# if isinstance(t, str) and t.lower() == "year":
|
||||
# tod = datetime.datetime.utcnow()
|
||||
# t = [tod.year]
|
||||
#
|
||||
#
|
||||
# if isinstance(f, str):
|
||||
# f = [int(x) for x in f.split("/")]
|
||||
#
|
||||
# if isinstance(t, str):
|
||||
# t = [int(x) for x in t.split("/")]
|
||||
#
|
||||
#
|
||||
# # this step is done if either the input is a list or the first step was done (which creates a list)
|
||||
# if isinstance(f, list):
|
||||
# date = [1970,1,1]
|
||||
# date[:len(f)] = f
|
||||
# #while len(f) < 3: f.append(1) # padding month and day
|
||||
# f = date
|
||||
# #f = int(datetime.datetime(date[0],date[1],date[2],date[3],date[4],tzinfo=datetime.timezone.utc).timestamp())
|
||||
# f = int(datetime.datetime(f[0],f[1],f[2],tzinfo=datetime.timezone.utc).timestamp())
|
||||
#
|
||||
# if isinstance(t, list):
|
||||
# t = getNext(t)
|
||||
# #while len(t) < 3: t.append(1)
|
||||
# date = [1970,1,1]
|
||||
# date[:len(t)] = t
|
||||
# t = date
|
||||
# #t = int(datetime.datetime(date[0],date[1],date[2],date[3],date[4],tzinfo=datetime.timezone.utc).timestamp())
|
||||
# t = int(datetime.datetime(t[0],t[1],t[2],tzinfo=datetime.timezone.utc).timestamp())
|
||||
#
|
||||
# if (f==None):
|
||||
# f = min(timestamps)
|
||||
# if (t==None):
|
||||
# t = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc).timestamp()
|
||||
#
|
||||
# return (f,t)
|
||||
#
|
||||
#
|
||||
|
||||
|
||||
|
||||
def getArtistId(nameorid):
|
||||
|
126
malojatime.py
126
malojatime.py
@ -294,7 +294,6 @@ def range_desc(since=None,to=None,within=None,short=False):
|
||||
|
||||
def time_stamps(since=None,to=None,within=None):
|
||||
|
||||
from database import getNext
|
||||
|
||||
if within is not None:
|
||||
since = within
|
||||
@ -313,7 +312,7 @@ def time_stamps(since=None,to=None,within=None):
|
||||
if (to==None): stamp2 = int(datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc).timestamp())
|
||||
else:
|
||||
to = time_fix(to)
|
||||
to = getNext(to)
|
||||
to = _get_next(to)
|
||||
date = [1970,1,1]
|
||||
date[:len(to)] = to
|
||||
stamp2 = int(datetime.datetime(date[0],date[1],date[2],tzinfo=datetime.timezone.utc).timestamp())
|
||||
@ -324,7 +323,7 @@ def time_stamps(since=None,to=None,within=None):
|
||||
|
||||
|
||||
|
||||
def delimit_desc(step,stepn,trail):
|
||||
def delimit_desc(step="month",stepn=1,trail=1):
|
||||
txt = ""
|
||||
if stepn is not 1: txt += _num(stepn) + "-"
|
||||
txt += {"year":"Yearly","month":"Monthly","day":"Daily"}[step.lower()]
|
||||
@ -340,30 +339,97 @@ def _num(i):
|
||||
else: return str(i)
|
||||
|
||||
|
||||
#def _getNext(time,unit="auto",step=1):
|
||||
# result = time[:]
|
||||
# if unit == "auto":
|
||||
# # see how long the list is, increment by the last specified unit
|
||||
# unit = [None,"year","month","day"][len(time)]
|
||||
#
|
||||
# if unit == "year":
|
||||
# result[0] += step
|
||||
# return result
|
||||
# elif unit == "month":
|
||||
# result[1] += step
|
||||
# while result[1] > 12:
|
||||
# result[1] -= 12
|
||||
# result[0] += 1
|
||||
# while result[1] < 1:
|
||||
# result[1] += 12
|
||||
# result[0] -= 1
|
||||
# return result
|
||||
# elif unit == "day":
|
||||
# dt = datetime.datetime(time[0],time[1],time[2])
|
||||
# d = datetime.timedelta(days=step)
|
||||
# newdate = dt + d
|
||||
# return [newdate.year,newdate.month,newdate.day]
|
||||
# #eugh
|
||||
# elif unit == "week":
|
||||
# return getNext(time,"day",step * 7)
|
||||
#
|
||||
|
||||
|
||||
|
||||
|
||||
def ranges(since=None,to=None,within=None,step="month",stepn=1,trail=1,max_=None):
|
||||
|
||||
(firstincluded,lastincluded) = time_stamps(since=since,to=to,within=within)
|
||||
|
||||
d_start = _get_start_of(firstincluded,step)
|
||||
d_end = _get_start_of(lastincluded,step)
|
||||
d_start = _get_next(d_start,step,stepn) # first range should end right after the first active scrobbling week / month / whatever relevant step
|
||||
d_start = _get_next(d_start,step,stepn * trail * -1) # go one range back to begin
|
||||
|
||||
|
||||
i = 0
|
||||
d_current = d_start
|
||||
while not _is_past(d_current,d_end) and (max_ is None or i < max_):
|
||||
d_current_end = _get_end(d_current,step,stepn * trail)
|
||||
yield (d_current,d_current_end)
|
||||
d_current = _get_next(d_current,step,stepn)
|
||||
i += 1
|
||||
|
||||
|
||||
|
||||
def _get_start_of(timestamp,unit):
|
||||
date = datetime.datetime.utcfromtimestamp(timestamp)
|
||||
if unit == "year":
|
||||
#return [date.year,1,1]
|
||||
return [date.year]
|
||||
elif unit == "month":
|
||||
#return [date.year,date.month,1]
|
||||
return [date.year,date.month]
|
||||
elif unit == "day":
|
||||
return [date.year,date.month,date.day]
|
||||
elif unit == "week":
|
||||
change = (date.weekday() + 1) % 7
|
||||
d = datetime.timedelta(days=change)
|
||||
newdate = date - d
|
||||
return [newdate.year,newdate.month,newdate.day]
|
||||
|
||||
def _get_next(time,unit="auto",step=1):
|
||||
result = time[:]
|
||||
if unit == "auto":
|
||||
# see how long the list is, increment by the last specified unit
|
||||
unit = [None,"year","month","day"][len(time)]
|
||||
#while len(time) < 3:
|
||||
# time.append(1)
|
||||
|
||||
if unit == "year":
|
||||
#return [time[0] + step,time[1],time[2]]
|
||||
result[0] += step
|
||||
return result
|
||||
elif unit == "month":
|
||||
#result = [time[0],time[1] + step,time[2]]
|
||||
result[1] += step
|
||||
while result[1] > 12:
|
||||
result[1] -= 12
|
||||
result[0] += 1
|
||||
while result[1] < 1:
|
||||
result[1] += 12
|
||||
result[0] -= 1
|
||||
return result
|
||||
elif unit == "day":
|
||||
dt = datetime.datetime(time[0],time[1],time[2])
|
||||
d = datetime.timedelta(days=step)
|
||||
newdate = dt + d
|
||||
return [newdate.year,newdate.month,newdate.day]
|
||||
#eugh
|
||||
elif unit == "week":
|
||||
return _get_next(time,"day",step * 7)
|
||||
|
||||
# like _get_next(), but gets the last INCLUDED day / month whatever
|
||||
def _get_end(time,unit="auto",step=1):
|
||||
if step == 1:
|
||||
if unit == "auto": return time[:]
|
||||
if unit == "year" and len(time) == 1: return time[:]
|
||||
if unit == "month" and len(time) == 2: return time[:]
|
||||
if unit == "day" and len(time) == 3: return time[:]
|
||||
exc = _get_next(time,unit,step)
|
||||
inc = _get_next(exc,"auto",-1)
|
||||
return inc
|
||||
|
||||
|
||||
|
||||
def _is_past(date,limit):
|
||||
date_, limit_ = date[:], limit[:]
|
||||
while len(date_) != 3: date_.append(1)
|
||||
while len(limit_) != 3: limit_.append(1)
|
||||
if not date_[0] == limit_[0]:
|
||||
return date_[0] > limit_[0]
|
||||
if not date_[1] == limit_[1]:
|
||||
return date_[1] > limit_[1]
|
||||
return (date_[2] > limit_[2])
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user