siddhi-io / siddhi
Stream Processing and Complex Event Processing Engine
Home Page: http://siddhi.io
License: Apache License 2.0
Siddhi looks for extension files (*.siddhiext) on java.class.path. However, stream processing engines such as Flink (TaskManager) do not put user libraries on java.class.path: they are placed in taskmanager.tmp.dirs or java.io.tmpdir, and their file names do NOT end with "jar". I think SiddhiExtensionLoader should be enhanced to support an external class path and to detect the file type from the file content rather than from the file-name suffix.
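A minimal sketch of the content-based check proposed above, assuming the loader is handed candidate files from directories such as taskmanager.tmp.dirs or java.io.tmpdir. It treats any file that starts with the ZIP local-file-header signature as a JAR and lists its *.siddhiext entries, using only java.nio and java.util.jar. The class ExtensionJarScanner and its methods are hypothetical, not part of SiddhiExtensionLoader; an archive with no entries (which has no local file header) would not be detected.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

/** Hypothetical helper: detect JAR files by content instead of by a ".jar" suffix. */
public final class ExtensionJarScanner {
    // A ZIP/JAR with at least one entry starts with the local file header "PK\3\4".
    private static final byte[] ZIP_MAGIC = {0x50, 0x4B, 0x03, 0x04};

    private ExtensionJarScanner() {}

    /** Returns true if the file begins with the ZIP local-file-header signature. */
    public static boolean looksLikeJar(Path file) throws IOException {
        if (!Files.isRegularFile(file)) {
            return false;
        }
        byte[] header = new byte[ZIP_MAGIC.length];
        try (InputStream in = Files.newInputStream(file)) {
            if (in.readNBytes(header, 0, header.length) < header.length) {
                return false; // too short to be an archive
            }
        }
        return Arrays.equals(header, ZIP_MAGIC);
    }

    /** Lists the *.siddhiext entries of a file, whatever its name, if it is a JAR. */
    public static List<String> siddhiExtEntries(Path file) throws IOException {
        List<String> names = new ArrayList<>();
        if (!looksLikeJar(file)) {
            return names;
        }
        try (JarFile jar = new JarFile(file.toFile())) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                String name = entries.nextElement().getName();
                if (name.endsWith(".siddhiext")) {
                    names.add(name);
                }
            }
        }
        return names;
    }
}
```

A loader could walk each such directory with Files.list and call siddhiExtEntries on every regular file, regardless of its suffix.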
If an execution plan that uses a time batch window is snapshotted, a NullPointerException is thrown when that snapshot is restored. I tested with versions 3.1.2 and 4.0.0-M1.
Exception in thread "main" java.lang.NullPointerException
at org.wso2.siddhi.core.query.processor.stream.window.TimeBatchWindowProcessor.restoreState(TimeBatchWindowProcessor.java:249)
at org.wso2.siddhi.core.util.snapshot.SnapshotService.restore(SnapshotService.java:112)
at org.wso2.siddhi.core.ExecutionPlanRuntime.restore(ExecutionPlanRuntime.java:277)
at test.Test.main(Test.java:48)
This is the test code. The exception is thrown on the second run, when the plan.bytes snapshot written by the first run is restored.
```java
import java.io.File;
import java.nio.file.Files;
import java.util.Arrays;

import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.stream.output.StreamCallback;

public class Test {
    public static void main(String[] args) throws Exception {
        StringBuilder plan = new StringBuilder();
        plan.append("@config(async = 'true')\n");
        plan.append("define stream inStream (temp float);\n");
        plan.append("from inStream#window.timeBatch(3000)\n");
        plan.append("select avg(temp) as temp\n");
        plan.append("insert into outStream\n");

        SiddhiManager siddhiManager = new SiddhiManager();
        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(plan.toString());
        executionPlanRuntime.addCallback("outStream", new StreamCallback() {
            @Override
            public void receive(Event[] events) {
                for (Event e : events) {
                    System.out.println(Arrays.toString(e.getData()));
                }
            }
        });

        // On the second run plan.bytes exists, and restore() fails with the NullPointerException above.
        File f = new File("plan.bytes");
        if (f.exists()) {
            executionPlanRuntime.restore(Files.readAllBytes(f.toPath()));
        }

        InputHandler inputHandler = executionPlanRuntime.getInputHandler("inStream");
        executionPlanRuntime.start();
        for (int i = 1; i <= 1000; i++) {
            inputHandler.send(new Object[] {randomTemp()});
        }
        Thread.sleep(3000);

        // Persist the snapshot so that the next run restores it.
        Files.write(f.toPath(), executionPlanRuntime.snapshot());

        executionPlanRuntime.shutdown();
        siddhiManager.shutdown();
    }

    private static float randomTemp() {
        return (float) (Math.random() * 100);
    }
}
```
Thanks
Reporter : Leo Romanoff
Jira URL : https://wso2.org/jira/browse/SIDDHI-10
Description :
In some of my test cases I wanted to avoid using a single stream for all tenants (because it is very slow), so I created one stream per tenant (e.g. 300,000 streams). All these streams are structurally the same but have different names. I noticed that they consume quite a lot of memory, because stream definitions are not shared, even though, as far as I understand, they are immutable.
Maybe it would be a good idea to share stream definitions when they are identical? StreamDefinition has two fields, "String name" and "StreamRepresentation streamRep"; the representation part could be shared by all streams with the same structure.
An even better idea would be to allow custom types. Then you could write something like:
define type MyType (field1 string, field2 int, field3 float)
define stream MyStream1 MyType
define stream MyStream2 MyType
define stream MyStream3 MyType
define stream MyStream4 MyType
...
Moreover, if custom types could be defined, they could also be used inside stream/type definitions, e.g.:
define type MySecondType (field1 string, field2 int, field3 float, field4 MyType)
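One way to get the sharing described above is interning: structurally equal attribute lists map to a single canonical object, and each named stream keeps only its name plus a reference to that object. The sketch below is a hypothetical plain-Java illustration; Schema and NamedStream stand in for the StreamRepresentation/StreamDefinition split mentioned in the issue and are not Siddhi classes.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

public final class SchemaInterning {

    /** Immutable attribute list; equal lists mean structurally identical streams. */
    public static final class Schema {
        private final List<String> attributes; // e.g. "field1 string"

        Schema(List<String> attributes) {
            this.attributes = List.copyOf(attributes);
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Schema && ((Schema) o).attributes.equals(attributes);
        }

        @Override
        public int hashCode() {
            return attributes.hashCode();
        }
    }

    /** Per-stream object: only the name is unique, the schema reference is shared. */
    public static final class NamedStream {
        public final String name;
        public final Schema schema;

        NamedStream(String name, Schema schema) {
            this.name = name;
            this.schema = schema;
        }
    }

    private final Map<Schema, Schema> pool = new ConcurrentHashMap<>();

    /** Returns the canonical Schema instance for these attributes. */
    public Schema intern(List<String> attributes) {
        Schema candidate = new Schema(attributes);
        Schema existing = pool.putIfAbsent(candidate, candidate);
        return existing != null ? existing : candidate;
    }

    public NamedStream define(String name, List<String> attributes) {
        return new NamedStream(Objects.requireNonNull(name), intern(attributes));
    }

    public int distinctSchemas() {
        return pool.size();
    }
}
```

With 300,000 structurally identical streams, this layout would hold 300,000 small NamedStream objects but only one Schema.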
Reporter : Manoj Gunawardena
Jira URL : https://wso2.org/jira/browse/SIDDHI-4
Description :
For large time frames and a large number of events, the memory-based event queues might run out of memory. The idea of a disk-backed window is that, when the number of events in the incoming queue grows, part of the queue is written to disk; when those events need to be emitted, they are read back from disk and loaded into memory in advance.
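A minimal sketch of the disk-backed queue idea, assuming events can be serialized as strings (a real window would need a proper event serializer). Up to memoryLimit events stay in memory; once that part is full, new events are appended to a temporary file, and when the in-memory part drains they are read back in batches. FIFO order is kept because, while anything is on disk, new events also go to disk. SpillingQueue is a hypothetical name, not a Siddhi class, and it is not thread-safe.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayDeque;

public final class SpillingQueue implements Closeable {
    private final int memoryLimit;
    private final ArrayDeque<String> head = new ArrayDeque<>(); // oldest events, in memory
    private final Path spillFile;
    private final DataOutputStream spillOut;
    private final DataInputStream spillIn;
    private long spilled; // events written to disk and not yet read back

    public SpillingQueue(int memoryLimit) throws IOException {
        this.memoryLimit = memoryLimit;
        this.spillFile = Files.createTempFile("window-spill", ".bin");
        this.spillOut = new DataOutputStream(new BufferedOutputStream(Files.newOutputStream(spillFile)));
        this.spillIn = new DataInputStream(new BufferedInputStream(Files.newInputStream(spillFile)));
    }

    public void add(String event) throws IOException {
        // While anything is on disk, new events must go to disk too, to keep FIFO order.
        if (spilled == 0 && head.size() < memoryLimit) {
            head.addLast(event);
        } else {
            spillOut.writeUTF(event);
            spilled++;
        }
    }

    /** Removes and returns the oldest event, or null if the queue is empty. */
    public String poll() throws IOException {
        if (head.isEmpty() && spilled > 0) {
            refill();
        }
        return head.pollFirst();
    }

    private void refill() throws IOException {
        spillOut.flush(); // make buffered writes visible to the reader
        while (spilled > 0 && head.size() < memoryLimit) {
            head.addLast(spillIn.readUTF());
            spilled--;
        }
    }

    public long size() {
        return head.size() + spilled;
    }

    @Override
    public void close() throws IOException {
        spillIn.close();
        spillOut.close();
        Files.deleteIfExists(spillFile);
    }
}
```

The spill file is only appended to and read sequentially; it is not compacted, so a long-running window would also need to rotate it.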
Are there any plans to add Javadoc to the source code?
It would definitely help users better understand how Siddhi works, and would of course lead to more contributions.
ExecutionPlan.java L52: Annotation.annotation("info").element("name", name)
ExecutionPlanParser.java L73: executionPlanContext.setName(element.getValue())
SiddhiConstants.java L31: public static final String ANNOTATION_NAME = "Name"
The name of executionPlanContext is always a generated UUID. Is the name of executionPlanContext supposed to be the name of the ExecutionPlan?
I want to use a counting pattern with a time gap, but the result confuses me. The example I wrote detects a successful login after a brute-force attempt; the execution plan is below:
@Plan:name('TestExecutionPlan')
define stream rawStream ( catBehavior string, catOutcome string, srcAddress string,
deviceCat string, srcUsername string,
catObject string, destAddress string, appProtocol string );
@info(name = 'condition1')
from rawStream[ catBehavior == '/Authentication/Verify'
and catOutcome == 'FAIL' and not( srcAddress is null ) ]#window.time(1 min)
select srcAddress, deviceCat, srcUsername, destAddress, appProtocol
insert into e1_OutputStream;
@info(name = 'condition2')
from rawStream[ catBehavior == '/Authentication/Verify'
and catOutcome == 'OK' and not( srcAddress is null ) ]#window.time(1 min)
select srcAddress, deviceCat, srcUsername, destAddress, appProtocol
insert into e2_OutputStream;
@info(name = 'result')
from every ( e1 = e1_OutputStream<9:> ) -> e2 = e2_OutputStream[ e1.srcAddress == srcAddress
and e1.deviceCat == deviceCat
and e1.srcUsername == srcUsername
and e1.destAddress == destAddress
and e1.appProtocol == appProtocol ]
within 10 second
select 'relationEvent' as event, e1.srcAddress,
e1.deviceCat, e1.srcUsername, e1.destAddress, e1.appProtocol
insert into resultOutputStream;
The input events are below: 9 FAIL events followed by 1 OK event, sent 2 seconds apart:
rawStream=[/Authentication/Verify,FAIL,1.1.1.1,deviceCat,srcUsername,catObject,destAddress,appProtocol]
delay(2000)
rawStream=[/Authentication/Verify,FAIL,1.1.1.1,deviceCat,srcUsername,catObject,destAddress,appProtocol]
delay(2000)
rawStream=[/Authentication/Verify,FAIL,1.1.1.1,deviceCat,srcUsername,catObject,destAddress,appProtocol]
delay(2000)
rawStream=[/Authentication/Verify,FAIL,1.1.1.1,deviceCat,srcUsername,catObject,destAddress,appProtocol]
delay(2000)
rawStream=[/Authentication/Verify,FAIL,1.1.1.1,deviceCat,srcUsername,catObject,destAddress,appProtocol]
delay(2000)
rawStream=[/Authentication/Verify,FAIL,1.1.1.1,deviceCat,srcUsername,catObject,destAddress,appProtocol]
delay(2000)
rawStream=[/Authentication/Verify,FAIL,1.1.1.1,deviceCat,srcUsername,catObject,destAddress,appProtocol]
delay(2000)
rawStream=[/Authentication/Verify,FAIL,1.1.1.1,deviceCat,srcUsername,catObject,destAddress,appProtocol]
delay(2000)
rawStream=[/Authentication/Verify,FAIL,1.1.1.1,deviceCat,srcUsername,catObject,destAddress,appProtocol]
delay(2000)
rawStream=[/Authentication/Verify,OK,1.1.1.1,deviceCat,srcUsername,catObject,destAddress,appProtocol]
Nine failed logins followed by one successful login should trigger the rule, so I wrote e1 = e1_OutputStream<9:>. I also want the detection to happen within a time period, so I wrote within 10 second. The events I sent should not trigger the rule (the total delay is 9 × 2000 ms = 18 s, which is more than 10 s), but the rule was triggered. So I am wondering: is the time gap I defined measured from the first event or from the last event?
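To make the question concrete, the sketch below computes both possible readings of within 10 second for the reported input (ten events, 2000 ms apart). It only does the arithmetic; it does not claim which reading Siddhi implements. WithinCheck is a hypothetical helper.

```java
/** Compares two possible readings of "within 10 second" for the reported input. */
public final class WithinCheck {
    private WithinCheck() {}

    /** Time from the first matched event to the last one, in ms. */
    public static long spanFromFirst(long[] timestamps) {
        return timestamps[timestamps.length - 1] - timestamps[0];
    }

    /** Time between the last two matched events, in ms. */
    public static long gapFromPrevious(long[] timestamps) {
        return timestamps[timestamps.length - 1] - timestamps[timestamps.length - 2];
    }

    public static void main(String[] args) {
        long[] ts = new long[10];      // 9 FAIL events + 1 OK event
        for (int i = 0; i < ts.length; i++) {
            ts[i] = i * 2000L;         // delay(2000) between consecutive events
        }
        long withinMs = 10_000L;       // "within 10 second"
        System.out.println("first -> last: " + spanFromFirst(ts) + " ms, within: " + (spanFromFirst(ts) <= withinMs));
        System.out.println("previous -> last: " + gapFromPrevious(ts) + " ms, within: " + (gapFromPrevious(ts) <= withinMs));
    }
}
```

Running main prints a first-to-last span of 18000 ms (outside the 10 s limit) and a previous-to-last gap of 2000 ms (inside it), so the observed match is consistent with the second reading rather than the first.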
I found it in the documentation, but when I used it in my code, an error occurred. I also cannot find DistinctcountAttributeAggregator.java in the latest Siddhi version.
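As a stopgap while the aggregator's location is unclear, a distinct count over a sliding window can be kept with a value-to-occurrence map: arriving (CURRENT) events increment a value's count, expired events decrement it, and the number of keys is the distinct count. This is a hypothetical plain-Java sketch, not the DistinctcountAttributeAggregator class or Siddhi's aggregator extension API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical distinct-count state that supports window expiry. */
public final class DistinctCounter<T> {
    // value -> number of events currently in the window that carry this value
    private final Map<T, Integer> counts = new HashMap<>();

    /** Called for an arriving event; returns the new distinct count. */
    public int add(T value) {
        counts.merge(value, 1, Integer::sum);
        return counts.size();
    }

    /** Called for an expired event; returns the new distinct count. */
    public int remove(T value) {
        // Drop the key when its last occurrence leaves the window.
        counts.computeIfPresent(value, (k, c) -> c == 1 ? null : Integer.valueOf(c - 1));
        return counts.size();
    }

    public int distinctCount() {
        return counts.size();
    }
}
```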
Hi, when I try to build the Siddhi project using mvn package, I get the following error. Can you help me resolve it?
Thank you
```
[INFO] ------------------------------------------------------------------------
[ERROR] Plugin org.apache.maven.plugins:maven-source-plugin:2.1.2 or one of its dependencies could not be resolved: Failed to read artifact descriptor for org.apache.maven.plugins:maven-source-plugin:jar:2.1.2: 1 problem was encountered while building the effective model
[ERROR] [FATAL] Non-readable POM C:\Users\Roledene\.m2\repository\org\apache\maven\plugins\maven-source-plugin\2.1.2\maven-source-plugin-2.1.2.pom: no more data available - expected end tag to close start tag from line 22, parser stopped on START_DOCUMENT seen ...mlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLoc... @22:119 @
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/PluginResolutionException
```
Can someone please let me know whether Siddhi has built-in caching?
Thanks
Currently, externalTimeBatch is triggered by events, which is correct and expected. However, there is no time rounding.
Would it be possible to support a feature like the following?
from stream#window.externalTimeBatch(timestamp, 5 min, 1 second)
Here the optional third parameter specifies the rounding: if 1 second is given, each time window should start on a whole second. (Currently, a window may start at an event timestamp, which is not necessarily a whole second.)
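A sketch of the proposed rounding, assuming the third parameter is a duration in milliseconds and that the first batch should start at the first event's timestamp floored to that unit. BatchAlignment is a hypothetical helper, not part of Siddhi; Math.floorDiv keeps the result correct for negative timestamps as well.

```java
/** Hypothetical helper for aligning externalTimeBatch windows to a rounding unit. */
public final class BatchAlignment {
    private BatchAlignment() {}

    /** Floors a timestamp to the previous multiple of roundMs (e.g. whole seconds). */
    public static long align(long timestampMs, long roundMs) {
        if (roundMs <= 0) {
            throw new IllegalArgumentException("roundMs must be positive");
        }
        return Math.floorDiv(timestampMs, roundMs) * roundMs;
    }

    /** Exclusive end of the batch that starts at the aligned first timestamp. */
    public static long batchEnd(long firstTimestampMs, long windowMs, long roundMs) {
        return align(firstTimestampMs, roundMs) + windowMs;
    }
}
```

For example, align(1366335804341L, 1000L) returns 1366335804000L, so a 5-minute batch starting there would end at 1366336104000L.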
Reporter : @suhothayan
Jira URL : https://wso2.org/jira/browse/SIDDHI-9
Description :
Enable Siddhi to have a standard cache API.
What is the status of non-occurrence support in sequences and patterns?
Given the test class below, it appears that ExecutionPlanRuntime is not shutting down.
```java
package eu.ferari.examples.distributedcount.states;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.util.EventPrinter;

public class STState {
    private static final String eventStreamDefinition =
            "define stream cdrEventStream (nodeId string, phone string, timeStamp long); ";
    private static final String query =
            "@info(name = 'query1') from cdrEventStream#window.externalTime(timeStamp,10 sec) "
            + "select nodeId, phone, timeStamp, count(timeStamp) as callCount group by phone "
            + "insert all events into outputStream;";

    // These field-level instances are created for every STState object,
    // but this class never starts or shuts them down.
    private final SiddhiManager siddhiManager = new SiddhiManager();
    private final ExecutionPlanRuntime executionPlanRuntime =
            siddhiManager.createExecutionPlanRuntime(eventStreamDefinition + query);
    private final InputHandler inputHandler = executionPlanRuntime.getInputHandler("cdrEventStream");
    private int inEventCount;
    private int removeEventCount;

    public void externalTimeWindowTest1() throws InterruptedException {
        SiddhiManager siddhiManager = new SiddhiManager();
        String cseEventStream = "" +
                "define stream LoginEvents (timeStamp long, ip string, phone string) ;";
        String query = "" +
                "@info(name = 'query1') " +
                "from LoginEvents#window.externalTime(timeStamp,5 sec) " +
                "select timeStamp, count(ip) as cccount, phone group by phone " +
                "insert all events into uniqueIps ;";
        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);
        executionPlanRuntime.addCallback("query1", new QueryCallback() {
            @Override
            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
            }
        });
        InputHandler inputHandler = executionPlanRuntime.getInputHandler("LoginEvents");
        executionPlanRuntime.start();
        inputHandler.send(new Object[] {1366335804341L, "192.10.1.3", "1"});
        // Only this runtime is shut down here; the local siddhiManager is not.
        executionPlanRuntime.shutdown();
        System.out.println("Done");
    }

    public static void main(String[] args) throws InterruptedException {
        STState s = new STState();
        s.externalTimeWindowTest1();
    }
}
```
Using Siddhi 3.0.3 as a Java library.
I am seeing some unexpected behavior when using group by with a time window. The general issue is that I get the correct result aggregations for the group, but I receive one aggregated result per event, not one per group.
Specifically,
Using the following query, with no time window:
@info(name = 'RealTimeQuery') from MyEventStream select EventName as EventName, count() as Count, avg(ElapsedTime) as Avg_ElapsedTime group by EventName output snapshot every 30 seconds insert into CEP_Result;
I get one result per unique EventName, with the proper counts and avg. So, in my example, there are 15 unique EventNames, and each unique EventName occurs 10 times for a total of 150 events. I get 15 results, one for each EventName, with each having a count of 10 and the correct average, as I would expect.
However, if I add a time window to this query:
@info(name = 'RealTimeQuery') from MyEventStream#window.time(2 minutes) select EventName as EventName, count() as Count, avg(ElapsedTime) as Avg_ElapsedTime group by EventName output snapshot every 30 seconds insert into CEP_Result;
The result for each EventName is correct, as it has the correct count and average. But, instead of one result per EventName, I get 10, for a total of 150 results.
We're looking to replace Esper with Siddhi, so I ran similar queries through Esper, and with Esper, in both cases, I received the expected output of one result per EventName.
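Until the duplicate per-event rows are explained, one receiver-side workaround is to keep only the latest row per group key from each snapshot output. The sketch assumes rows arrive as Object[] (as from Event.getData()) with the group key, here EventName, at a known position. LatestPerGroup is a hypothetical helper, and it hides the symptom rather than fixing the query behavior.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical receiver-side workaround: keep only the most recent row per group key. */
public final class LatestPerGroup {
    private LatestPerGroup() {}

    public static <R, K> Map<K, R> collapse(List<R> rows, Function<R, K> groupKey) {
        Map<K, R> latest = new LinkedHashMap<>();
        for (R row : rows) {
            // Later rows overwrite earlier ones; key order stays first-seen.
            latest.put(groupKey.apply(row), row);
        }
        return latest;
    }
}
```

In a StreamCallback, this would be applied to the rows of one output batch, e.g. collapse(rows, r -> r[0]) when EventName is the first selected attribute.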
Reporter : @sacjaya
Jira URL : https://wso2.org/jira/browse/SIDDHI-7
Description :
Allow users to add a version to a Siddhi extension name, so that the same extension can exist in different versions. If an extension changes and a new version arrives, existing queries would not need to change.
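A minimal sketch of a registry that can hold several versions of the same extension name, assuming integer versions. A query that pins a version resolves exactly that one; whether an unversioned reference should resolve to the latest version (as resolveLatest does here) or to some fixed default is a design choice the issue leaves open. VersionedExtensionRegistry is hypothetical and unrelated to Siddhi's actual extension loading.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical registry keeping several versions of one extension name. */
public final class VersionedExtensionRegistry<E> {
    // extension name -> (version -> implementation), versions kept sorted
    private final Map<String, NavigableMap<Integer, E>> byName = new ConcurrentHashMap<>();

    public void register(String name, int version, E extension) {
        byName.computeIfAbsent(name, n -> new ConcurrentSkipListMap<>()).put(version, extension);
    }

    /** Exact lookup, for a query that pins a version. */
    public Optional<E> resolve(String name, int version) {
        NavigableMap<Integer, E> versions = byName.get(name);
        return versions == null ? Optional.empty() : Optional.ofNullable(versions.get(version));
    }

    /** Highest registered version, for a query that names no version. */
    public Optional<E> resolveLatest(String name) {
        NavigableMap<Integer, E> versions = byName.get(name);
        if (versions == null) {
            return Optional.empty();
        }
        Map.Entry<Integer, E> latest = versions.lastEntry();
        return latest == null ? Optional.empty() : Optional.of(latest.getValue());
    }
}
```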
The command-line debugger tool SiddhiDebuggerClient.java is in the test directory, so it is not packaged in the jar and end users cannot use the command-line debugger.
Current location: siddhi-core/src/test/java/org/wso2/siddhi/core/debugger/SiddhiDebuggerClient.java
ExecutionPlanRuntimeBuilder defines a triggerDefinitionMap and an eventTriggerMap, but they are never passed to ExecutionPlanRuntime when ExecutionPlanRuntimeBuilder.build() is called. I'm not sure whether I understand this correctly.
I recently adopted the Siddhi CEP engine 3.1.2 for my own research. However, one problem prevents me from going further. For example, I would like to use a 1-minute sliding time window to process an event stream, and I wonder whether I can access the filtered events in each sliding window. That is, I want to collect all filtered events for sliding window 1, sliding window 2, and so on, rather than only the final mixed-together output.
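The sketch below shows what per-window access could look like for a 1-minute sliding time window: each arriving event first expires events that are at least lengthMs older than itself, then the method returns a copy of everything still in the window. It assumes non-decreasing timestamps and a half-open window (now - length, now]; SlidingWindowSnapshots is a hypothetical class, not a Siddhi API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical sliding time window that exposes its full contents after every arrival. */
public final class SlidingWindowSnapshots<T> {
    private static final class Timed<T> {
        final long timestamp;
        final T value;

        Timed(long timestamp, T value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    private final long lengthMs;
    private final Deque<Timed<T>> window = new ArrayDeque<>();

    public SlidingWindowSnapshots(long lengthMs) {
        this.lengthMs = lengthMs;
    }

    /** Adds an event (timestamps assumed non-decreasing) and returns the window's contents. */
    public List<T> add(long timestamp, T value) {
        // Expire events that fall outside (timestamp - lengthMs, timestamp].
        while (!window.isEmpty() && window.peekFirst().timestamp <= timestamp - lengthMs) {
            window.pollFirst();
        }
        window.addLast(new Timed<>(timestamp, value));
        List<T> snapshot = new ArrayList<>(window.size());
        for (Timed<T> t : window) {
            snapshot.add(t.value);
        }
        return snapshot;
    }
}
```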
Reporter : @suhothayan
Description :
Add a median aggregator to Siddhi.
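A minimal sketch of the state such a median aggregator would need, assuming it must support both adding arriving events and removing expired ones, as a sliding window requires. Values are kept in a TreeMap of value to multiplicity, and the median walks to the middle position, which is O(n) per call; a two-heap design would be faster but needs lazy deletion. MedianAggregator is hypothetical and does not implement Siddhi's aggregator interface.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical median state supporting add/remove for sliding windows. */
public final class MedianAggregator {
    private final TreeMap<Double, Integer> counts = new TreeMap<>(); // value -> multiplicity
    private int size;

    public void add(double value) {
        counts.merge(value, 1, Integer::sum);
        size++;
    }

    /** Removes one occurrence; returns false if the value was not present. */
    public boolean remove(double value) {
        Integer c = counts.get(value);
        if (c == null) {
            return false;
        }
        if (c == 1) {
            counts.remove(value);
        } else {
            counts.put(value, c - 1);
        }
        size--;
        return true;
    }

    /** Median of the current values (mean of the two middle ones for an even count); NaN if empty. */
    public double median() {
        if (size == 0) {
            return Double.NaN;
        }
        int lowerIdx = (size - 1) / 2; // 0-based index of the lower middle element
        int upperIdx = size / 2;       // 0-based index of the upper middle element
        double lower = 0;
        boolean haveLower = false;
        int seen = 0;
        for (Map.Entry<Double, Integer> e : counts.entrySet()) {
            int next = seen + e.getValue();
            if (!haveLower && lowerIdx < next) {
                lower = e.getKey();
                haveLower = true;
            }
            if (upperIdx < next) {
                return (lower + e.getKey()) / 2.0;
            }
            seen = next;
        }
        throw new IllegalStateException("size and counts out of sync");
    }
}
```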
Hello!
I am trying to use Siddhi as a library in my own project, but I don't know how to programmatically configure the SiddhiManager to run in HA mode without using the whole WSO2 CEP framework.
Is this possible? Could you provide an example? I am really lost with Siddhi 3.0, and I can't find any useful documentation or guide on using it as a standalone library...
Thank you very much!!!
I have a problem with Siddhi group by over a time window: I get the correct aggregations for each group, but I receive one aggregated result per event instead of one per group.
Here is the query:
define stream bootCorrelationStream (logLevel string, message string, similarityId string, timestamp long, uuid string); @info(name = 'bootCorrelation') from bootCorrelationStream[logLevel=='ERROR']#window.externalTime(timestamp, 2 min) select similarityId, adanos:first(message) as message, min(timestamp) as startTime, max(timestamp) as endTime group by uuid, similarityId insert into tempStream;
Input data:
logLevel="ERROR", message="ERROR first message", timestamp="2016-05-21 01:22:07.579", uuid="94350d43-8b6b-4669-8561-2f956b05a341", similarityId="b9b35d4a-03b9-4b36-9c02-302c63a2e382",uuid=94350d43-8b6b-4669-8561-2f956b05a341
logLevel="ERROR", message="ERROR second message", timestamp="2016-05-21 01:22:08.314", uuid="94350d43-8b6b-4669-8561-2f956b05a341", similarityId="9ced5924-11b5-4008-9cf0-ea4a32311ce6",uuid=94350d43-8b6b-4669-8561-2f956b05a341
logLevel="ERROR", message="ERROR second message", timestamp="2016-05-21 01:22:15.017", uuid="94350d43-8b6b-4669-8561-2f956b05a341", similarityId="9ced5924-11b5-4008-9cf0-ea4a32311ce6",uuid=94350d43-8b6b-4669-8561-2f956b05a341
logLevel="ERROR", message="ERROR second message", timestamp="2016-05-21 01:25:07.017", uuid="94350d43-8b6b-4669-8561-2f956b05a341", similarityId="9ced5924-11b5-4008-9cf0-ea4a32311ce6",uuid=94350d43-8b6b-4669-8561-2f956b05a341
Here is the output:
Count: 1; message=ERROR first message, timestamp=2016-05-21 01:22:07.579, similarityId=b9b35d4a-03b9-4b36-9c02-302c63a2e382, uuid=94350d43-8b6b-4669-8561-2f956b05a341
Count: 1; message=ERROR second message, timestamp=2016-05-21 01:22:08.314, similarityId=9ced5924-11b5-4008-9cf0-ea4a32311ce6, uuid=94350d43-8b6b-4669-8561-2f956b05a341
Count: 1; message=ERROR second message, timestamp=2016-05-21 01:22:08.314, similarityId=9ced5924-11b5-4008-9cf0-ea4a32311ce6, uuid=94350d43-8b6b-4669-8561-2f956b05a341
Count: 1; message=ERROR second message, timestamp=2016-05-21 01:25:07.017, similarityId=9ced5924-11b5-4008-9cf0-ea4a32311ce6, uuid=94350d43-8b6b-4669-8561-2f956b05a341
Here is the main code.
Register the callback:
```java
this.siddhiRuntime = new SiddhiRuntimeHolder();
this.siddhiRuntime.siddhiManager = new SiddhiManager();
ExecutionPlanRuntime executionPlanRuntime = this.siddhiRuntime.siddhiManager.createExecutionPlanRuntime(this.getExecutionPlan());
this.siddhiRuntime.inputHandler = executionPlanRuntime.getInputHandler(this.getStreamName());
final SiddhiBolt siddhiBolt = this;
this.siddhiRuntime.queryCallback = new QueryCallback() {
    @Override
    public void receive(long timeStamp, org.wso2.siddhi.core.event.Event[] inEvents,
                        org.wso2.siddhi.core.event.Event[] removeEvents) {
        siddhiBolt.queryCallback(timeStamp, inEvents, removeEvents);
    }
};
executionPlanRuntime.addCallback("query", this.siddhiRuntime.queryCallback);
executionPlanRuntime.start();
```
Query details:
```java
StringBuilder executionPlane = new StringBuilder(" define stream bootCorrelationStream ");
executionPlane.append("(logLevel string, message string, similarityId string, timestamp long, uuid string); ");
executionPlane.append(" @info(name = 'bootCorrelation') ");
// externalTimeBatch(timestamp, 5 min), batch time window for specified timestamp
executionPlane.append("from bootCorrelationStream[logLevel=='ERROR']#window.externalTime(timestamp, 2 min) ");
executionPlane.append(" select similarityId, adanos:first(message) as message, uuid, ");
executionPlane.append(" min(timestamp) as startTime, max(timestamp) as endTime ");
executionPlane.append(" group by uuid, similarityId ");
executionPlane.append(" insert into tempStream; ");
appendLogs(executionPlane.toString());
return executionPlane.toString();
```
Callback details:
```java
// Group the received events by uuid (generic types fixed so the code compiles).
Map<String, List<BootCorrelationAggregationData>> tempMap = new HashMap<String, List<BootCorrelationAggregationData>>();
for (Event event : inEvents) {
    Object[] data = event.getData();
    BootCorrelationAggregationData aggregateData = new BootCorrelationAggregationData();
    aggregateData.setSimilarityId((String) data[0]);
    aggregateData.setMessage((String) data[1]);
    aggregateData.setUuid((String) data[2]);
    aggregateData.setTimestamp((Long) data[3]);
    if (!tempMap.containsKey(aggregateData.getUuid())) {
        tempMap.put(aggregateData.getUuid(), new ArrayList<BootCorrelationAggregationData>());
    }
    tempMap.get(aggregateData.getUuid()).add(aggregateData);
}
List<BootCorrelationAggregationData> emitList = new ArrayList<BootCorrelationAggregationData>();
for (Entry<String, List<BootCorrelationAggregationData>> entry : tempMap.entrySet()) {
    List<BootCorrelationAggregationData> temp = entry.getValue();
    Collections.sort(temp, new Comparator<BootCorrelationAggregationData>() {
        @Override
        public int compare(BootCorrelationAggregationData o1, BootCorrelationAggregationData o2) {
            // Long.compare avoids the overflow of casting a long difference to int.
            return Long.compare(o1.getTimestamp(), o2.getTimestamp());
        }
    });
    if (!temp.isEmpty()) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        String message = String.format("Count: %s; message=%s, timestamp=%s, similarityId=%s, uuid=%s", temp.size(),
                temp.get(0).getMessage(), sdf.format(new Date(temp.get(0).getTimestamp())),
                temp.get(0).getSimilarityId(), temp.get(0).getUuid());
        LOG.info(message);
        emitList.add(temp.get(0));
    }
}
if (!emitList.isEmpty()) {
    this.getOutputCollector().emit(new Values(emitList));
    LOG.info(String.format("BootCorrelationSiddhiBolt emit size: %s", emitList.size()));
    for (BootCorrelationAggregationData data : emitList) {
        LOG.info(String.format("BootCorrelationSiddhiBolt emits %s", ToStringBuilder.reflectionToString(data)));
    }
}
```
Reporter : Rajeev Sampath
Jira URL : https://wso2.org/jira/browse/SIDDHI-11
Description :
Currently, functions such as min() and max() apply only to individual attributes, and we can't select the event that holds the max() attribute value.
To solve this, something like selectEvent(max(attribute_name)) is required for windows, tables, and other aggregators.
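Outside Siddhi, the behavior requested above corresponds to a max-by operation that returns the whole event rather than only the maximum value. The sketch assumes a window's contents are available as a collection; SelectEvent.maxBy is a hypothetical helper, not proposed Siddhi syntax.

```java
import java.util.Collection;
import java.util.Comparator;
import java.util.Optional;
import java.util.function.ToDoubleFunction;

/** Hypothetical helper: return the whole event holding the maximum attribute value. */
public final class SelectEvent {
    private SelectEvent() {}

    /** Returns the event with the largest attribute value, or empty for an empty window. */
    public static <E> Optional<E> maxBy(Collection<E> windowEvents, ToDoubleFunction<E> attribute) {
        return windowEvents.stream().max(Comparator.comparingDouble(attribute));
    }
}
```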
Hey guys,
I am trying to follow https://docs.wso2.com/display/CEP310/Writing+a+Custom+Function to create an extension, but I cannot find https://docs.wso2.com/display/CEP310/Writing+a+Custom+Function
anywhere. I have the following dependencies included:
siddhi-query-api, siddhi-core, siddhi-query-compiler
Where can I get that annotation? I am using version 3.1.0. Or is the annotation no longer needed?
Thanks
I am using a pattern query to detect the non-occurrence of an event. After testing for a while, I see a "Dropping event chunk" error. I am using version 3.1.0. Should there be a lock around the handling of pendingEventList in LogicalPreStateProcessor?
2017-03-13 16:31:36,671 | ERROR | heduler-thread-4 | AbstractStreamProcessor | 188 - siddhi-core - 3.1.0 | Dropping event chunk EventChunk{first=StreamEvent{timestamp=1489422696671, beforeWindowData=null, onAfterWindowData=null, outputData=[C006D20050220031, 1489422336000, 424660, AP_CAPABLE], type=CURRENT, next=null}}, error in processing org.wso2.siddhi.core.query.processor.stream.window.TimeWindowProcessor, null
java.lang.NullPointerException
at org.wso2.siddhi.core.query.input.stream.state.LogicalPreStateProcessor.processAndReturn(LogicalPreStateProcessor.java:116)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.query.input.SingleProcessStreamReceiver.processAndClear(SingleProcessStreamReceiver.java:56)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.process(ProcessStreamReceiver.java:77)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.receive(ProcessStreamReceiver.java:99)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.query.input.stream.state.receiver.PatternSingleProcessStreamReceiver.receive(PatternSingleProcessStreamReceiver.java:43)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:127)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:329)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.query.output.callback.InsertIntoStreamCallback.send(InsertIntoStreamCallback.java:46)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:78)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:40)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.query.selector.QuerySelector.processNoGroupBy(QuerySelector.java:123)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:86)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.query.processor.stream.window.TimeWindowProcessor.process(TimeWindowProcessor.java:123)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor.processEventChunk(WindowProcessor.java:57)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.query.processor.stream.AbstractStreamProcessor.process(AbstractStreamProcessor.java:101)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.query.input.stream.single.EntryValveProcessor.process(EntryValveProcessor.java:47)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.util.Scheduler$EventCaller.run(Scheduler.java:171)[188:siddhi-core:3.1.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)[:1.8.0_66]
at java.util.concurrent.FutureTask.run(FutureTask.java:266)[:1.8.0_66]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)[:1.8.0_66]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)[:1.8.0_66]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)[:1.8.0_66]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)[:1.8.0_66]
Another reported stack trace:
java.lang.NullPointerException
at java.util.LinkedList$ListItr.next(LinkedList.java:893)[:1.8.0_66]
at org.wso2.siddhi.core.query.input.stream.state.LogicalPreStateProcessor.processAndReturn(LogicalPreStateProcessor.java:115)
at org.wso2.siddhi.core.query.input.StateMultiProcessStreamReceiver.processAndClear(StateMultiProcessStreamReceiver.java:49)
at org.wso2.siddhi.core.query.input.MultiProcessStreamReceiver.process(MultiProcessStreamReceiver.java:75)
at org.wso2.siddhi.core.query.input.MultiProcessStreamReceiver.receive(MultiProcessStreamReceiver.java:112)
at org.wso2.siddhi.core.query.input.stream.state.receiver.PatternMultiProcessStreamReceiver.receive(PatternMultiProcessStreamReceiver.java:58)
at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:149)
at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:334)
at org.wso2.siddhi.core.stream.input.InputDistributor.send(InputDistributor.java:34)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.stream.input.InputEntryValve.send(InputEntryValve.java:41)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.stream.input.InputHandler.send(InputHandler.java:57)[188:siddhi-core:3.1.0]
What is wrong with the syntax of the query below?
<sw:siddhi-query
selector="from LocationStream#window.time( 60 min ) as e1, e2 = LocationStream[(latitude != e1.latitude or longitude != e1.longitude) and e1.assignment == assignment] as e3 join MeasurementStream[mxname=='speed']#window.time( 60 min ) as e2 on e1.assignment==e2.assignment select e1.assignment, e1.latit$
<sw:groovy-stream-processor scriptPath="siddhiExecuteCommand.groovy"
stream="LocationChanged"/>
</sw:siddhi-query>
I recently moved to Siddhi CEP 3.1.0. I execute this plan at a rate of 400 events/second. After 3 hours, I found that events are not being processed because Siddhi threads are deadlocked.
"@plan:name('cepexecutionplan') " +
"define stream deviceCheckInStream (serialNum string,currentCheckInTime long, sequenceNum long); " +
"define stream swCompCheckInStream (serialNum string, timestamp long, sequenceNum long, info1 object, info2 object, streamType string); " +
"@info(name = 'query1') " +
" from deviceCheckInStream#window.time(6 minutes) " +
"select * "+
" insert expired events into delayedDeviceCheckInStream; " +
"@info(name = 'query2') " +
" from (every e1=deviceCheckInStream -> nonOccurringEvent = deviceCheckInStream[sequenceNum > e1.sequenceNum and serialNum == e1.serialNum] or"
+ " delayedEvent=delayedDeviceCheckInStream[e1.sequenceNum == sequenceNum and serialNum == e1.serialNum]) within 370 sec "+
" select e1.serialNum as serialNum, e1.currentCheckInTime as lastSuccessfulCheckInTime, "
+ "nonOccurringEvent.currentCheckInTime as nonOccurringId, e1.sequenceNum as lastSuccessfulCheckInSeqNum "+
" having (nonOccurringId is null) "+
" insert into devCheckInNonOccurrenceStream; " +
"@info(name = 'query3') " +
" from (every e1=swCompCheckInStream -> e2 = swCompCheckInStream[sequenceNum != e1.sequenceNum and serialNum == e1.serialNum]) within 6 min "+
" select e2.sequenceNum as currSeq, e1.serialNum, e2.timestamp, e1.streamType as prevStreamType, e2.streamType as currStreamType, "
+ "e1.info1 as previnfo1, e2.info1 as currinfo1, "
+ "e1.info2 as previnfo2, e2.info2 as currinfo2 "
+ " insert into swCompStatusOutputStream; ";
The lock obtained in PatternMultiProcessStreamReceiver is stuck in getLastEvent; the thread dump traces are below. Any idea when this could happen? Any help would be appreciated.
at org.wso2.siddhi.core.event.ComplexEventChunk.getLastEvent(ComplexEventChunk.java:107)
at org.wso2.siddhi.core.event.ComplexEventChunk.add(ComplexEventChunk.java:100)
at org.wso2.siddhi.core.query.input.stream.state.LogicalPreStateProcessor.processAndReturn(LogicalPreStateProcessor.java:124)
at org.wso2.siddhi.core.query.input.StateMultiProcessStreamReceiver.processAndClear(StateMultiProcessStreamReceiver.java:49)
at org.wso2.siddhi.core.query.input.MultiProcessStreamReceiver.process(MultiProcessStreamReceiver.java:75)
at org.wso2.siddhi.core.query.input.MultiProcessStreamReceiver.receive(MultiProcessStreamReceiver.java:112)
Reporter : Lahiru Gunathilaka
Jira URL : https://wso2.org/jira/browse/SIDDHI-6
Description :
According to this article [1], Siddhi expects users to write their queries efficiently and evaluates the filters in the order given. It would be great if Siddhi could instead rearrange the filters (useful for complex filtering logic) and build a high-performance tree that leads to earlier failure/success detection, so that users don't have to worry about it.
[1]http://wso2.com/library/articles/2013/06/understanding-siddhi-powers-wso2-cep-2x/
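A simpler relative of the proposed filter tree is adaptive ordering of an AND of conditions: track how often each condition passes, and periodically move the most selective ones to the front so failing events are rejected early. The sketch below is a hypothetical plain-Java illustration, not how Siddhi evaluates filters; pass rates are conditional on earlier conditions having passed, which is a simplification.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical AND-filter that reorders its conditions so the most selective runs first. */
public final class AdaptiveConjunction<T> implements Predicate<T> {
    private static final class Stat<T> {
        final Predicate<T> condition;
        long evaluated;
        long passed;

        Stat(Predicate<T> condition) {
            this.condition = condition;
        }

        double passRate() {
            return evaluated == 0 ? 1.0 : (double) passed / evaluated;
        }
    }

    private final List<Stat<T>> stats = new ArrayList<>();
    private final int reorderEvery;
    private long calls;

    public AdaptiveConjunction(List<Predicate<T>> conditions, int reorderEvery) {
        if (reorderEvery <= 0) {
            throw new IllegalArgumentException("reorderEvery must be positive");
        }
        for (Predicate<T> c : conditions) {
            stats.add(new Stat<>(c));
        }
        this.reorderEvery = reorderEvery;
    }

    @Override
    public boolean test(T event) {
        if (++calls % reorderEvery == 0) {
            // Lowest pass rate first: it rejects the most events, so later conditions run less often.
            stats.sort(Comparator.comparingDouble((Stat<T> s) -> s.passRate()));
        }
        for (Stat<T> s : stats) {
            s.evaluated++;
            if (!s.condition.test(event)) {
                return false; // early failure: remaining conditions are skipped
            }
            s.passed++;
        }
        return true;
    }

    /** Current evaluation order, for inspection. */
    public List<Predicate<T>> order() {
        List<Predicate<T>> out = new ArrayList<>();
        for (Stat<T> s : stats) {
            out.add(s.condition);
        }
        return out;
    }
}
```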
I cloned the master branch and tried to build the packages. The build stopped at failing tests:
Failed tests:
testSnapshotOutputRateLimitQuery1(org.wso2.siddhi.core.query.ratelimit.SnapshotOutputRateLimitTestCase)
testSnapshotOutputRateLimitQuery2(org.wso2.siddhi.core.query.ratelimit.SnapshotOutputRateLimitTestCase)
testSnapshotOutputRateLimitQuery19(org.wso2.siddhi.core.query.ratelimit.SnapshotOutputRateLimitTestCase)
testSnapshotOutputRateLimitQuery20(org.wso2.siddhi.core.query.ratelimit.SnapshotOutputRateLimitTestCase)
MultiThreadedTest2(org.wso2.siddhi.core.stream.JunctionTestCase)
MultiThreadedWithEventPoolTest(org.wso2.siddhi.core.stream.JunctionTestCase)
FilterTest1(org.wso2.siddhi.core.query.FilterTestCase)
Tests run: 457, Failures: 7, Errors: 0, Skipped: 0
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Siddhi ............................................. SUCCESS [02:50 min]
[INFO] Siddhi Query API ................................... SUCCESS [ 59.150 s]
[INFO] Siddhi Query Compiler .............................. SUCCESS [01:12 min]
[INFO] Siddhi Core ........................................ FAILURE [08:57 min]
[INFO] Siddhi Extensions .................................. SKIPPED
[INFO] Siddhi Extension - Evaluate Scripts ................ SKIPPED
[INFO] Siddhi Extension - String .......................... SKIPPED
[INFO] Siddhi Extension - Geo ............................. SKIPPED
[INFO] Siddhi Extension - Math ............................ SKIPPED
[INFO] Siddhi Extension - Event Table ..................... SKIPPED
[INFO] Siddhi Extension - RegEx ........................... SKIPPED
[INFO] Siddhi Extension - Time ............................ SKIPPED
[INFO] Siddhi Extension - Time Series ..................... SKIPPED
[INFO] Siddhi Samples ..................................... SKIPPED
[INFO] Siddhi Performance Samples ......................... SKIPPED
[INFO] Siddhi Quick Start Samples ......................... SKIPPED
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 15:26 min
[INFO] Finished at: 2015-07-28T08:53:44-07:00
[INFO] Final Memory: 53M/226M
[INFO] ------------------------------------------------------------------------
I want to specify the group so that in the pattern (sequence) I can write something like "from every ( e1 = e1_OutputStream[field == "group1"]<9:> ) ->..."
Reporter : Aitor Argomaniz
Jira URL : https://wso2.org/jira/browse/SIDDHI-5
Description :
Currently, integrating Siddhi into a Maven project requires setting up pom.xml to use the WSO2 Maven repository, because no version of the CEP is published to the Maven Central Repository:
http://search.maven.org/#search%7Cga%7C1%7Csiddhi-core
It would be great if you published new releases of the CEP to the Maven Central Repository, so that pom.xml files don't need to be modified to add custom repositories. It would also give the project more visibility.
If you define a partition over a field that has a large number (e.g. a million) of different values, the CEP will go OOM.
Currently it goes OOM at about 20K partitions with a 256MB heap. This has to happen at some point, but this is too early. We need to make partitions as lightweight as possible, since they will be used as a way of grouping.
I am new to CEP and Siddhi. I am trying to write a stock-trading quote demo based on the Siddhi CEP engine.
Even after reading the Siddhi documentation and code examples, I still don't have a good idea of how to implement it, and there is no example for a real use case.
Basically, the demo builds a market quote book from individual quotes received from an exchange line handler.
If I wrote it in Java it would be a clear and easy example, but I really want to do it with SiddhiQL in a generic way.
I would very much appreciate any help or demo on this.
The quote event object from the exchange looks like:
Quote{uuid,productId,exchTime,bidPrice,bidSize,offerPrice,offerSize}
The quote book looks like this to the end user, keeping the top 3 best bid/offer prices:
productId, bid3Price,bid3Size,bid2Price,bid2Size,bid1Price,bid1Size,offer1Price,offer1Size,offer2Price,offer2Size,offer3Price,offer3Size
productId=IBM,bid3Price=100, bid3Size=5k(16k),bid2Price=101,bid2Size=10k(11k),bid1Price=102, bid1Size=1k(1k), offer1Price=103, offer1Size=2k(2k),offer2Price=104,offer2Size=1k(3k),offer3Price=105,offer3Size=1k(4k)
bid1 is the best bid price.
Cumulative bid2Size = bid2Size + bid1Size
Cumulative bid3Size = bid3Size + cumulative bid2Size
offer1 is the best offer price.
Cumulative offer2Size = offer2Size + offer1Size
Cumulative offer3Size = offer3Size + cumulative offer2Size
I tried to implement this logic in SiddhiQL as far as I know how:
I hope I have explained my thought clearly :)
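The plain-Java sketch below (not SiddhiQL) makes the book-building arithmetic concrete for one productId. QuoteBook and its methods are hypothetical. The sketch assumes that each quote adds its size to the size already at that price level. Bids are kept in descending price order and offers in ascending order. Cumulative sizes are a running sum over the top levels, which reproduces the IBM example: bid sizes 1k, 11k, 16k and offer sizes 2k, 3k, 4k.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory quote book for one productId.
// Assumption: each quote adds its size to the size already resting at that price level.
final class QuoteBook {
    // One displayed level: its price, the size at that price, and the cumulative size up to this level.
    static final class Level {
        final double price;
        final long size;
        final long cumulativeSize;

        Level(double price, long size, long cumulativeSize) {
            this.price = price;
            this.size = size;
            this.cumulativeSize = cumulativeSize;
        }
    }

    // Bids iterate from the highest price (best bid); offers from the lowest price (best offer).
    private final TreeMap<Double, Long> bids = new TreeMap<>(Comparator.reverseOrder());
    private final TreeMap<Double, Long> offers = new TreeMap<>();

    void onQuote(double bidPrice, long bidSize, double offerPrice, long offerSize) {
        bids.merge(bidPrice, bidSize, Long::sum);
        offers.merge(offerPrice, offerSize, Long::sum);
    }

    List<Level> topBids(int depth) {
        return top(bids, depth);
    }

    List<Level> topOffers(int depth) {
        return top(offers, depth);
    }

    // Walks one side from the best price and keeps a running total of the sizes.
    private static List<Level> top(TreeMap<Double, Long> side, int depth) {
        List<Level> levels = new ArrayList<>();
        long cumulative = 0;
        for (Map.Entry<Double, Long> e : side.entrySet()) {
            if (levels.size() == depth) {
                break;
            }
            cumulative += e.getValue();
            levels.add(new Level(e.getKey(), e.getValue(), cumulative));
        }
        return levels;
    }
}
```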
My code:
SiddhiManager siddhiManager = new SiddhiManager();
String executionPlan = "" +
"define stream cseEventStream (symbol string, price float, volume long);" +
"" +
"@info(name = 'query1') " +
"from cseEventStream[700 > price]#window.lengthBatch(4)" +
"select symbol, price, avg(price) as avgPrice " +
"insert expired events into outputStream;";
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
executionPlanRuntime.addCallback("outputStream", new StreamCallback() {
@Override
public void receive(Event[] events) {
System.out.println("receive events: " + events.length);
for (Event e:events)
System.out.println(e);
}
});
InputHandler inputHandler = executionPlanRuntime.getInputHandler("cseEventStream");
executionPlanRuntime.start();
int i = 0;
while (i < 10) {
float p = i*10;
inputHandler.send(new Object[]{"WSO2", p, 100L});
System.out.println("\"WSO2\", " + p);
inputHandler.send(new Object[] {"IBM", p, 100L});
System.out.println("\"IBM\", " + p);
Thread.sleep(1000);
i++;
}
So I expected that when the events expire, I would get the average price for the event batch,
but the result is:
"WSO2", 0.0
"IBM", 0.0
"WSO2", 10.0
"IBM", 10.0
"WSO2", 20.0
"IBM", 20.0
"WSO2", 30.0
"IBM", 30.0
receive events: 1
Event{timestamp=1447933077892, data=[IBM, 10.0, 0.0], isExpired=false}
"WSO2", 40.0
"IBM", 40.0
"WSO2", 50.0
"IBM", 50.0
receive events: 1
Event{timestamp=1447933079894, data=[IBM, 30.0, 0.0], isExpired=false}
You can see that the average price is 0.
In the external time sliding window, you only see AttributeAggregator.processAdd being called, never AttributeAggregator.processRemove. The issue is caused by not resetting expiredEventChunk before removing expired events, so the lastReturned event has no next event.
Reporter : Leo Romanoff
Jira URL : https://wso2.org/jira/browse/SIDDHI-2
Description :
I created a few test cases to check experimentally how Siddhi behaves under load. I created a single SiddhiManager instance and added 10000 queries that all read from the same input stream. Each query checks whether a specific attribute (namely, price) of an event lies inside a given random interval ( [ price >= random_low and price <= random_high] ) and outputs randomly into one of 100 streams. Then I measured the time required to process 1000000 events with this setup. I also ran exactly the same experiment with Esper.
I found that Siddhi is much, much slower than Esper in this setup. After looking into the internal implementations of both, I realized why. Siddhi processes all queries that read from the same input stream linearly and sequentially. Even if many of the queries have almost the same condition, Siddhi makes no attempt to optimize them. Esper detects that many queries have a condition on the same variable and creates some sort of decision tree. As a result, Esper's running time is O(log N), whereas Siddhi needs O(N).
I'm not saying that this test case is very typical or important, but maybe Siddhi should try to analyze the complete set of queries and apply optimizations where possible, i.e. a kind of global optimization. It could detect common sub-expressions or sub-conditions in the queries and evaluate them only once, instead of evaluating them again and again for each query separately.
After getting these first results, I changed the setup so that each query uses one of many input streams (e.g. one of 300) instead of the same one. This greatly improved the situation, because the number of queries per input stream was much smaller and processing was therefore much faster. But even in this setup, Siddhi is still about 5-6 times slower than Esper.
The maven project is attached, so that you can try to reproduce the issue.
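The text describes Esper's internal structure only as "some sort of decision tree". The following is therefore a plain-Java illustration of one alternative technique, a stabbing-query index over elementary intervals, for the test's predicate shape [price >= low and price <= high]. RangeQueryIndex and its methods are hypothetical. A lookup is a single TreeMap.floorEntry call (O(log N)) plus the matching ids, instead of testing every query. The trade-off is a naive build whose time and memory are O(N^2) in the worst case, which an interval tree would avoid.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical index over N range filters of the form [price >= low and price <= high].
final class RangeQueryIndex {
    // Matches for one endpoint e and for the open gap (e, next endpoint).
    private static final class Segment {
        final List<Integer> atPoint = new ArrayList<>();
        final List<Integer> after = new ArrayList<>();
    }

    private final TreeMap<Double, Segment> segments = new TreeMap<>();

    // lows[i] and highs[i] are the bounds of query i. Naive build: O(N * E) for E distinct endpoints.
    RangeQueryIndex(double[] lows, double[] highs) {
        TreeSet<Double> endpoints = new TreeSet<>();
        for (int i = 0; i < lows.length; i++) {
            endpoints.add(lows[i]);
            endpoints.add(highs[i]);
        }
        for (double e : endpoints) {
            Segment s = new Segment();
            for (int i = 0; i < lows.length; i++) {
                if (lows[i] <= e && highs[i] >= e) {
                    s.atPoint.add(i);   // price == e satisfies the closed interval
                }
                if (lows[i] <= e && highs[i] > e) {
                    s.after.add(i);     // every price strictly between e and the next endpoint
                }
            }
            segments.put(e, s);
        }
    }

    // Ids of all queries whose interval contains price: one O(log E) lookup instead of N tests.
    List<Integer> match(double price) {
        Map.Entry<Double, Segment> entry = segments.floorEntry(price);
        if (entry == null) {
            return Collections.emptyList();  // below every lower bound
        }
        List<Integer> ids = entry.getKey() == price ? entry.getValue().atPoint : entry.getValue().after;
        return Collections.unmodifiableList(ids);
    }
}
```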
The snapshot/persist function has a race condition; see the code example below. Note the Thread.sleep and the comment.
Also, window.timeBatch is even more broken in versions > 3.0.3. Snapshotting only works when there is no pending data, e.g. when the snapshot is taken after a query has fired and before new events arrive. Otherwise restore throws the same NPE as described below.
public static void main(String[] args) throws InterruptedException, IOException {
ExecutionPlanRuntime executionPlanRuntime = setupTimePlan();
executionPlanRuntime.start();
executionPlanRuntime.getInputHandler("ts").send(new Object[] { "A", 1 });
executionPlanRuntime.getInputHandler("ts").send(new Object[] { "B", 2 });
executionPlanRuntime.getInputHandler("ts").send(new Object[] { "C", 3 });
executionPlanRuntime.getInputHandler("ts").send(new Object[] { "D", 4 });
Thread.sleep(100);
// Without the sleep, "sumval" in the query is randomly 6 or 15, and with versions > 3.0.4 the restore below fails with an NPE:
//     at org.wso2.siddhi.core.event.ComplexEventChunk.getLastEvent(ComplexEventChunk.java:81)
//     at org.wso2.siddhi.core.event.ComplexEventChunk.add(ComplexEventChunk.java:75)
//     at org.wso2.siddhi.core.query.processor.stream.window.TimeWindowProcessor.restoreState(TimeWindowProcessor.java:151)
//     at org.wso2.siddhi.core.util.snapshot.SnapshotService.restore(SnapshotService.java:67)
//     at org.wso2.siddhi.core.ExecutionPlanRuntime.restore(ExecutionPlanRuntime.java:193)
//     at com.anokor.engine.siddhi.internal.SiddhiEngine.main(SiddhiEngine.java:54)
//     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
//     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
//     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
//     at java.lang.reflect.Method.invoke(Method.java:498)
//     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
byte[] state = executionPlanRuntime.snapshot();
executionPlanRuntime.shutdown();
ExecutionPlanRuntime newRuntime = setupTimePlan();
newRuntime.start();
newRuntime.restore(state);
newRuntime.getInputHandler("ts").send(new Object[] { "Z", 10 });
newRuntime.shutdown();
}
private static ExecutionPlanRuntime setupTimePlan() {
SiddhiManager siddhiManager = new SiddhiManager();
String cseEventStream = "define stream ts (name string, val int);";
String query = "@info(name = 'q1') from ts#window.time(1000) select name, sum(val) as sumval insert into OutputStream";
ExecutionPlanRuntime executionPlanRuntime = siddhiManager
.createExecutionPlanRuntime("@Plan:name('plan') " + cseEventStream + query);
executionPlanRuntime.addCallback("q1", new QueryCallback() {
@Override
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
EventPrinter.print(timeStamp, inEvents, removeEvents);
}
});
return executionPlanRuntime;
}
Reporter : @suhothayan
Jira URL : https://wso2.org/jira/browse/SIDDHI-8
Description :
There are a lot of cases to test in joins for the CEP. The following are examples, and I think we need to write test cases for the ones we have not covered yet.
Dimensions
So there are a lot of combinations!
We need to understand the performance of each case, and we need a Jira for the cases we have not covered.
Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.007 sec <<< FAILURE!
testRegression(org.wso2.siddhi.test.SimpleRegOutlierTestCase) Time elapsed: 0.003 sec <<< ERROR!
org.wso2.siddhi.core.exception.QueryCreationException: No extension exist for TransformExtension{extensionName='timeseries', functionName='outlier'}
at org.wso2.siddhi.core.util.SiddhiClassLoader.loadExtensionImplementation(SiddhiClassLoader.java:69)
at org.wso2.siddhi.core.util.SiddhiClassLoader.loadProcessor(SiddhiClassLoader.java:37)
at org.wso2.siddhi.core.util.parser.StreamParser.updateQueryEventSourceOutDefinition(StreamParser.java:498)
at org.wso2.siddhi.core.query.creator.BasicQueryCreator.updateQueryEventSourceList(BasicQueryCreator.java:46)
at org.wso2.siddhi.core.query.creator.QueryCreator.init(QueryCreator.java:66)
at org.wso2.siddhi.core.query.creator.BasicQueryCreator.(BasicQueryCreator.java:40)
I use #window.cron like this:
String query = "" + "@info(name = 'query1') " + "from cseEventStream[700 > price]#window.cron('*/5 * * * * ?') " + "select symbol, sum(price) as ap " + "group by symbol " + "insert into outputStream;";
I expect it to output the result every 5 seconds, grouped by symbol.
My test result:
"WSO2", 30.0
"IBM", 30.0
"WSO2", 40.0
"IBM", 40.0
"WSO2", 50.0
"IBM", 50.0
"WSO2", 60.0
"IBM", 60.0
"WSO2", 70.0
"IBM", 70.0
receive events: 10
Event{timestamp=1473152315472, data=[WSO2, 30.0], isExpired=false}
Event{timestamp=1473152315472, data=[IBM, 30.0], isExpired=false}
Event{timestamp=1473152316472, data=[WSO2, 70.0], isExpired=false}
Event{timestamp=1473152316472, data=[IBM, 70.0], isExpired=false}
Event{timestamp=1473152317472, data=[WSO2, 120.0], isExpired=false}
Event{timestamp=1473152317472, data=[IBM, 120.0], isExpired=false}
Event{timestamp=1473152318472, data=[WSO2, 180.0], isExpired=false}
Event{timestamp=1473152318472, data=[IBM, 180.0], isExpired=false}
Event{timestamp=1473152319472, data=[WSO2, 250.0], isExpired=false}
Event{timestamp=1473152319472, data=[IBM, 250.0], isExpired=false}
It emits a grouped result for every event.
I expect output like this (as with the #window.timeBatch syntax):
Event{timestamp=1473152319472, data=[WSO2, 250.0], isExpired=false}
Event{timestamp=1473152319472, data=[IBM, 250.0], isExpired=false}
Can #window.cron not be used like this?
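The expected output above is one row per symbol per trigger. The plain-Java sketch below illustrates only those semantics. It assumes that the sums restart with each batch, as they do with timeBatch. It accumulates per-symbol sums and hands over the whole batch when flushed, and a ScheduledExecutorService stands in for the 5-second cron trigger. GroupedBatchSum is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical batch of per-symbol sums that is emitted and reset on each trigger.
final class GroupedBatchSum {
    private Map<String, Double> sums = new LinkedHashMap<>();

    synchronized void add(String symbol, double price) {
        sums.merge(symbol, price, Double::sum);
    }

    // Hands over one row per symbol for the current batch and starts an empty batch.
    synchronized Map<String, Double> flush() {
        Map<String, Double> batch = sums;
        sums = new LinkedHashMap<>();
        return batch;
    }

    // Flushes every periodSeconds (standing in for the cron trigger); cancel via the returned future.
    ScheduledFuture<?> scheduleFlush(ScheduledExecutorService scheduler, long periodSeconds,
                                     Consumer<Map<String, Double>> sink) {
        return scheduler.scheduleAtFixedRate(() -> sink.accept(flush()),
                periodSeconds, periodSeconds, TimeUnit.SECONDS);
    }
}
```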
mvn clean install is failing
testWindowQuery10(org.wso2.siddhi.test.standard.WindowTestCase) Time elapsed: 2.512 sec <<< FAILURE!
junit.framework.AssertionFailedError: Event count expected:<2> but was:<1>
at junit.framework.Assert.fail(Assert.java:50)
at junit.framework.Assert.failNotEquals(Assert.java:287)
at junit.framework.Assert.assertEquals(Assert.java:67)
at junit.framework.Assert.assertEquals(Assert.java:199)
at org.wso2.siddhi.test.standard.WindowTestCase.testWindowQuery10(WindowTestCase.java:528)
I am using variable partitions with a potentially infinite number of keys, and my application runs out of heap space. Checking the source code, I see that partitions are added to the map PartitionRuntime.partitionInstanceRuntimeMap, but there is no way to remove them.
I think that the CEP engine needs some mechanism to "garbage collect" partitions which are no longer needed. They can be added again if an event with the same key is received.
"Marking" partitions for GC could be done in a number of ways:
-- Ron
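The plain-Java sketch below illustrates one possible marking strategy. It assumes that a partition may be dropped once it has received no events for a configurable idle period. IdleEvictingPartitions is hypothetical and does not reflect PartitionRuntime's actual structure. An access-ordered LinkedHashMap keeps the least recently used key first, so eviction only inspects the head of the map until it finds a partition that is still active.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.function.Supplier;

// Hypothetical per-key partition registry that forgets partitions idle for longer than idleTimeoutMillis.
final class IdleEvictingPartitions<S> {
    private static final class Slot<V> {
        final V state;
        long lastSeen;

        Slot(V state, long lastSeen) {
            this.state = state;
            this.lastSeen = lastSeen;
        }
    }

    // accessOrder = true: iteration starts at the least recently accessed key.
    private final LinkedHashMap<String, Slot<S>> slots = new LinkedHashMap<>(16, 0.75f, true);
    private final long idleTimeoutMillis;
    private final Supplier<S> newPartition;

    IdleEvictingPartitions(long idleTimeoutMillis, Supplier<S> newPartition) {
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.newPartition = newPartition;
    }

    // Returns the state for key, creating it if absent; nowMillis must be non-decreasing.
    S get(String key, long nowMillis) {
        evictIdle(nowMillis);
        Slot<S> slot = slots.get(key);   // also moves key to the most-recently-used end
        if (slot == null) {
            slot = new Slot<>(newPartition.get(), nowMillis);
            slots.put(key, slot);
        } else {
            slot.lastSeen = nowMillis;
        }
        return slot.state;
    }

    int size() {
        return slots.size();
    }

    private void evictIdle(long nowMillis) {
        Iterator<Slot<S>> it = slots.values().iterator();
        while (it.hasNext()) {
            Slot<S> slot = it.next();
            if (nowMillis - slot.lastSeen <= idleTimeoutMillis) {
                break;  // every later entry was used more recently
            }
            it.remove();
        }
    }
}
```

Timestamps are passed in explicitly, so the same code works with event time or with System.currentTimeMillis(). Any state held by an evicted partition (e.g. window contents) is lost, and the partition restarts empty when its key reappears.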
e.g:
define stream TempStream (key int, value1 int, value2 string); @info(name = 'query1') from TempStream#window.timeBatch(5 sec)
Within 5 seconds, 4 events arrive:
[1, 2, a]; <-- earliest event
[2, 3, b];
[1, 4, c];
[2, 5, d]; <-- latest event
I want to get "key" and "value2" of the earliest event, grouped by "key".
In Esper, I can use "select minby(time).value2" to get the result: [1,a], [2,b]
For the latest: "select maxby(time).value2", with the result: [1,c], [2,d]
Does Siddhi support this?
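This does not answer whether Siddhi offers a direct minby/maxby equivalent. To illustrate the intended semantics only, the plain-Java sketch below picks, per key, the value2 of the earliest and of the latest event in one batch. FirstLastByKey and Row are hypothetical names, and the timestamp field stands in for the event time.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper mirroring Esper's minby/maxby over one batch, grouped by key.
final class FirstLastByKey {
    static final class Row {
        final int key;
        final int value1;
        final String value2;
        final long timestamp;  // assumed event time used for "earliest" and "latest"

        Row(int key, int value1, String value2, long timestamp) {
            this.key = key;
            this.value1 = value1;
            this.value2 = value2;
            this.timestamp = timestamp;
        }
    }

    // value2 of the earliest row per key; on equal timestamps the first row seen wins.
    static Map<Integer, String> earliest(Iterable<Row> batch) {
        Map<Integer, Row> chosen = new LinkedHashMap<>();
        for (Row r : batch) {
            chosen.merge(r.key, r, (old, cur) -> cur.timestamp < old.timestamp ? cur : old);
        }
        return value2Of(chosen);
    }

    // value2 of the latest row per key; on equal timestamps the last row seen wins.
    static Map<Integer, String> latest(Iterable<Row> batch) {
        Map<Integer, Row> chosen = new LinkedHashMap<>();
        for (Row r : batch) {
            chosen.merge(r.key, r, (old, cur) -> cur.timestamp >= old.timestamp ? cur : old);
        }
        return value2Of(chosen);
    }

    private static Map<Integer, String> value2Of(Map<Integer, Row> chosen) {
        Map<Integer, String> result = new LinkedHashMap<>();
        for (Map.Entry<Integer, Row> e : chosen.entrySet()) {
            result.put(e.getKey(), e.getValue().value2);
        }
        return result;
    }
}
```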
Hello,
I wanted to write a simple query with Siddhi that gives me back the hits in a batch. I could easily do it with Esper, but I ran into some problems with Siddhi. Is there a way to use external time instead of internal time? I tried the External Time Window feature, but it can only be used for simple queries, so it is not enough for me.
Here is the poc (ESPER):
Configuration con = new Configuration();
//con.addEventType("LoginType", beanClass);
con.addEventType("LoginType", LoginType.class);
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(con);
String expression = "select count(loginType), timeStamp, userName,loginType,systemType,eventID, count(eventID) as ecntid from LoginType.win:time_batch(5 sec) where eventID='254' and userName='maci' ";
//expression = "select a.timeStamp, a.userName,a.loginType,a.systemType,count(a.eventID ) as ecntid from pattern[every a=LoginType(eventID='254' and count(a.eventID) > 3) where timer:within(5 sec)";
//expression = "select o.timeStamp, o.userName,o.systemType,o.eventID, count(o.eventID), b.timeStamp, b.eventID from pattern[every o=LoginType -> (timer:interval(5 sec) and b=LoginType(eventID=o.eventID) )]";
expression = "select * from LoginType.win:time_batch(5 sec) " +
" match_recognize ( " +
" measures A.eventID as aEID, B.eventID as bEID , A.timeStamp as firstStamp, B.timeStamp as secondStamp " +
" pattern ( A B ) " +
" define " +
" A as A.eventID = '254' and A.userName='maci' , " +
" B as B.eventID = '255' and prev(B.userName) = A.userName " +
" ) ";
//expression = "select a.eventID,b.eventID,a.userName,b.userName,a.timeStamp,b.timeStamp from pattern[every a=LoginType(a.eventID='254') -> (timer:interval(5 sec) and b=LoginType(b.eventID='255'))]";
/*
expression = "select sorted(price desc).take(5) as highestprice " +
" from LoginType.win:time(5 min) ";
*/
EPStatement statement = epService.getEPAdministrator().createEPL(expression);
MyListener listener = new MyListener();
statement.addListener(listener);
Calendar c = Calendar.getInstance();
EPRuntime runtime = epService.getEPRuntime();
runtime.sendEvent(new TimerControlEvent(TimerControlEvent.ClockType.CLOCK_EXTERNAL));
long start = new Date().getTime();
int k = 0;
for (k = 0; k <= 100000000; k++) {
c.add(Calendar.SECOND, 1);
String eventID = "254";
if (k % 3 == 0) {
eventID = "255";
}
runtime.sendEvent(new CurrentTimeEvent(c.getTime().getTime()));
runtime.sendEvent(new LoginType(new Date(c.getTime().getTime()), "type1", "any name", eventID, "windows", new Random().nextInt(1000)));
}
long end = new Date().getTime();
System.out.println(end - start);
I tried to use a similar query in Esper, but I was not able to use the timestamp of my logs.
Could you give me a similar example with Siddhi?
Here are my attempts to build the above example with Siddhi:
public void externalTimeWindowTest1() throws InterruptedException {
SiddhiManager siddhiManager = new SiddhiManager();
String cseEventStream = "define stream LoginEvents (myTime long, ip string, phone string,price int) ;";
String query = "@info(name = 'query1') from LoginEvents#window.timeBatch(5 sec) "
+ "select myTime, phone, ip, price , max(price) as maxprice, min(price) as minprice, count(myTime) as cntip insert all events into OutPut ";
/*String query = "@info(name='query1') from every a1 = LoginEvents " +
" -> b1 = LoginEvents[b1.ip == a1.ip ]#window.externalTime(b1.myTime,5 second) " +
" within 5 seconds select a1.myTime, a1.phone, a1.ip, a1.price , max(a1.price) as maxprice, min(a1.price) as minprice, count(a1.myTime) as cntip insert current events into Output ";
*/
final ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);
executionPlanRuntime.addCallback("query1", new QueryCallback() {
@Override
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
EventPrinter.print(inEvents);
System.out.println(new Date(timeStamp));
if (inEvents != null) {
System.out.println("======================== START ===============================");
for (Event e : inEvents) {
if (e.isExpired()) continue;
System.out.println("----------------------------");
System.out.println(new Date(e.getTimestamp()));
System.out.println("IP:" + e.getData(2));
System.out.println("Max price:" + e.getData(4));
System.out.println("Min price:" + e.getData(5));
System.out.println("IP siddhiCount:" + e.getData(6));
System.out.println("Expired :" + e.isExpired());
System.out.println("----------------------------");
}
System.out.println("======================== END ===============================");
}
}
});
executionPlanRuntime.start();
new Thread(new Runnable() {
@Override
public void run() {
Calendar c = Calendar.getInstance();
c.add(Calendar.HOUR, 1);
c.add(Calendar.SECOND, 1);
InputHandler inputHandler = executionPlanRuntime.getInputHandler("LoginEvents");
int i = 0;
for (i = 0; i <= 1000; i++) {
c.add(Calendar.SECOND, 1);
try {
inputHandler.send(c.getTime().getTime(), new Object[]{c.getTime().getTime(), new String("192.10.1.1"), "1", new Random().nextInt(1000)});
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
sleep(15000);
executionPlanRuntime.shutdown();
System.out.println("Done");
}
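In a tumbling batch driven by external time, a batch closes when an event arrives whose own timestamp is at or beyond the batch end, not when the wall clock says so. The plain-Java sketch below shows that rule for the count/max/min aggregation in the query above. ExternalTimeBatch is hypothetical, and the sketch assumes that events arrive in non-decreasing myTime order.

```java
// Hypothetical tumbling batch whose boundaries come from event timestamps (e.g. myTime), not the wall clock.
final class ExternalTimeBatch {
    static final class Summary {
        final long batchStart;
        final int count;
        final int maxPrice;
        final int minPrice;

        Summary(long batchStart, int count, int maxPrice, int minPrice) {
            this.batchStart = batchStart;
            this.count = count;
            this.maxPrice = maxPrice;
            this.minPrice = minPrice;
        }
    }

    private final long lengthMillis;
    private boolean open;   // false until the first event arrives
    private long batchStart;
    private int count;
    private int maxPrice;
    private int minPrice;

    ExternalTimeBatch(long lengthMillis) {
        this.lengthMillis = lengthMillis;
    }

    // Adds one event (timestamps assumed non-decreasing); returns the batch it closes, or null.
    Summary add(long eventTime, int price) {
        Summary closed = null;
        if (!open) {
            start(eventTime);
            open = true;
        } else if (eventTime >= batchStart + lengthMillis) {
            closed = new Summary(batchStart, count, maxPrice, minPrice);
            // The next batch begins at the boundary that contains eventTime; empty batches are skipped.
            start(batchStart + ((eventTime - batchStart) / lengthMillis) * lengthMillis);
        }
        count++;
        maxPrice = Math.max(maxPrice, price);
        minPrice = Math.min(minPrice, price);
        return closed;
    }

    private void start(long start) {
        batchStart = start;
        count = 0;
        maxPrice = Integer.MIN_VALUE;
        minPrice = Integer.MAX_VALUE;
    }
}
```

Only a later event proves that time has moved on, so the final open batch is emitted only when a subsequent event arrives (or by an explicit flush, which is not shown).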
Version :
✘~/Downloads/tmp/siddhi/siddhi ➦ e18aaef git branch | sed -n '/* /s///p'
(HEAD detached at v3.0.1)
~/Downloads/tmp/siddhi/siddhi ➦ e18aaef git status
HEAD detached at v3.0.1
nothing to commit, working directory clean
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-javadoc-plugin:2.8:jar (attach-javadocs) on project siddhi-core: MavenReportException: Error while creating archive:
[ERROR] Exit code: 1 - /siddhi/modules/siddhicore/src/main/java/org/wso2/siddhi/core/executor/function/FunctionExecutor.java:80: warning: no description for @return
[ERROR] * @return
[ERROR] ^
Here I describe the business logic I want to implement:
From a continuous stream of logs, if there are more than 9 login-failure logs and 1 login-success log within a time period, generate an event called "bruteForceLoginSuccess".
Because I want to group the aggregation by "srcAddress, srcUsername, destAddress, appProtocol", I wrote the query like this (the success query is similar):
@info(name = 'condition1')
from rawStream[ catBehavior == '/Authentication/Verify' and catOutcome == 'FAIL' ]#window.timeBatch(10 sec)
select srcAddress, catOutcome, deviceCat, srcUsername, destAddress, appProtocol, distinctcount( catObject ) as distinctMinCount, count() as groupCount
group by srcAddress, srcUsername, destAddress, appProtocol
having groupCount >= 9 and distinctMinCount >=1
insert into e1_OutputStream;
I use a timeBatch window because I want to check whether the group size is at least 9, so I use a field called groupCount that must be at least 9. If I use a (sliding) time window instead, it generates 9 events into e1_OutputStream, and each event's groupCount is one more than the previous one's (the first event's groupCount is 1, the second's is 2, and the ninth's is 9).
But in the pattern-matching part, if I write a condition like groupCount >= 9, I lose the 8 earlier events (their groupCount is 1 to 8, so they don't match the condition).
If I use timeBatch, it generates only one aggregate event per group into e1_OutputStream, and its groupCount is the number of events in the group. I can use this event as a representative event when doing the pattern matching.
And here is the problem: I use timeBatch in both the login-failure and the login-success conditions to generate e1_OutputStream and e2_OutputStream, so their output events carry the same time (even though the underlying events have different times). Pattern matching requires a temporal order, so I cannot trigger the pattern unless I sleep for a while before sending the login-success log. (The pattern-matching query is below.)
@info(name = 'result')
from every ( e1 = e1_OutputStream ) -> every ( e2 = e2_OutputStream[ srcAddress == e1.srcAddress
and deviceCat == e1.deviceCat
and srcUsername == e1.srcUsername
and destAddress == e1.destAddress
and appProtocol == e1.appProtocol ] )
select 'relationEvent' as event, e1.srcAddress, e1.deviceCat, e1.srcUsername, e1.destAddress, e1.appProtocol
insert into resultOutputStream;
What should I do to complete this task, and what is the most correct way to achieve it? I would really appreciate your response.
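One way to avoid the equal-timestamp problem of two timeBatch outputs is to correlate the raw failure and success events directly, per group, with a sliding count. The plain-Java sketch below illustrates only that logic and is not a SiddhiQL query. BruteForceDetector, groupKey and the threshold handling are assumptions. minFailures is passed in because the text says "more than 9" while the query uses >= 9.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-group detector: a success preceded by at least minFailures failures within windowMillis.
final class BruteForceDetector {
    private final int minFailures;
    private final long windowMillis;
    private final Map<String, Deque<Long>> failures = new HashMap<>();

    BruteForceDetector(int minFailures, long windowMillis) {
        this.minFailures = minFailures;
        this.windowMillis = windowMillis;
    }

    // Assumed grouping key built from the four grouping attributes.
    static String groupKey(String srcAddress, String srcUsername, String destAddress, String appProtocol) {
        return String.join("|", srcAddress, srcUsername, destAddress, appProtocol);
    }

    void onFailure(String key, long timestamp) {
        Deque<Long> times = failures.computeIfAbsent(key, k -> new ArrayDeque<>());
        times.addLast(timestamp);
        expire(times, timestamp);
    }

    // Returns true when this success should raise a bruteForceLoginSuccess event.
    boolean onSuccess(String key, long timestamp) {
        Deque<Long> times = failures.get(key);
        if (times == null) {
            return false;
        }
        expire(times, timestamp);
        boolean hit = times.size() >= minFailures;
        if (hit) {
            times.clear();  // report each burst of failures only once
        }
        return hit;
    }

    // Drops failures older than the window, relative to the current event's timestamp.
    private void expire(Deque<Long> times, long now) {
        while (!times.isEmpty() && now - times.peekFirst() > windowMillis) {
            times.removeFirst();
        }
    }
}
```

Because failures and successes are evaluated in arrival order on the raw events, no artificial sleep is needed between them.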
Hi,
I am currently working with Siddhi v3.0.4.
Following is my test program. You can see that I injected an event containing a null value (the 3rd event). The null value is for symbol, which is also the key of the value partition in the query.
// Creating Siddhi Manager
SiddhiManager siddhiManager = new SiddhiManager();
String executionPlan = "" +
"define stream cseEventStream (symbol string, price float, volume long); " +
"partition with (symbol of cseEventStream)" +
" begin " +
" @info(name = 'query11') " +
" from cseEventStream[volume < 150] " +
" select symbol,price " +
" insert into outputStream ;" +
" end; ";
//Generating runtime
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
executionPlanRuntime.addCallback("outputStream", new StreamCallback() {
@Override
public void receive(Event[] inEvents) {
EventPrinter.print(0L, inEvents, null);
}
});
//Retrieving InputHandler to push events into Siddhi
InputHandler inputHandler = executionPlanRuntime.getInputHandler("cseEventStream");
//Starting event processing
executionPlanRuntime.start();
//Sending events to Siddhi
//try {
inputHandler.send(new Object[]{"IBM", 700f, 100l});
inputHandler.send(new Object[]{"WSO2", 60.5f, 200l});
inputHandler.send(new Object[]{null, 50f, 100l});
inputHandler.send(new Object[]{"GOOG", 50f, 30l});
inputHandler.send(new Object[]{"IBM", 76.6f, 400l});
inputHandler.send(new Object[]{"WSO2", 45.6f, 50l});
Thread.sleep(500);
//Shutting down the runtime
executionPlanRuntime.shutdown();
//Shutting down Siddhi
siddhiManager.shutdown();
And the execution result is as follows:
Jan 25, 2016 2:55:12 PM com.lmax.disruptor.FatalExceptionHandler handleEventException
SEVERE: Exception processing: 5 IndexedEvent{streamIndex=0, event=Event{timestamp=1453701312805, data=[WSO2, 45.6, 50], isExpired=false}}
java.lang.NullPointerException
at org.wso2.siddhi.core.partition.executor.ValuePartitionExecutor.execute(ValuePartitionExecutor.java:32)
at org.wso2.siddhi.core.partition.PartitionStreamReceiver.receive(PartitionStreamReceiver.java:189)
at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:142)
at org.wso2.siddhi.core.stream.StreamJunction.access$000(StreamJunction.java:44)
at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:312)
at org.wso2.siddhi.core.stream.input.InputDistributor.send(InputDistributor.java:39)
at org.wso2.siddhi.core.stream.input.SingleThreadEntryValve.send(SingleThreadEntryValve.java:59)
at org.wso2.siddhi.core.stream.input.SingleStreamEntryValve$SingleEntryValveHandler.sendEvents(SingleStreamEntryValve.java:166)
at org.wso2.siddhi.core.stream.input.SingleStreamEntryValve$SingleEntryValveHandler.onEvent(SingleStreamEntryValve.java:149)
at org.wso2.siddhi.core.stream.input.SingleStreamEntryValve$SingleEntryValveHandler.onEvent(SingleStreamEntryValve.java:119)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "Siddhi-3aab83cb-5578-418d-9c2a-1091ac3e5491-executor-thread-0" java.lang.RuntimeException: java.lang.NullPointerException
at com.lmax.disruptor.FatalExceptionHandler.handleEventException(FatalExceptionHandler.java:45)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:147)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.wso2.siddhi.core.partition.executor.ValuePartitionExecutor.execute(ValuePartitionExecutor.java:32)
at org.wso2.siddhi.core.partition.PartitionStreamReceiver.receive(PartitionStreamReceiver.java:189)
at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:142)
at org.wso2.siddhi.core.stream.StreamJunction.access$000(StreamJunction.java:44)
at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:312)
at org.wso2.siddhi.core.stream.input.InputDistributor.send(InputDistributor.java:39)
at org.wso2.siddhi.core.stream.input.SingleThreadEntryValve.send(SingleThreadEntryValve.java:59)
at org.wso2.siddhi.core.stream.input.SingleStreamEntryValve$SingleEntryValveHandler.sendEvents(SingleStreamEntryValve.java:166)
at org.wso2.siddhi.core.stream.input.SingleStreamEntryValve$SingleEntryValveHandler.onEvent(SingleStreamEntryValve.java:149)
at org.wso2.siddhi.core.stream.input.SingleStreamEntryValve$SingleEntryValveHandler.onEvent(SingleStreamEntryValve.java:119)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128)
... 3 more
I found that PartitionStreamReceiver checks whether the currentKey returned by partitionExecutor.execute(nextEvent) is null. However, partitionExecutor.execute(nextEvent) itself throws the NullPointerException. Following is the relevant part of PartitionStreamReceiver.java (lines 189 to 191).
String currentKey = partitionExecutor.execute(nextEvent);
if (currentKey != null) {
if (key == null) {
So I tried changing ValuePartitionExecutor.java from
public String execute(ComplexEvent event) {
return expressionExecutor.execute(event).toString();
}
to
public String execute(ComplexEvent event) {
String result = null;
try {
result = expressionExecutor.execute(event).toString();
} catch(NullPointerException e) {
result = null;
}
return result;
}
After that I got no errors, but the event containing the null value was not emitted either.
I don't know whether the NullPointerException should be regarded as a bug.
However, if null values for symbol in the above query are intentionally not allowed in Siddhi, I should check every incoming event for null values.
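The sketch below uses an explicit null check instead of catching NullPointerException, because a catch block would also silently swallow unrelated NPEs. NullSafePartitionKey is a hypothetical stand-in for ValuePartitionExecutor, and a java.util.function.Function plays the role of expressionExecutor. Returning null for a null attribute lets the existing currentKey != null check in PartitionStreamReceiver skip the event, which matches the behaviour observed above (the event is not emitted).

```java
import java.util.function.Function;

// Hypothetical stand-in for ValuePartitionExecutor; attribute plays the role of expressionExecutor.
final class NullSafePartitionKey {
    private final Function<Object[], Object> attribute;

    NullSafePartitionKey(Function<Object[], Object> attribute) {
        this.attribute = attribute;
    }

    // Returns the partition key, or null when the partitioning attribute is null.
    // Only a null attribute maps to null; any other exception still propagates.
    String execute(Object[] eventData) {
        Object value = attribute.apply(eventData);
        return value == null ? null : value.toString();
    }
}
```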
Hey,
I have a scenario with n streams:
stream 1:
Person: name, security_id
stream 2:
Person: fullName, secId, email
stream 3:
Person: email
...
(may have more)
I have equality criteria such:
stream1.security_id = stream2.secId
||
stream2.email = stream3.email
...
may have more criteria
This can't be solved easily in Java, because standard (transitive) equality breaks here,
i.e.
stream1:e == stream2:e //true
stream2:e == stream3:e //true
but
stream1:e == stream3:e //false
I'm trying to find an approach via CEP that can help me perform this correlation/aggregation by rules, and I was wondering whether Siddhi can help.
Thanks
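Assume the goal is to group records that are connected through any chain of matches. The match relation itself is not transitive, so "the same person" is better modelled as a connected component: records are linked whenever any rule matches, and everything reachable through links belongs to one entity. A minimal union-find sketch in plain Java follows. PersonCorrelator is hypothetical. It assumes that each rule is expressed as a normalized key such as "secId=42", so that security_id and secId map to the same key name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical union-find over record ids: records sharing any correlating key end up in one cluster.
final class PersonCorrelator {
    private final Map<String, String> parent = new HashMap<>();
    private final Map<String, String> firstOwner = new HashMap<>(); // normalized key -> first record id seen with it

    private String find(String id) {
        parent.putIfAbsent(id, id);
        String root = id;
        while (!root.equals(parent.get(root))) {
            root = parent.get(root);
        }
        // Path compression: point every node on the path directly at the root.
        String cur = id;
        while (!cur.equals(root)) {
            String next = parent.get(cur);
            parent.put(cur, root);
            cur = next;
        }
        return root;
    }

    private void union(String a, String b) {
        String ra = find(a);
        String rb = find(b);
        if (!ra.equals(rb)) {
            parent.put(ra, rb);
        }
    }

    // Registers a record and links it to earlier records that share any of its normalized keys.
    void add(String recordId, String... keys) {
        find(recordId);
        for (String key : keys) {
            if (key == null) {
                continue;
            }
            String owner = firstOwner.putIfAbsent(key, recordId);
            if (owner != null) {
                union(recordId, owner);
            }
        }
    }

    boolean sameEntity(String a, String b) {
        return find(a).equals(find(b));
    }
}
```

Whether this maps onto a Siddhi query or is better kept in an application-side component is a separate design question.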