En lisant le document sur le céleri, qu'y a-t-il? Document officiel et à mâcher vous-même.
The primitives
Map & Starmap, Chunks n'ont pas été examinés.
Chains
Exécutez les tâches en série. Les tâches suivantes reçoivent le résultat de l'exécution de la tâche précédente.
** Faites attention à la signature de chaque tâche. ** **
from celery import chain
# `add.s(4, 4)`Le résultat de`mul.s(8)`Passez à.そLe résultat de`mul.s(10)`Passez à.
chain(add.s(4, 4), mul.s(8), mul.s(10))
Groups
Exécutez plusieurs tâches en parallèle.
from celery import group
# `task.s(2, 2)`Quand`task.s(4, 4)`Est exécuté en parallèle
group(
	task.s(2, 2),
	task.s(4, 4)
)
Chords
Les résultats de l'exécution de plusieurs tâches peuvent être transmis au rappel.
from celery import chord
#Résultats d'exécution multiples Tsum.s()Passer au
chord(add.s(i, i) for i in xrange(100))(tsum.s()).get()
Dans cet exemple, ʻadd.s (i, i) pour i dans xrange (100) est une tâche à exécuter en parallèle, et le résultat de l'exécution (liste?) Est passé au rappel tsum.s () `.
1 pre-processor + n post-processors
Je voulais faire ça et j'ai cherché!
Cela ressemble à une échelle de temps.

Assumer des tâches telles que l'exécution de la sauvegarde d'image comme prétraitement, le traitement d'image comme post-traitement, le redimensionnement, etc. en même temps.
@task
def main_task(object_id):
    #Tâches de prétraitement
    
    #Beau traitement
    
    #Objet pour les tâches suivantes_Renvoie l'ID
    return object_id
    
@task
def sub_task_1(object_id):
    #Tâches de post-traitement
    pass
@task
def sub_task_2(object_id):
    #Tâches de post-traitement
    pass
#Construisez une chaîne de tâches entières. chaîne()Et groupe.
chains = chain(
    main_task.s(object.pk),
    group(
    	sub_task_1.s(),
    	sub_task_2.s()
    )
)
#Exécutez la chaîne
# main_Une fois l'exécution de la tâche terminée, sous_task_1, sub_task_2 est exécuté en parallèle.
chains.apply_async()
Le but est de faire correspondre les signatures reçues par les tâches suivantes dans le groupe.
1 processor + mapped post processors
Exécutez plusieurs sorties du prétraitement et exécutez plusieurs tâches suivantes en parallèle. Similaire au flux précédent, sauf que le pré-processus génère plusieurs résultats (nombre indéfini).

@task
def pre_process_task(object_id):
    #Tâches de prétraitement
    #Beau traitement
    #Renvoie une liste d'objets qui seront traités
	return [1, 2, 3, 4, 5 ...]
@task
def post_process_task(object_id):
    #Tâches de post-traitement
    #Conception pour recevoir des objets individuels
    pass
@task
def dmap(it, callback):
    #Recevez la liste et transmettez-la à callbak
    callback = subtask(callback)
    return group(callback.clone([arg,]) for arg in it)()
#Construisez une chaîne de tâches entières
chains = chain(
    pre_process_task.s(object.pk),
    dmap.s(post_process_task.s()
)
#Exécutez la chaîne
# pre_process_publier une fois l'exécution de la tâche terminée_process_processus de tâches en parallèle
chains.apply_async()
Completed task
Si vous utilisez si (), cela devient une tâche immuable, vous pouvez donc exécuter la tâche en ignorant la valeur de retour de la tâche précédente.

@task
def main_task(object_id):
    #Une tâche
    return (object_id, result)
    
@task
def sub_task_1(args):
    #Une tâche
    object_id, result = args
    return True
    
@task
def sub_task_2(args):
    #Une tâche
    object_id, result = args
    return True
    
@task
def finalize_task(object_id):
    #Journal d'achèvement de la tâche de sortie
    logger.info('Task completed')
    return True
    
object_id = 123
chain(
    main_task.s(object_id),
    group(
    	sub_task_1.s(),  # main_Utiliser la valeur de retour de la tâche
    	sub_task_2.s()   # main_Utiliser la valeur de retour de la tâche
    ),
    main_completed_task.si(object_id)       # s()Pas si()Notez que
).apply_async()