Retrieve spec content from a PyTask

Hi,

Sorry for the basic question, but I have never used this feature: I usually store data in an external DB, so I have never needed to store or propagate data along the workflow.

I have this little test code. The goal is to run some FWs in parallel (the “forkit” PyTasks), then gather their results in the “joinit” PyTask.

After the parallel part of the workflow has run, all the data are correctly gathered in the spec of the “joinit” Firework:

"spec": {
    "mytestkey_1": "something_1",
    "mytestkey_0": "something_0",
    "_tasks": [
        {
            "_fw_name": "PyTask",
            "args": [],
            "func": "test_spec.joinit"
        }
    ],
    "mytestkey_3": "something_3",
    "mytestkey_2": "something_2"
}

But I can’t find in the docs how to access the spec from a PyTask.

How can I retrieve these spec data from within the “joinit” PyTask?

Best regards,
David

PS: test code (test_spec.py)

#!/usr/bin/env python3

from fireworks import FWAction, Firework, ScriptTask, Workflow, PyTask, LaunchPad


def forkit(i):
    print("I am forkit num %d" % i)
    mykey = "mytestkey_%d" % i
    return FWAction(update_spec={mykey: "something_%d" % i})


def joinit():
    print("I am joinit")
    # retrieve all keys in spec like mytestkey_*
    # do something with them


def get_WF():
    fw_start = Firework([ScriptTask.from_str("echo \"start\"")], name="dummy start")
    all_fws = [fw_start]
    deps = []
    # proceed in parallel
    for i in range(0, 4):
        fw = Firework(PyTask(func='test_spec.forkit', args=[i]), name="update_spec_%d" % i, parents=[fw_start])
        all_fws.append(fw)
        deps.append(fw)
    # gather results
    all_fws.append(Firework(PyTask(func='test_spec.joinit', args=[]), name="gather_spec", parents=deps))
    return Workflow(all_fws, name="test spec passing")


def main():
    launchpad = LaunchPad()
    launchpad.add_wf(get_WF())


if __name__ == '__main__':
    main()

Hi,

I finally found a solution. I created a new FireTask class, copied from the PyTask code, that passes fw_spec as a hidden first argument to func:

#!/usr/bin/env python3

import builtins

from fireworks.core.firework import FWAction, Firework, FireTaskBase, Workflow


class PyTaskSpecAccess(FireTaskBase):
    _fw_name = 'PyTaskSpecAccess'
    required_params = ["func"]
    optional_params = ["args", "kwargs", "auto_kwargs", "stored_data_varname"]

    def run_task(self, fw_spec):
        toks = self["func"].rsplit(".", 1)
        if len(toks) == 2:
            modname, funcname = toks
            mod = __import__(modname, globals(), locals(), [str(funcname)], 0)
            func = getattr(mod, funcname)
        else:
            # Handle built-in functions.
            func = getattr(builtins, toks[0])

        # Pass the spec as a hidden first positional argument.
        args = self.get("args", [])
        args_with_spec = [fw_spec]
        args_with_spec.extend(args)

        if self.get("auto_kwargs"):
            kwargs = {k: v for k, v in self.items()
                      if not (k.startswith("_") or k in self.required_params or k in self.optional_params)}
        else:
            kwargs = self.get("kwargs", {})

        output = func(*args_with_spec, **kwargs)
        if isinstance(output, FWAction):
            return output
        elif self.get("stored_data_varname"):
            return FWAction(stored_data={self["stored_data_varname"]: output})

Ugly, but it works.

Is there a more elegant way to achieve the same result?

Best,
David
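
To make the calling convention concrete: with a task like PyTaskSpecAccess, the target function receives the spec dict as its first positional argument. Below is a minimal sketch of what joinit could then look like. The sample_spec dict and the choice to return the gathered entries are illustrative assumptions, not part of the original test code; the function runs on a plain dict, so it can be tried without FireWorks.

```python
def joinit(fw_spec):
    """Gather every spec entry whose key starts with 'mytestkey_'."""
    print("I am joinit")
    gathered = {key: value for key, value in fw_spec.items()
                if key.startswith("mytestkey_")}
    # Do something with the gathered entries; here they are just printed.
    for key in sorted(gathered):
        print("%s -> %s" % (key, gathered[key]))
    return gathered


# Hypothetical spec, mirroring the one shown in the first post.
sample_spec = {
    "mytestkey_1": "something_1",
    "mytestkey_0": "something_0",
    "_tasks": [{"_fw_name": "PyTask", "args": [], "func": "test_spec.joinit"}],
    "mytestkey_3": "something_3",
    "mytestkey_2": "something_2",
}
result = joinit(sample_spec)
```

Because the return value here is a plain dict rather than an FWAction, PyTaskSpecAccess would only keep it if stored_data_varname is set on the task.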


Hi David

I don’t think you can access variables in the spec using the PyTask. This means the PyTask is not a good choice when you need to read data passed by upstream Fireworks into the spec. Although it would be possible to design the PyTask with an option that passes the contents of the spec as one of the kwargs into your function, I think the current plan is to keep the PyTask simple.

My suggestion is to write your own Firetask. It is really not difficult and there are detailed instructions here:

https://pythonhosted.org/FireWorks/guide_to_writing_firetasks.html

Best,

Anubhav


Hi David

Looks like I missed your update before I responded.

Your proposed solution is a “general” PyTask (one that can run any Python function) with access to the spec. Your PyTaskSpecAccess could probably be cleaned up a little, but a general PyTask with spec access will look something like that.

If you want cleaner code, you can write your own Firetask, say “JoinItTask”, that doesn’t take a general function (func) or general arguments (args, kwargs). That is, rather than starting from the more complicated PyTask, just follow the tutorial here:

https://pythonhosted.org/FireWorks/guide_to_writing_firetasks.html

The tutorial will show how to create a clean Firetask if you limit yourself to only calling the desired function (joinit()) rather than being able to call any Python function.

Best,

Anubhav
