# TODO: change model type to params: Union[Type[Any], Tuple[Type[Any], ...]]
delcls.__annotations__[ROOT_KEY]
def__class_getitem__(cls, model: Union[Type[RootT], TypeVar]) ->Union[Type[RootT], TypeVar]:
# TODO: change model type to params: Union[Type[Any], Tuple[Type[Any], ...]]# as in GenericModel# For now, only singular model types are allowed. These lines are needed for
# TODO: Update Dataset.__eq__ similarly, with tests
else:
raiseRuntimeError('Model does not allow setting of extra attributes')
# TODO: Update Dataset.__eq__ similarly, with testsdef__eq__(self, other: object) ->bool:
returnself.__class__==other.__class__andsuper().__eq__(other)
# TODO: change model type to params: Union[Type[Any], Tuple[Type[Any], ...]]
data: Dict[str, ModelT] =Field(default={})
def__class_getitem__(cls, model: ModelT) ->ModelT:
# TODO: change model type to params: Union[Type[Any], Tuple[Type[Any], ...]]# as in GenericModel# For now, only singular model types are allowed. These lines are needed for
# TODO: implement general solution to make sure that one does not modify input data by
@TaskTemplate()defremove_columns(json_dataset: JsonListOfDictsOfAnyDataset,
column_keys_for_data_items: Dict[str, List[str]]) ->JsonListOfDictsOfAnyDataset:
# TODO: implement general solution to make sure that one does not modify input data by# automatically copying or otherwise. Perhaps setting immutable/frozen option in pydantic,# if available?#
FINISHED=3# TODO: Add 'apply' state# TODO: Add 'failed' state and error management# TODO: Consider the need for a 'waiting' stateclassRunStateLogMessages(str, Enum):
# TODO: Use json serializer package from the pydantic config instead of 'json'
returnmulti_model_dataset# TODO: Use json serializer package from the pydantic config instead of 'json'classMultiModelDataset(Dataset[ModelT], Generic[ModelT]):
# TODO: switch from plural to singular for names of modules in omnipy modules
JsonModel,
JsonNestedDictsModel)
# TODO: switch from plural to singular for names of modules in omnipy modules# TODO: call omnipy modules something else than modules, to distinguish from Python modules.# Perhaps plugins?JsonModelT=TypeVar('JsonModelT', bound=Union[JsonModel, JsonListModel, JsonDictModel])
# TODO: Check if we can move to explicit definition of __root__ field at the object
# As long as models are not created concurrently, setting the class members temporarily# should not have adverse effects# TODO: Check if we can move to explicit definition of __root__ field at the object# level in pydantic 2.0 (when it is released)ifcls==Model:
cls._depopulate_root_field()
param_keys=set(inspect.signature(job).parameters.keys())
# TODO: Refactor to remove dependency# Also, add test for not allowing override of fixed_paramsifhasattr(job, 'param_key_map'):
forkey, valinjob.param_key_map.items():
# TODO: When job state machine is implemented, check using that to see if in job state
ifhasattr(self, '_job_func'):
self._name=self._job_func.__name__# TODO: When job state machine is implemented, check using that to see if in job stateself._unique_name=self._generate_unique_name() ifhasattr(self, 'create_job') elseNone@staticmethod
# TODO: Possibly reimplement logic using a state machine, e.g. "transitions" package
persist_outputs: Optional[PersistOutputsOptions] =None,
restore_outputs: Optional[RestoreOutputsOptions] =None):
# TODO: Possibly reimplement logic using a state machine, e.g. "transitions" packageifpersist_outputsisNone:
self._persist_outputs=PersistOpts.FOLLOW_CONFIGifself._has_job_configelseNoneelse:
FINISHED=3# TODO: Add 'apply' state# TODO: Add 'failed' state and error management# TODO: Consider the need for a 'waiting' stateclassRunStateLogMessages(str, Enum):
ifnotself.__doc__:
self._set_standard_field_description()
# TODO: Add test for get_model_classdefget_model_class(self) ->ModelT:
returnself.__fields__.get(DATA_KEY).type_# TODO: Update _raise_no_model_exception() text. Model is now a requirement@staticmethoddef_raise_no_model_exception() ->None:
# TODO: Refactor this, and possibly elsewhere in class
@classmethoddef_create_subcls_inheriting_from_mixins_and_orig_cls(cls):
# TODO: Refactor this, and possibly elsewhere in classdef__init__(self, *args, **kwargs):
# print(f'__init__ for obj of class: {self.__class__.__name__}')
self._serializer_registry=self._create_serializer_registry()
def_create_serializer_registry(self):
# TODO: store in runtime, to remove dependenciesregistry=SerializerRegistry()
registry.register(PandasDatasetToTarFileSerializer)
FINISHED=3# TODO: Add 'apply' state# TODO: Add 'failed' state and error management# TODO: Consider the need for a 'waiting' stateclassRunStateLogMessages(str, Enum):
# TODO: Change JobBase and friends into Generics such that one can annotate with
raise# TODO: Change JobBase and friends into Generics such as one can annotated with# e.g. 'TaskTemplate[[int], int]' instead of just 'TaskTemplate'JobBase.accept_mixin(NameJobBaseMixin)
# TODO: when parsing config from file is implemented, make sure that the new engine
returngetattr(self.objects, engine_choice)
def_new_engine_config_if_new_cls(self, engine: IsEngine, engine_choice: EngineChoice) ->None:
# TODO: when parsing config from file is implemented, make sure that the new engine# config classes here reparse the config filesengine_config_cls=engine.get_config_cls()
ifself._get_engine_config(engine_choice).__class__isnotengine_config_cls:
# TODO: Reimplement typing of flows and task to harmonize with new structure
...
# TODO: Reimplement typing of flows and task to harmonize with new structuredeflinear_flow_template_callable_decorator_cls(
cls: Type['LinearFlowTemplate']
) ->IsTaskTemplatesFlowTemplateCallable['LinearFlowTemplate']:
# TODO: call omnipy modules something else than modules, to distinguish from Python modules.
JsonModel,
JsonNestedDictsModel)
# TODO: switch from plural to singular for names of modules in omnipy modules# TODO: call omnipy modules something else than modules, to distinguish from Python modules.# Perhaps plugins?JsonModelT=TypeVar('JsonModelT', bound=Union[JsonModel, JsonListModel, JsonDictModel])
# TODO: Update _raise_no_model_exception() text. Model is now a requirement
ifnotself.__doc__:
self._set_standard_field_description()
# TODO: Add test for get_model_classdefget_model_class(self) ->ModelT:
returnself.__fields__.get(DATA_KEY).type_# TODO: Update _raise_no_model_exception() text. Model is now a requirement@staticmethoddef_raise_no_model_exception() ->None: