dl-on-flink's Introduction

Deep Learning on Flink

Deep Learning on Flink aims to integrate Flink and deep learning frameworks (e.g. TensorFlow, PyTorch) to enable distributed deep learning training and inference on a Flink cluster.

It runs deep learning tasks inside Flink operators so that Flink can establish the distributed environment, manage resources, read and write data with Flink's rich set of connectors, and handle failures.

Currently, Deep Learning on Flink supports TensorFlow and PyTorch.

Supported Operating Systems

Deep Learning on Flink is tested and supported on the following 64-bit systems:

  • Ubuntu 18.04
  • macOS 10.15

Supported Framework Versions

  • TensorFlow: 1.15.x & 2.4.x
  • PyTorch: 1.11.x
  • Flink: 1.14.x

Getting Started

Deep Learning on Flink currently works with TensorFlow and PyTorch. See the corresponding quick start guides for usage and examples.

Build From Source

Requirements

  • Python: 3.7
  • CMake >= 3.6
  • Java 1.8
  • Maven >= 3.3.0

Deep Learning on Flink requires Java and Python to work together, so we need to build both the Java and the Python parts.

Initializing Submodules before Building Deep Learning on Flink from Source

Please use the following command to initialize submodules before building from source.

git submodule update --init --recursive

Build Java

mvn -DskipTests clean install

After the build finishes, you can find the target distribution in the dl-on-flink-dist/target folder.

Build Python

Install from Source

You can run the following commands to install the Python packages from source:

# Install dl-on-flink-framework first
pip install dl-on-flink-framework/python

# Note that you should only install one of the following, as they require
# different versions of TensorFlow
# For tensorflow 1.15.x
pip install dl-on-flink-tensorflow/python
# For tensorflow 2.4.x
pip install dl-on-flink-tensorflow-2.x/python

Build wheels

We provide a script to build wheels for the Python packages; you can run the following command:

bash tools/build_wheel.sh

After the build finishes, you can find the wheels in tools/dist. You can then install the Python packages from the wheels:

pip install tools/dist/<wheel>

For More Information

Design document

License

Apache License 2.0

dl-on-flink's People

Contributors

bgeng777, dependabot[bot], gforky, hongbo-miao, jiangxin369, jinxing64, kwohting, lisy09, littlebbbo, queyuexzy, ryantd, stenicholas, sxnan, tywtyw, weizhong94, wuchaochen, wuchaochen1


dl-on-flink's Issues

Get latest model version not returning the latest model version

The get_latest_validated_model_version and get_latest_generated_model_version methods in SqlAlchemyStore don't return the latest model version once the version number goes above 10 (which suggests the versions are compared lexicographically as strings, so that, for example, '10' sorts before '9').

The problem is that the two methods don't order the returned ModelVersion records by version.

Deploying docker container to Kubernetes guide

Hi,

I followed the README and was able to deploy the com.alibaba.flink.ml.examples.tensorflow.mnist.MnistDist example locally in Docker.

Are there any steps for deploying this example to Kubernetes, or would anyone know how?

Thanks

Exception when there is only InputTfExampleConfig but no OutputTfExampleConfig

The following is the normal configuration of ExampleCoding (that is, the format used for data transmission between Flink and Python). However, if only the encode part is configured without the decode part, the job will not execute and an exception is thrown, and vice versa.

// configure encode example coding
String strInput = ExampleCodingConfig.createExampleConfigStr(encodeNames, encodeTypes, 
                                                             entryType, entryClass);
config.getProperties().put(TFConstants.INPUT_TF_EXAMPLE_CONFIG, strInput);
config.getProperties().put(MLConstants.ENCODING_CLASS,
                           ExampleCoding.class.getCanonicalName());

// configure decode example coding
String strOutput = ExampleCodingConfig.createExampleConfigStr(decodeNames, decodeTypes, 
                                                              entryType, entryClass);
config.getProperties().put(TFConstants.OUTPUT_TF_EXAMPLE_CONFIG, strOutput);
config.getProperties().put(MLConstants.DECODING_CLASS, 
                           ExampleCoding.class.getCanonicalName());

Such a usage scenario is fairly common. For example, during training the user may only need to transfer data to TF without returning a table, so there is only an encode phase (flink-to-tf) and no tf-to-flink phase. It is also natural for users to set only the encoding-related configuration.
Therefore, I want to be able to configure only the encode part without configuring the decode part, and vice versa.
After review, the main cause is the ReflectUtil.createInstance(className, classes, objects) method in CodingFactory.java, which creates an ExampleCoding instance according to ENCODING_CLASS. By the definition of ExampleCoding.java, both inputConfig and outputConfig are parsed in the constructor, even when one of them is absent, which results in a NullPointerException.
The following is the exception information:

java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.alibaba.flink.ml.util.ReflectUtil.createInstance(ReflectUtil.java:36)
	at com.alibaba.flink.ml.coding.CodingFactory.getEncoding(CodingFactory.java:49)
	at com.alibaba.flink.ml.data.DataExchange.<init>(DataExchange.java:58)
	at com.alibaba.flink.ml.operator.ops.MLMapFunction.open(MLMapFunction.java:80)
	at com.alibaba.flink.ml.operator.ops.MLFlatMapOp.open(MLFlatMapOp.java:51)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:43)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
	at com.alibaba.flink.ml.tensorflow.coding.ExampleCodingConfig.fromJsonObject(ExampleCodingConfig.java:100)
	at com.alibaba.flink.ml.tensorflow.coding.ExampleCoding.<init>(ExampleCoding.java:57)
	... 16 more
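
A minimal sketch of the kind of guard that would avoid this NullPointerException, assuming the constructor currently parses both configs unconditionally; the context variable and constructor shape below are illustrative, not the actual ExampleCoding code:

// Hypothetical sketch: only parse the example configs that are present,
// so an encode-only (or decode-only) setup no longer hits the NPE.
String inputJson = mlContext.getProperties().get(TFConstants.INPUT_TF_EXAMPLE_CONFIG);
String outputJson = mlContext.getProperties().get(TFConstants.OUTPUT_TF_EXAMPLE_CONFIG);
if (inputJson != null) {
    this.inputConfig = ExampleCodingConfig.fromJsonObject(JSON.parseObject(inputJson));
}
if (outputJson != null) {
    this.outputConfig = ExampleCodingConfig.fromJsonObject(JSON.parseObject(outputJson));
}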

The following is a detailed test case, Java code:

private static final String ZookeeperConn = "127.0.0.1:2181";
private static final String[] Scripts = {"test.py"};
private static final int WorkerNum = 1;
private static final int PsNum = 0;

@Test
public void testExampleCodingWithoutDecode() throws Exception {
    TestingServer server = new TestingServer(2181, true);
    StreamExecutionEnvironment streamEnv =
            StreamExecutionEnvironment.createLocalEnvironment(1);
    streamEnv.setRestartStrategy(RestartStrategies.noRestart());
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

    Table input = tableEnv
            .fromDataStream(streamEnv.fromCollection(createDummyData()), "input");
    TableSchema inputSchema =
            new TableSchema(new String[]{"input"},
                            new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO});
    TableSchema outputSchema = null;

    TFConfig config = createTFConfig("test_example_coding_without_decode");
    // configure encode coding
    String strInput = ExampleCodingConfig.createExampleConfigStr(
            new String[]{"input"}, new DataTypes[]{DataTypes.STRING},
            ExampleCodingConfig.ObjectType.ROW, Row.class);
    config.getProperties().put(TFConstants.INPUT_TF_EXAMPLE_CONFIG, strInput);
    config.getProperties().put(MLConstants.ENCODING_CLASS,
            ExampleCoding.class.getCanonicalName());

    // run in python
    Table output = TFUtils.inference(streamEnv, tableEnv, input, config, outputSchema);

    streamEnv.execute();
    server.stop();
}

private List<Row> createDummyData() {
    List<Row> rows = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        Row row = new Row(1);
        row.setField(0, String.format("data-%d", i));
        rows.add(row);
    }
    return rows;
}

private TFConfig createTFConfig(String mapFunc) {
    Map<String, String> prop = new HashMap<>();
    prop.put(MLConstants.CONFIG_STORAGE_TYPE, MLConstants.STORAGE_ZOOKEEPER);
    prop.put(MLConstants.CONFIG_ZOOKEEPER_CONNECT_STR, ZookeeperConn);
    return new TFConfig(WorkerNum, PsNum, prop, Scripts, mapFunc, null);
}

Python code:

import sys

import tensorflow as tf
from flink_ml_tensorflow.tensorflow_context import TFContext


class FlinkReader(object):
    def __init__(self, context, batch_size=1, features={'input': tf.FixedLenFeature([], tf.string)}):
        self._context = context
        self._batch_size = batch_size
        self._features = features
        self._build_graph()

    def _decode(self, features):
        return features['input']

    def _build_graph(self):
        dataset = self._context.flink_stream_dataset()
        dataset = dataset.map(lambda record: tf.parse_single_example(record, features=self._features))
        dataset = dataset.map(self._decode)
        dataset = dataset.batch(self._batch_size)
        iterator = dataset.make_one_shot_iterator()
        self._next_batch = iterator.get_next()

    def next_batch(self, sess):
        try:
            batch = sess.run(self._next_batch)
            return batch
        except tf.errors.OutOfRangeError:
            return None


def test_example_coding_without_decode(context):
    tf_context = TFContext(context)
    if 'ps' == tf_context.get_role_name():
        from time import sleep
        while True:
            sleep(1)
    else:
        index = tf_context.get_index()
        job_name = tf_context.get_role_name()
        cluster_json = tf_context.get_tf_cluster()
        cluster = tf.train.ClusterSpec(cluster=cluster_json)

        server = tf.train.Server(cluster, job_name=job_name, task_index=index)
        sess_config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False,
                                     device_filters=["/job:ps", "/job:worker/task:%d" % index])
        with tf.device(tf.train.replica_device_setter(worker_device='/job:worker/task:' + str(index), cluster=cluster)):
            reader = FlinkReader(tf_context)

            with tf.train.ChiefSessionCreator(master=server.target, config=sess_config).create_session() as sess:
                while True:
                    batch = reader.next_batch(sess)
                    tf.logging.info(str(batch))
                    if batch is None:
                        break
                sys.stdout.flush()

Flink yarn mode

The README mentions Flink YARN mode.
Would it be possible to update the README to show how to run the MNIST example in YARN mode?

README should mention changing user.properties virtualenv.option from --user to -U in virtual environment

When I built the source in a virtual environment, it failed with the error:
"[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.1:exec (install-pip) on project flink-ml-framework: Result of /bin/sh -c cd /root/flink-ai-extended/flink-ml-framework/sbin && bash pip_install_packages.sh execution is: '1'. -> [Help 1]"

I followed the README instructions strictly. However, Alibaba staff told me that I should also change user.properties: virtualenv.option=-U

I suggest that the README add this information as well.

TFInferenceUDTF: when the model has only one input variable

public void eval(Object... objects) {
        if (this.failed) {
            throw new RuntimeException("inference thread failed!");
        } else {
            Preconditions.checkArgument(objects.length == this.inputNames.length, "Input fields length mismatch");
            try {
                this.rowCache.put(objects);
            } catch (InterruptedException var3) {
                var3.printStackTrace();
                throw new RuntimeException(var3.getMessage());
            }
        }
    }

If the model has only one input variable, objects becomes the variable itself rather than a length-1 array containing the input. When that input variable is an array whose length is not 1, checkArgument fails and an error is thrown.
When the model has multiple input variables, the code works as expected.
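
A minimal, self-contained sketch of the Java varargs behavior described above; the class and sample values are illustrative:

// Hypothetical illustration: passing a single Object[] to a varargs
// parameter spreads the array, so objects.length is the array's length.
public class VarargsPitfall {
    static void eval(Object... objects) {
        System.out.println("objects.length = " + objects.length);
    }

    public static void main(String[] args) {
        Object[] singleArrayInput = {1.0, 2.0, 3.0};
        eval(singleArrayInput);                // prints 3: the array is spread
        eval((Object) singleArrayInput);       // prints 1: the cast defeats spreading
        eval(new Object[]{singleArrayInput});  // prints 1: explicit wrapping
    }
}

This matches the report: with a single array-valued input, the varargs call receives the spread array, and the check against this.inputNames.length fails.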

The example distribution did not succeed

Tested with Flink on YARN:
1. For --mnist-files /tmp/mnist_input, does this directory need to be placed on every server, or only on the client?

2. Python virtual environment: should the compiled packages be placed under lib/python2.7/site-packages and then packed into a package?

After the Python process in a worker crashes, the TF framework restarts the worker, but the thread does not work after the restart

1. Simulated a worker failure by killing the worker's Python process.
2. The restart logs show the worker being restarted.
3. After the restart, the worker no longer works, and no error is reported.
0 [254065, None]
0 [254067, None]
0 [254069, None]
0 [254071, None]
0 [254072, None]
0 [254073, None]
/bin/python
Running user func in process mode
2020-03-31 18:52:45 [worker:0-python-startup.py:105] INFO worker:0 calling user func map_func
########## /data8/hadoop/local/usercache/appcache/application_1584599214294_1161/container_e12_1584599214294_1161_01_000003/temp/code/input_output.py
/data8/hadoop/local/usercache/appcache/application_1584599214294_1161/container_e12_1584599214294_1161_01_000003/temp/tfenv/lib/python2.7/site-packages/flink_ml_tensorflow/libflink_ops.so
load libflink_ops.so success
1.13.1
{'ps': [u'10.60.12.156:63206'], 'worker': [u'10.60.12.156:47182', u'10.60.12.156:25015']}
2020-03-31 18:52:45.563601: I tensorflow/core/platform/cpu_feature_guard.cc:141] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA
2020-03-31 18:52:45.572807: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 2195080000 Hz
2020-03-31 18:52:45.574608: I tensorflow/compiler/xla/service/service.cc:150] XLA service 0x47606a0 executing computations on platform Host. Devices:
2020-03-31 18:52:45.574651: I tensorflow/compiler/xla/service/service.cc:158] StreamExecutor device (0): ,
2020-03-31 18:52:45.609075: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:252] Initialize GrpcChannelCache for job ps -> {0 -> 10.60.12.156:63206}
2020-03-31 18:52:45.609113: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:252] Initialize GrpcChannelCache for job worker -> {0 -> localhost:47182, 1 -> 10.60.12.156:25015}
2020-03-31 18:52:45.653664: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:391] Started server with target: grpc://localhost:47182
2020-03-31 18:52:46 [worker:0-python-deprecation.py:323] WARNING From /data8/hadoop/local/usercache/appcache/application_1584599214294_1161/container_e12_1584599214294_1161_01_000003/temp/code/input_output.py:38: get_or_create_global_step (from tensorflow.contrib.framework.python.ops.variables) is deprecated and will be removed in a future version.
Instructions for updating:
Please switch to tf.train.get_or_create_global_step
2020-03-31 18:52:46 [worker:0-python-deprecation.py:323] WARNING From /data8/hadoop/local/usercache/appcache/application_1584599214294_1161/container_e12_1584599214294_1161_01_000003/temp/tfenv/lib/python2.7/site-packages/tensorflow/python/framework/op_def_library.py:263: colocate_with (from tensorflow.python.framework.ops) is deprecated and will be removed in a future version.
Instructions for updating:
Colocations handled automatically by placer.
2020-03-31 18:52:46 [worker:0-python-monitored_session.py:222] INFO Graph was finalized.
2020-03-31 18:52:51.684078: I tensorflow/core/distributed_runtime/master_session.cc:1192] Start master session 9ce9df9c2aec4aea with config: device_filters: "/job:ps" device_filters: "/job:worker/task:0" allow_soft_placement: true
2020-03-31 18:52:51 [worker:0-python-session_manager.py:491] INFO Running local_init_op.
2020-03-31 18:52:51 [worker:0-python-session_manager.py:493] INFO Done running local_init_op.
2020-03-31 18:52:51.756584: I /root/flink-ai-extended-master/flink-ml-tensorflow/python/flink_ml_tensorflow/ops/flink_writer_ops.cc:46] FlinkTFRecordWriter:FlinkTFRecordWriter:queue:///tmp/1585651729008-0/queue-6229914349635769493.input:8392704

2020-03-31 18:52:51.756676: I /root/flink-ai-extended-master/flink-ml-tensorflow/python/flink_ml_tensorflow/ops/queue_file_system.cc:30] queue name: queue:///tmp/1585651729008-0/queue-6229914349635769493.input:8392704
2020-03-31 18:52:51.756694: I /root/flink-ai-extended-master/flink-ml-tensorflow/python/flink_ml_tensorflow/ops/queue_file_system.cc:52] fileName: /tmp/1585651729008-0/queue-6229914349635769493.input
2020-03-31 18:52:51.756703: I /root/flink-ai-extended-master/flink-ml-tensorflow/python/flink_ml_tensorflow/ops/queue_file_system.cc:53] buf:8392704
2020-03-31 18:52:51.770782: I /root/flink-ai-extended-master/flink-ml-tensorflow/python/flink_ml_tensorflow/ops/queue_file_system.cc:30] queue name: queue:///tmp/1585651729008-0/queue-5958550517903632776.output:8392704
2020-03-31 18:52:51.770832: I /root/flink-ai-extended-master/flink-ml-tensorflow/python/flink_ml_tensorflow/ops/queue_file_system.cc:52] fileName: /tmp/1585651729008-0/queue-5958550517903632776.output
2020-03-31 18:52:51.770841: I /root/flink-ai-extended-master/flink-ml-tensorflow/python/flink_ml_tensorflow/ops/queue_file_system.cc:53] buf:8392704

The streaming inference result is not written to the sink until the next record arrives

In a stream processing environment, after reading data from the source and processing it through Flink-AI-Extended, the result is not written to the sink immediately; it is only written when the next record from the source arrives.

I built a simple stream processing demo. The source injects a message every 5 seconds, 25 in total. The Python part writes each record back to the sink immediately after reading it.
When I inject a message into the source, the log shows that the Python process has received it and executed "context.output_writer_op", but the sink does not receive any message. Only when I inject the next message into the source is the previous result written to the sink.

The following is the log:

...
[Source][2019-07-31 11:45:56.76]produce data-10
[Sink][2019-07-31 11:45:56.76]finish data-9

[Source][2019-07-31 11:46:01.765]produce data-11
[Sink][2019-07-31 11:46:01.765]finish data-10
...

But I want the result to be written back to the sink immediately after "output_writer_op" executes:

...
[Source][2019-07-31 11:45:56.76]produce data-10
[Sink][2019-07-31 11:45:56.76]finish data-10

[Source][2019-07-31 11:46:01.765]produce data-11
[Sink][2019-07-31 11:46:01.765]finish data-11
...

For the time being, it is not clear what causes this behavior; it looks as if each result is buffered and only flushed when the next record arrives.

The following is my demo code:

package org.apache.flink.table.ml.lib.tensorflow;

import com.alibaba.flink.ml.operator.util.DataTypes;
import com.alibaba.flink.ml.tensorflow.client.TFConfig;
import com.alibaba.flink.ml.tensorflow.client.TFUtils;
import com.alibaba.flink.ml.tensorflow.coding.ExampleCoding;
import com.alibaba.flink.ml.tensorflow.coding.ExampleCodingConfig;
import com.alibaba.flink.ml.tensorflow.util.TFConstants;
import com.alibaba.flink.ml.util.MLConstants;
import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.ml.lib.tensorflow.util.Utils;
import org.apache.flink.types.Row;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;

public class SourceSinkTest {
    private static final String ZookeeperConn = "127.0.0.1:2181";
    private static final String[] Scripts = {"/Users/bodeng/TextSummarization-On-Flink/src/main/python/pointer-generator/test.py"};
    private static final int WorkerNum = 1;
    private static final int PsNum = 0;

    @Test
    public void testSourceSink() throws Exception {
        TestingServer server = new TestingServer(2181, true);
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.createLocalEnvironment(1);
        streamEnv.setRestartStrategy(RestartStrategies.noRestart());

        DataStream<Row> sourceStream = streamEnv.addSource(
                new DummyTimedSource(20, 5), new RowTypeInfo(Types.STRING)).setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);
        Table input = tableEnv.fromDataStream(sourceStream, "input");
        TFConfig config = createTFConfig("test_source_sink");

        TableSchema outputSchema = new TableSchema(new String[]{"output"}, new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO});

        // configure encode coding
        String strInput = ExampleCodingConfig.createExampleConfigStr(
                new String[]{"input"}, new DataTypes[]{DataTypes.STRING},
                ExampleCodingConfig.ObjectType.ROW, Row.class);
        config.getProperties().put(TFConstants.INPUT_TF_EXAMPLE_CONFIG, strInput);
        config.getProperties().put(MLConstants.ENCODING_CLASS,
                ExampleCoding.class.getCanonicalName());

        // configure decode coding
        String strOutput = ExampleCodingConfig.createExampleConfigStr(
                new String[]{"output"}, new DataTypes[]{DataTypes.STRING},
                ExampleCodingConfig.ObjectType.ROW, Row.class);
        config.getProperties().put(TFConstants.OUTPUT_TF_EXAMPLE_CONFIG, strOutput);
        config.getProperties().put(MLConstants.DECODING_CLASS,
                ExampleCoding.class.getCanonicalName());
      
        Table output = TFUtils.inference(streamEnv, tableEnv, input, config, outputSchema);
        tableEnv.toAppendStream(output, Row.class)
                .map(r -> "[Sink][" + new Timestamp(System.currentTimeMillis()) + "]finish " + r.getField(0) + "\n")
                .print().setParallelism(1);

        streamEnv.execute();
        server.stop();
    }

    private TFConfig createTFConfig(String mapFunc) {
        Map<String, String> prop = new HashMap<>();
        prop.put(MLConstants.CONFIG_STORAGE_TYPE, MLConstants.STORAGE_ZOOKEEPER);
        prop.put(MLConstants.CONFIG_ZOOKEEPER_CONNECT_STR, ZookeeperConn);
        return new TFConfig(WorkerNum, PsNum, prop, Scripts, mapFunc, null);
    }

    private static class DummyTimedSource implements SourceFunction<Row>, CheckpointedFunction {
        public static final Logger LOG = LoggerFactory.getLogger(DummyTimedSource.class);
        private long count = 0L;
        private long MAX_COUNT;
        private long INTERVAL;
        private volatile boolean isRunning = true;

        private transient ListState<Long> checkpointedCount;

        public DummyTimedSource(long maxCount, long interval) {
            this.MAX_COUNT = maxCount;
            this.INTERVAL = interval;
        }

        @Override
        public void run(SourceContext<Row> ctx) throws Exception {
            while (isRunning && count < MAX_COUNT) {
                // this synchronized block ensures that state checkpointing,
                // internal state updates and emission of elements are an atomic operation
                synchronized (ctx.getCheckpointLock()) {
                    Row row = new Row(1);
                    row.setField(0, String.format("data-%d", count));
                    System.out.println("[Source][" + new Timestamp(System.currentTimeMillis()) + "]produce " + row.getField(0));
                    ctx.collect(row);
                    count++;
                    Thread.sleep(INTERVAL * 1000);
                }
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            this.checkpointedCount.clear();
            this.checkpointedCount.add(count);
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            this.checkpointedCount = context
                    .getOperatorStateStore()
                    .getListState(new ListStateDescriptor<>("count", Long.class));

            if (context.isRestored()) {
                for (Long count : this.checkpointedCount.get()) {
                    this.count = count;
                }
            }
        }
    }
}

And the Python code:

import sys
import datetime

import tensorflow as tf
from flink_ml_tensorflow.tensorflow_context import TFContext


class FlinkReader(object):
    def __init__(self, context, batch_size=1, features={'input': tf.FixedLenFeature([], tf.string)}):
        self._context = context
        self._batch_size = batch_size
        self._features = features
        self._build_graph()

    def _decode(self, features):
        return features['input']

    def _build_graph(self):
        dataset = self._context.flink_stream_dataset()
        dataset = dataset.map(lambda record: tf.parse_single_example(record, features=self._features))
        dataset = dataset.map(self._decode)
        dataset = dataset.batch(self._batch_size)
        iterator = dataset.make_one_shot_iterator()
        self._next_batch = iterator.get_next()

    def next_batch(self, sess):
        try:
            batch = sess.run(self._next_batch)
            return batch
        except tf.errors.OutOfRangeError:
            return None


class FlinkWriter(object):
    def __init__(self, context):
        self._context = context
        self._build_graph()

    def _build_graph(self):
        self._write_feed = tf.placeholder(dtype=tf.string)
        self.write_op, self._close_op = self._context.output_writer_op([self._write_feed])

    def _example(self, results):
        example = tf.train.Example(features=tf.train.Features(
            feature={
                'output': tf.train.Feature(bytes_list=tf.train.BytesList(value=[results[0]])),
            }
        ))
        return example

    def write_result(self, sess, results):
        sess.run(self.write_op, feed_dict={self._write_feed: self._example(results).SerializeToString()})

    def close(self, sess):
        sess.run(self._close_op)



def test_source_sink(context):
    tf_context = TFContext(context)
    if 'ps' == tf_context.get_role_name():
        from time import sleep
        while True:
            sleep(1)
    else:
        index = tf_context.get_index()
        job_name = tf_context.get_role_name()
        cluster_json = tf_context.get_tf_cluster()
        cluster = tf.train.ClusterSpec(cluster=cluster_json)

        server = tf.train.Server(cluster, job_name=job_name, task_index=index)
        sess_config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False,
                                     device_filters=["/job:ps", "/job:worker/task:%d" % index])
        with tf.device(tf.train.replica_device_setter(worker_device='/job:worker/task:' + str(index), cluster=cluster)):
            reader = FlinkReader(tf_context)
            writer = FlinkWriter(tf_context)

            with tf.train.ChiefSessionCreator(master=server.target, config=sess_config).create_session() as sess:
                while True:
                    batch = reader.next_batch(sess)
                    if batch is None:
                        break
                    # tf.logging.info("[TF][%s]process %s" % (str(datetime.datetime.now()), str(batch)))

                    writer.write_result(sess, batch)
                writer.close(sess)
                sys.stdout.flush()

Build error with version 0.2.1

Hello, I ran into the following error when building version 0.2.1. Has anyone encountered this and found a solution?
In file included from /home/miaoluwen/.local/lib/python2.7/site-packages/tensorflow/include/tensorflow/core/lib/io/record_writer.h:25:0,
[INFO] from /tmp/pip-req-build-joZP07/flink_ml_tensorflow/ops/flink_writer_ops.cc:27:
[INFO] /home/miaoluwen/.local/lib/python2.7/site-packages/tensorflow/include/tensorflow/core/lib/io/zlib_outputbuffer.h:19:18: fatal error: zlib.h: No such file or directory
[INFO] #include <zlib.h>
[INFO] ^
[INFO] compilation terminated.
[INFO] gmake[2]: *** [CMakeFiles/flink_ops.dir/flink_ml_tensorflow/ops/flink_writer_ops.cc.o] Error 1
[INFO] gmake[1]: *** [CMakeFiles/flink_ops.dir/all] Error 2
[INFO] gmake: *** [all] Error 2
[INFO] Traceback (most recent call last):
[INFO] File "", line 1, in
[INFO] File "/tmp/pip-req-build-joZP07/setup.py", line 91, in
[INFO] install_requires = ['tensorflow==1.13.1', 'tensorboard==1.13.1', 'flink_ml_framework==0.2.1'],
[INFO] File "/home/miaoluwen/.local/lib/python2.7/site-packages/setuptools/init.py", line 145, in setup
[INFO] return distutils.core.setup(**attrs)
[INFO] File "/usr/lib64/python2.7/distutils/core.py", line 152, in setup
[INFO] dist.run_commands()
[INFO] File "/usr/lib64/python2.7/distutils/dist.py", line 953, in run_commands
[INFO] self.run_command(cmd)
[INFO] File "/usr/lib64/python2.7/distutils/dist.py", line 972, in run_command
[INFO] cmd_obj.run()
[INFO] File "/home/miaoluwen/.local/lib/python2.7/site-packages/setuptools/command/install.py", line 61, in run
[INFO] return orig.install.run(self)
[INFO] File "/usr/lib64/python2.7/distutils/command/install.py", line 563, in run
[INFO] self.run_command('build')
[INFO] File "/usr/lib64/python2.7/distutils/cmd.py", line 326, in run_command
[INFO] self.distribution.run_command(command)
[INFO] File "/usr/lib64/python2.7/distutils/dist.py", line 972, in run_command
[INFO] cmd_obj.run()
[INFO] File "/usr/lib64/python2.7/distutils/command/build.py", line 127, in run
[INFO] self.run_command(cmd_name)
[INFO] File "/usr/lib64/python2.7/distutils/cmd.py", line 326, in run_command
[INFO] self.distribution.run_command(command)
[INFO] File "/usr/lib64/python2.7/distutils/dist.py", line 972, in run_command
[INFO] cmd_obj.run()
[INFO] File "/tmp/pip-req-build-joZP07/setup.py", line 48, in run
[INFO] self.build_extension(ext)
[INFO] File "/tmp/pip-req-build-joZP07/setup.py", line 81, in build_extension
[INFO] cwd=self.build_temp)
[INFO] File "/usr/lib64/python2.7/subprocess.py", line 542, in check_call
[INFO] raise CalledProcessError(retcode, cmd)
[INFO] subprocess.CalledProcessError: Command '['cmake', '--build', '.', '--config', 'Release', '--', '-j2', '-lpthread']' returned non-zero exit status 2
[INFO] ----------------------------------------

Build environment:
python: 2.7.5
pip: 19.1.1
cmake: 3.6.2
centos: 7
kernel: 3.10.0-693.el7.x86_64

Build flink image

After running sh build_flink_image.sh, the terminal hangs at the same spot indefinitely and the flink-ml/flink:latest image is never generated. What could be the cause?

A bug in the setupVirtualEnv method when running on Mac OS

Through the setupVirtualEnv method in com.alibaba.flink.ml.util.PythonUtil.java, the flink-ai-extended system automatically creates a virtual env for us. At the end of the setupVirtualEnv method, the system needs to find the path of the libjvm dynamic link library, as shown in the code below:
String libJvm = "libjvm.so";
String jvmPath = findChildByName(new File(System.getenv("JAVA_HOME")), libJvm).getParent();
However, the libjvm dynamic link library is named libjvm.dylib on Mac OS but libjvm.so on Linux. Therefore, when we run a flink-ai-extended program on Mac OS, the system reports an error because no libjvm.so can be found.
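
A minimal sketch of a platform-aware lookup that would address this, reusing the same findChildByName helper; the OS check below is illustrative:

// Hypothetical fix sketch: choose the JVM library name per platform.
String osName = System.getProperty("os.name").toLowerCase();
String libJvm = osName.contains("mac") ? "libjvm.dylib" : "libjvm.so";
String jvmPath = findChildByName(new File(System.getenv("JAVA_HOME")), libJvm).getParent();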

Add mongodb support to notification service

  1. Add a MongoDB connector to the notification service, providing MongoDB as a choice of event storage.
  2. Add the ability to get the latest version of events for a specific key when listening; refactor the protobuf file and the corresponding code.

Cannot find the proto package

Running the code reports that the package com.alibaba.flink.ml.proto does not exist:

import com.alibaba.flink.ml.proto.ContextResponse;
import com.alibaba.flink.ml.proto.ContextRequest;
import com.alibaba.flink.ml.proto.ContextResponse;
import com.alibaba.flink.ml.proto.NodeServiceGrpc;

I can see this package in the source tree, along with the related .proto files. What do I need to do to resolve this?

java.lang.ClassCastException: java.lang.Double cannot be cast to com.alibaba.fastjson.JSONObject

2020-03-27 20:17:14,776 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - worker (1/2) (357c43cc683201a3288f5119db018aa9) switched from RUNNING to FAILED.
java.lang.ClassCastException: java.lang.Double cannot be cast to com.alibaba.fastjson.JSONObject
at com.alibaba.flink.ml.coding.impl.JsonCodingImpl.encode(JsonCodingImpl.java:30)
at com.alibaba.flink.ml.data.impl.DataBridgeImpl.write(DataBridgeImpl.java:45)
at com.alibaba.flink.ml.data.DataExchange.write(DataExchange.java:73)
at com.alibaba.flink.ml.operator.ops.MLMapFunction.flatMap(MLMapFunction.java:147)
at com.alibaba.flink.ml.operator.ops.MLFlatMapOp.flatMap(MLFlatMapOp.java:61)
at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)

Notification high availability issues

  • The abstract methods of SqlAlchemyStore have no implementation.
  • The notify method of NotificationService has no return value.
  • HAManager doesn't delete its member when stopping.

What is the difference between train and inference interfaces?

The API in TFUtils distinguishes between train and inference, but the two interfaces are essentially the same in terms of usage, parameters, and return values. Even if they are swapped freely, the job still runs normally. So what is the reason for distinguishing these two APIs?

public static <IN, OUT> DataStream<OUT> train(StreamExecutionEnvironment streamEnv, DataStream<IN> input,
			TFConfig tfConfig, TypeInformation<OUT> outTI) throws IOException;

public static <IN, OUT> DataStream<OUT> inference(StreamExecutionEnvironment streamEnv, DataStream<IN> input,
			TFConfig tfConfig, TypeInformation<OUT> outTI) throws IOException;

In my opinion, Flink-AI-Extended only plays the role of environment coordination and task scheduling; the actual running mode should be delegated to the Python side to manage and configure through parameters. I think we could replace the current train/inference with a single invoke(args...), which would be more succinct and unified at the API level. Is there anything in the implementation that makes this inappropriate?

Simplify the ExampleCoding configuration process

The current process of configuring ExampleCoding is cumbersome. Since this is a common configuration, I think we can add some utility interfaces to help users configure it simply.
The following is the typical configuration process in the current version:

// configure encode example coding
String strInput = ExampleCodingConfig.createExampleConfigStr(encodeNames, encodeTypes, 
                                                             entryType, entryClass);
config.getProperties().put(TFConstants.INPUT_TF_EXAMPLE_CONFIG, strInput);
config.getProperties().put(MLConstants.ENCODING_CLASS,
                           ExampleCoding.class.getCanonicalName());

// configure decode example coding
String strOutput = ExampleCodingConfig.createExampleConfigStr(decodeNames, decodeTypes, 
                                                              entryType, entryClass);
config.getProperties().put(TFConstants.OUTPUT_TF_EXAMPLE_CONFIG, strOutput);
config.getProperties().put(MLConstants.DECODING_CLASS, 
                           ExampleCoding.class.getCanonicalName());

As shown, the user needs to know the column names, field types, and various constants when configuring example coding. In fact, this can be encapsulated so that the user only needs to provide the input and output table schemas to complete the configuration. For example:

ExampleCodingConfigUtil.configureExampleCoding(tfConfig, inputSchema, outputSchema, 
                                               ExampleCodingConfig.ObjectType.ROW, Row.class);

In the current version, the data types that TF can accept are defined by DataTypes in the Flink-AI-Extended project, while the data types of Flink Table fields are defined by TypeInformation in the Flink project, with basic types implemented in BasicTypeInfo, BasicArrayTypeInfo, and so on. The problem is that the basic types of DataTypes and TypeInformation do not correspond one-to-one.

Therefore, if we want to encapsulate the ExampleCoding configuration process, we need to solve the problem that DataTypes is not compatible with TypeInformation. There are two options:

  1. Provide a method for converting between DataTypes and TypeInformation. Most commonly used types can be matched, but the correspondence is not complete, so some types cannot be converted.
  2. Discard DataTypes and use TypeInformation directly in Flink-AI-Extended. DataTypes is just a simple enumeration that only identifies data types; TypeInformation can provide the same functionality.

Solution 1 is relatively simple to implement and keeps compatibility, but solution 2 is better in the long run.
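
A minimal sketch of the conversion utility that option 1 implies; the mapping below covers only the string type used in the examples above, and the method name is illustrative rather than an actual project API:

// Hypothetical DataTypes -> TypeInformation mapping for option 1.
public static TypeInformation<?> toTypeInformation(DataTypes dataType) {
    switch (dataType) {
        case STRING:
            return BasicTypeInfo.STRING_TYPE_INFO;
        // ... further cases for the remaining basic types would go here ...
        default:
            // Types without a one-to-one counterpart end up here, which is
            // exactly the limitation that option 1 cannot avoid.
            throw new UnsupportedOperationException("No TypeInformation for " + dataType);
    }
}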

mvn install error

When running mvn -DskipTests=true clean install, I got the following error. Has anyone encountered this?

[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.1:exec (install-pip) on project flink-ml-framework: Result of /bin/sh -c cd /workdir/flink-ai-extended/flink-ml-framework/sbin && bash pip_install_packages.sh execution is: '127'. -> [Help 1]
org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.1:exec (install-pip) on project flink-ml-framework: Result of /bin/sh -c cd /workdir/flink-ai-extended/flink-ml-framework/sbin && bash pip_install_packages.sh execution is: '127'.
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:215)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:156)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:148)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:117)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:81)
    at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build (SingleThreadedBuilder.java:56)
    at org.apache.maven.lifecycle.internal.LifecycleStarter.execute (LifecycleStarter.java:128)
    at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:305) 
    at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:192)
    at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105)
    at org.apache.maven.cli.MavenCli.execute (MavenCli.java:957)
    at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:289)
    at org.apache.maven.cli.MavenCli.main (MavenCli.java:193)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced (Launcher.java:282)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launch (Launcher.java:225)
    at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode (Launcher.java:406)
    at org.codehaus.plexus.classworlds.launcher.Launcher.main (Launcher.java:347)
Caused by: org.apache.maven.plugin.MojoExecutionException: Result of /bin/sh -c cd /workdir/flink-ai-extended/flink-ml-framework/sbin && bash pip_install_packages.sh execution is: '127'.
    at org.codehaus.mojo.exec.ExecMojo.execute (ExecMojo.java:260)
    at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo (DefaultBuildPluginManager.java:137)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:210)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:156)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:148)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:117)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:81)
    at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build (SingleThreadedBuilder.java:56) 
    at org.apache.maven.lifecycle.internal.LifecycleStarter.execute (LifecycleStarter.java:128)
    at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:305)
    at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:192)
    at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105)
    at org.apache.maven.cli.MavenCli.execute (MavenCli.java:957)
    at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:289)
    at org.apache.maven.cli.MavenCli.main (MavenCli.java:193)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced (Launcher.java:282)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launch (Launcher.java:225)
    at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode (Launcher.java:406)
    at org.codehaus.plexus.classworlds.launcher.Launcher.main (Launcher.java:347)
[ERROR]
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR]   mvn <args> -rf :flink-ml-framework

Revise SQLAlchemy relationship params into reasonable names

In ai_flow/store/db/db_model.py, some parameters that hold the result of the relationship function should have reasonable names representing the linked entity.

Needs fixing:

class SqlModelRelation(base, Base):
    ...
    model_relation = relationship("SqlProject", backref=backref('model_relation', cascade='all'))

class SqlWorkflowExecution(base, Base):
    ...
    workflow_execution = relationship("SqlProject", backref=backref('workflow_execution', cascade='all'))

class SqlModelVersionRelation(base):
    ...
    model_version_relations = relationship("SqlModelRelation", backref=backref('model_version_relation', cascade='all'))
    model_version_relation = relationship("SqlWorkflowExecution", backref=backref('model_version_relation', cascade='all'))

class SqlJob(base, Base):
    ...
    job = relationship("SqlWorkflowExecution", backref=backref('job_info', cascade='all'))

Correct:

class SqlModelVersion(base):
    ...
    registered_model = relationship('SqlRegisteredModel', backref=backref('model_version', cascade='all'))

add mongodb store backend for meta service

Major change

  1. add a MongoDB store backend to support the meta service
  2. set up an abstract test class that gathers all the test cases about the Store; the SQLite and MySQL test classes inherit from this abstract class
  3. add a Mongo store test class, also inheriting the abstract test class

Unit test how-to

1. Set your MongoDB credentials in ai_flow/test/test_util.py:
DEFAULT_MONGODB_USERNAME = 'xxx'
DEFAULT_MONGODB_PASSWORD = 'xxx'
DEFAULT_MONGODB_HOST = 'xxx'
DEFAULT_MONGODB_PORT = 27017

2. Comment out '@unittest.skip' in ai_flow/test/store/test_mongo_store.py:
# @unittest.skip("To run this test you need to configure the MongoDB info in 'ai_flow/test/test_util.py'")
# @pytest.mark.release

"Python version >= 3.5 required" error

When I execute the script /opt/work_home/docker/flink/create_venv.sh in the venv-build image, it raises the following errors. How can I fix this?

New python executable in /opt/work_home/temp/test/tfenv/bin/python
Installing setuptools, pip, wheel...
done.
DEPRECATION: Python 2.7 will reach the end of its life on January 1st, 2020. Please upgrade your Python as Python 2.7 won't be maintained after that date. A future version of pip will drop support for Python 2.7. More details about Python 2 support in pip, can be found at https://pip.pypa.io/en/latest/development/release-process/#python-2-support
Looking in indexes: http://mirrors.aliyun.com/pypi/simple/
Collecting grpcio
Downloading http://mirrors.aliyun.com/pypi/packages/a5/46/5d08b6e26748ed6f3b5e93d980ea5daa63c3a8200b2ad270645b0e2f9566/grpcio-1.22.0-cp27-cp27mu-manylinux1_x86_64.whl (2.2MB)
|████████████████████████████████| 2.2MB 8.4MB/s
Collecting six
Downloading http://mirrors.aliyun.com/pypi/packages/73/fb/00a976f728d0d1fecfe898238ce23f502a721c0ac0ecfedb80e0d88c64e9/six-1.12.0-py2.py3-none-any.whl
Collecting numpy
Downloading http://mirrors.aliyun.com/pypi/packages/da/32/1b8f2bb5fb50e4db68543eb85ce37b9fa6660cd05b58bddfafafa7ed62da/numpy-1.17.0.zip (6.5MB)
|████████████████████████████████| 6.5MB 2.8MB/s
ERROR: Command errored out with exit status 1:
command: /opt/work_home/temp/test/tfenv/bin/python -c 'import sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-TAcG_0/numpy/setup.py'"'"'; file='"'"'/tmp/pip-install-TAcG_0/numpy/setup.py'"'"';f=getattr(tokenize, '"'"'open'"'"', open)(file);code=f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, file, '"'"'exec'"'"'))' egg_info --egg-base pip-egg-info
cwd: /tmp/pip-install-TAcG_0/numpy/
Complete output (5 lines):
Traceback (most recent call last):
File "", line 1, in
File "/tmp/pip-install-TAcG_0/numpy/setup.py", line 31, in
raise RuntimeError("Python version >= 3.5 required.")
RuntimeError: Python version >= 3.5 required.
----------------------------------------
ERROR: Command errored out with exit status 1: python setup.py egg_info Check the logs for full command output.

[Flink AI Flow] Event send order is not the same as task completion order

Consider the following scenario:
The DAG looks like [op1, op2 -> op3]. With the event-based scheduler, op1 completes before op2, but the TaskStatusChanged event of op2 is sent to the scheduler before op1's.

When EventBasedScheduler#_find_schedulable_tasks receives a TaskStatusChanged event, it checks the task status of op1 and op2. So each time it receives a TaskStatusChanged event it finds that op1 and op2 are both SUCCESS, and op3 ends up being triggered twice.
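
A minimal sketch of the de-duplication this implies, written in Java for consistency with the other sketches here even though the actual scheduler is Python; all names below are illustrative:

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class DedupTriggerSketch {
    // Tracks downstream tasks that have already been scheduled.
    private final Set<String> triggered = ConcurrentHashMap.newKeySet();

    // Hypothetical callback invoked for every TaskStatusChanged event.
    public void onTaskStatusChanged(String completedTask) {
        // add() returns false when "op3" was already triggered, so even if
        // op1's and op2's events both observe "all upstream SUCCESS",
        // op3 is scheduled at most once.
        if (allUpstreamSucceeded("op3") && triggered.add("op3")) {
            schedule("op3");
        }
    }

    private boolean allUpstreamSucceeded(String task) { return true; } // stub
    private void schedule(String task) { System.out.println("schedule " + task); }
}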
