Hello,
I have been stuck for few days with an error about TensorFlow execution extension for Siddhi.
First of all, I am using Siddhi tooling v5.1.2, with TensorFlow execution extension v2.0.2.
The model has been trained with TensorFlow v.1.13.1, so the same version used to build the extension, as TensorFlow serving requires.
The full code of my Siddhi Application is below:
________________________________________________________________________
@app:name('TwitterCovidApp7')
@app:description('Analyzing covid related french tweets')
@sink(type='log', prefix='LOGGER')
define stream OutputStream(output0 float, output1 float);
@source(type = 'twitter', consumer.key = "GzHDkD2alhd86ppzX4Yy4Saoj", consumer.secret = "MnDe12RyUpknDThzymK3CwOBDfdW8unILF8140nRvjD9xU8xJ7", access.token = "3344535077-xOw6dlYcRs5zBQciHs5pXeZPdkMJ9VZpDfwKWx1", access.token.secret = "536rjFOkzcdTafa3Bm0naE60OKqWTePC9Rq6GhaFnbwrw", mode = "streaming", track = "covid", language = "en",
@Map(type = 'keyvalue',
@attributes(text = "text")))
define stream inputStream (text string);
define function modifyText[JAVASCRIPT] return object {
var res=[]
var i;
for (i = 0; i < 3000; i++) {
res.push(0.0);
}
return res;
};
@info(name = 'TwitterQuery')
from inputStream
select modifyText(text) as modifiedText
insert into TreatmentStream;
@info(name = 'ToOutput')
from TreatmentStream#tensorFlow:predict('D:\siddhi\tweet analysis model v3\saved_model_testlp0', 'input', 'output', modifiedText)
select output0, output1
insert into OutputStream;
________________________________________________________________________
Basically, I am recieving tweets from Twitter API extension and posting them through requests to my HTTP local server.
But what is important is that I try to use tensorFlow:predict
on modifiedText
variable, which is an array of 3 000 float zeros created within the JavaScript Function modifyText
(which takes text
in parameter select modifyText(text) as modifiedText
but I do not use this text
variable (which contains the tweet) in the fuction for now).
At the end of 'TwitterQuery'
, this [ 0.0 , 0.0 , 0.0 , ... , 0.0 , 0.0 , 0.0 ]
is injected in modifiedText
, and then in ToOutput
we try to predict what will be the output predicted from the model with this zeros array input.
So the input array should be shaped [1,3000] if I did not miss anything.
However this following error is raised when executing the model prediction:
________________________________________________________________________
[2020-12-01_09-57-15_679] ERROR {io.siddhi.extension.execution.tensorflow.TensorFlowExtension} - Error while feeding input input. jdk.nashorn.api.scripting.ScriptObjectMirror cannot be cast to java.lang.String
[2020-12-01_09-57-15_679] ERROR {io.siddhi.core.stream.StreamJunction} - Error in 'TwitterCovidApp7' after consuming events from Stream 'TreatmentStream', You must feed a value for placeholder tensor 'dense_input_1' with dtype float and shape [?,3000]
[[{{node dense_input_1}}]]. Hence, dropping event 'StreamEvent{ timestamp=1606813035678, beforeWindowData=null, onAfterWindowData=[thank u so much Lord ๐ญ๐], outputData=[[object Array]], type=CURRENT, next=null}' (Encoded)
java.lang.IllegalArgumentException: You must feed a value for placeholder tensor 'dense_input_1' with dtype float and shape [?,3000]
[[{{node dense_input_1}}]]
at org.tensorflow.Session.run(Native Method)
at org.tensorflow.Session.access$100(Session.java:48)
at org.tensorflow.Session$Runner.runHelper(Session.java:314)
at org.tensorflow.Session$Runner.run(Session.java:264)
at io.siddhi.extension.execution.tensorflow.TensorFlowExtension.process(TensorFlowExtension.java:229)
at io.siddhi.core.query.processor.stream.StreamProcessor.processEventChunk(StreamProcessor.java:41)
at io.siddhi.core.query.processor.stream.AbstractStreamProcessor.process(AbstractStreamProcessor.java:132)
at io.siddhi.core.query.input.ProcessStreamReceiver.processAndClear(ProcessStreamReceiver.java:182)
at io.siddhi.core.query.input.ProcessStreamReceiver.process(ProcessStreamReceiver.java:89)
at io.siddhi.core.query.input.ProcessStreamReceiver.receive(ProcessStreamReceiver.java:115)
at io.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:176)
at io.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:465)
at io.siddhi.core.query.output.callback.InsertIntoStreamCallback.send(InsertIntoStreamCallback.java:56)
at io.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:104)
at io.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:44)
at io.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:97)
at io.siddhi.core.query.input.ProcessStreamReceiver.processAndClear(ProcessStreamReceiver.java:182)
at io.siddhi.core.query.input.ProcessStreamReceiver.process(ProcessStreamReceiver.java:89)
at io.siddhi.core.query.input.ProcessStreamReceiver.receive(ProcessStreamReceiver.java:127)
at io.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:199)
at io.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:474)
at io.siddhi.core.stream.input.InputDistributor.send(InputDistributor.java:34)
at io.siddhi.core.stream.input.InputEntryValve.send(InputEntryValve.java:45)
at io.siddhi.core.stream.input.InputHandler.send(InputHandler.java:78)
at io.siddhi.core.stream.input.source.PassThroughSourceHandler.sendEvent(PassThroughSourceHandler.java:35)
at io.siddhi.core.stream.input.source.InputEventHandler.sendEvent(InputEventHandler.java:83)
at io.siddhi.extension.map.keyvalue.sourcemapper.KeyValueSourceMapper.mapAndProcess(KeyValueSourceMapper.java:139)
at io.siddhi.core.stream.input.source.SourceMapper.onEvent(SourceMapper.java:175)
at io.siddhi.core.stream.input.source.SourceMapper.onEvent(SourceMapper.java:119)
at org.wso2.extension.siddhi.io.twitter.source.TwitterStatusListener.onStatus(TwitterStatusListener.java:63)
at twitter4j.StatusStreamImpl.onStatus(StatusStreamImpl.java:75)
at twitter4j.StatusStreamBase$1.run(StatusStreamBase.java:105)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)`
________________________________________________________________________
From this line java.lang.IllegalArgumentException: You must feed a value for placeholder tensor 'dense_input_1' with dtype float and shape [?,3000]
I can understand there is a mismatch in shape between my zeros float array input and the expected input.
If I am not wrong, I have seen in the TensorFlow extension execution official doc that I need to cast the input as an 'object' in order to predict from it, am I doing right by returning an 'object' from the JavaScript function ? I add 0.0 instead of 0 in the array in order to force the float type. I am not sure about this part...
I have also tried to return 'float' instead of 'object' from the JS function and also to push this whole array in an other array to get something like [ [ 0.0 , 0.0 , 0.0 , 0.0 , ... , 0.0 , 0.0 , 0.0 , 0.0 ] ] but the same error is still raised.
Do you have any suggestion ? Thanks