parallel processing - Python RQ: pattern for a callback
I have a large number of documents to process, and I am using Python RQ to parallelize the task.
I would like a pipeline of work to be done, with different operations performed on each document. For example: a -> b -> c means pass the document to function a, and after a is done, proceed to b, and finally to c.
However, Python RQ does not seem to support this kind of pipeline nicely.
Here is a simple but somewhat dirty way of doing it. In one word, each function along the pipeline calls the next one in a nested way.
For example, for the pipeline a -> b -> c:
At the top level, the code is written like this:
q.enqueue(a, the_doc)
where q is a Queue instance, and in function a there is code like:
q.enqueue(b, the_doc)
and in b, there is this:
q.enqueue(c, the_doc)
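Put together, a minimal sketch of this nested pattern could look like the following (the module layout, imports, and placeholder processing steps are my own assumptions, not my real code):

# tasks.py -- each stage enqueues the next stage itself
from redis import Redis
from rq import Queue

q = Queue(connection=Redis())

def a(the_doc):
    # ... do the first operation on the_doc ...
    q.enqueue(b, the_doc)   # hand the document on to stage b

def b(the_doc):
    # ... do the second operation on the_doc ...
    q.enqueue(c, the_doc)   # hand the document on to stage c

def c(the_doc):
    # ... do the last operation on the_doc ...
    pass

# only the first stage is enqueued at the top level:
# q.enqueue(a, the_doc)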
Is there a more elegant way of doing this? For example, some code that sets up the whole chain in one function:
q.enqueue(a, the_doc)
q.enqueue(b, the_doc, after=a)
q.enqueue(c, the_doc, after=b)
The depends_on parameter is the closest one to my requirement. However, running something like:

a_job = q.enqueue(a, the_doc)
q.enqueue(b, depends_on=a_job)

won't work, because q.enqueue(b, depends_on=a_job) is executed right after a_job = q.enqueue(a, the_doc) is executed. By the time b is enqueued, the result from a might not be ready, since it takes time to process.
PS:
If Python RQ is not really good at this, what other tool in Python can I use to achieve the same purpose:
- round-robin parallelization
- pipeline processing support
For your case, q.enqueue(b, depends_on=a_job) will only run once a_job finishes. If the result is not ready yet, the job for b simply waits in the queue until it is.
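So the whole pipeline can be set up ahead of time from one place. A minimal sketch, assuming q is the same RQ Queue instance and a, b, c are the stage functions:

a_job = q.enqueue(a, the_doc)
b_job = q.enqueue(b, the_doc, depends_on=a_job)   # deferred until a_job finishes
c_job = q.enqueue(c, the_doc, depends_on=b_job)   # deferred until b_job finishes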
It is not supported out of the box, but it is possible by combining RQ with other technologies.
In my case, I used caching to keep track of the previous job in the chain, so that when I want to enqueue a new function (to run right after it) I can set the 'depends_on' parameter accordingly when calling enqueue().
Note the use of the additional parameters to enqueue: 'timeout', 'result_ttl', 'ttl'. These were used since I was running long jobs on RQ; you can find a reference for their use in the Python RQ docs.
I used django_rq.enqueue(), which is derived from Python RQ.
# main.py
def process_job():
    ...
    # Create a cache key for every chain of methods you want to call.
    # NOTE: I used this for web development; in your case you may want
    # to use a variable or a database instead of caching.

    # Amount of time to cache the key and keep the results in RQ
    two_hrs = 60 * 60 * 2

    cache_key = 'update-data-key-%s' % obj.id
    previous_job_id = cache.get(cache_key)

    job = django_rq.enqueue(update_metadata,
                            campaign=campaign,
                            list=chosen_list,
                            depends_on=previous_job_id,
                            timeout=two_hrs,
                            result_ttl=two_hrs,
                            ttl=two_hrs)

    # Store the id of the job just enqueued, so the next function
    # in the chain can set the proper value for 'depends_on'
    cache.set(cache_key, job.id, two_hrs)

# utils.py
def update_metadata(campaign, list):
    # Code goes here to update the campaign object and the list object
    pass

On 'depends_on' - from the RQ docs:

depends_on - specifies another job (or job id) that must complete before this job will be queued
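The same idea also works with plain RQ outside Django. The following is only a sketch under my own assumptions: the key naming, the enqueue_chained helper, and the choice to store the last job id directly in Redis (instead of the Django cache) are all made up for illustration.

from redis import Redis
from rq import Queue

redis_conn = Redis()
q = Queue(connection=redis_conn)

two_hrs = 60 * 60 * 2

def enqueue_chained(func, doc_id, *args, **kwargs):
    # Look up the id of the last job enqueued for this document, so the
    # new job can be made to depend on it.
    key = 'last-job-for-doc-%s' % doc_id
    raw = redis_conn.get(key)
    previous_job_id = raw.decode('utf-8') if raw else None

    job = q.enqueue(func, *args,
                    depends_on=previous_job_id,
                    result_ttl=two_hrs,
                    ttl=two_hrs,
                    **kwargs)

    # Remember this job as the new tail of the chain.
    redis_conn.set(key, job.id, ex=two_hrs)
    return job

# Building the a -> b -> c pipeline for one document then becomes:
# enqueue_chained(a, doc_id, the_doc)
# enqueue_chained(b, doc_id, the_doc)
# enqueue_chained(c, doc_id, the_doc)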