siddhi-io / siddhi

Stream Processing and Complex Event Processing Engine

Home Page: http://siddhi.io

License: Apache License 2.0

Java 99.63% ANTLR 0.23% Shell 0.01% FreeMarker 0.13%
streaming-data-processing cep stream-processing complex-event-processing online-learning cloud-native cloud library cncf

siddhi's People

Contributors

anoukh, anugayan, ashensw, dependabot-preview[bot], dependabot-support, dilini-muthumala, dnwick, gokul, grainier, ksdperera, lasanthafdo, minudika, miyurud, mohanvive, nirmal070125, niveathika, pcnfernando, rajeev3001, ramindu90, rolandhewage, rukshiw, sacjaya, sajithshn, senthuran16, suhothayan, swsachith, sybernix, thilia, tishan89, wso2-jenkins-bot

siddhi's Issues

SiddhiExtensionLoader does not support JAR files whose file name does not end in "jar"

Siddhi looks for extension files (*.siddhiext) through java.class.path. However, stream processing engines like Flink (taskmanager) do not put libraries on java.class.path; they are placed in taskmanager.tmp.dirs or java.io.tmpdir, and their file names do not end in "jar". I think SiddhiExtensionLoader should be enhanced to support an external class path and to detect the file type from the file's contents rather than from its name suffix.
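The suggested enhancement could look roughly like the sketch below: instead of filtering candidate files by a ".jar" suffix, it checks for the ZIP signature at the start of each file and then opens it with java.util.jar.JarFile, which does not care about the file name. ExtensionScanner, findSiddhiExtEntries and scanDirectories are hypothetical names, not part of Siddhi's SiddhiExtensionLoader, and which directories to scan (for example the value of java.io.tmpdir) is an assumption left to the caller.

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

// Hypothetical helper, not part of Siddhi: finds *.siddhiext entries in archives
// regardless of the archive's file name (e.g. blob files without a .jar suffix).
class ExtensionScanner {

    // ZIP/JAR archives begin with the local file header signature "PK\3\4".
    static boolean looksLikeJar(File file) throws IOException {
        if (!file.isFile()) {
            return false;
        }
        try (InputStream in = Files.newInputStream(file.toPath())) {
            byte[] magic = new byte[4];
            new DataInputStream(in).readFully(magic);
            return magic[0] == 'P' && magic[1] == 'K' && magic[2] == 3 && magic[3] == 4;
        } catch (EOFException tooShort) {
            return false; // fewer than 4 bytes: cannot be an archive
        }
    }

    // Lists the *.siddhiext entries of a file that was identified as an archive by content.
    static List<String> findSiddhiExtEntries(File file) throws IOException {
        List<String> found = new ArrayList<>();
        if (!looksLikeJar(file)) {
            return found;
        }
        try (JarFile jar = new JarFile(file)) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                String name = entries.nextElement().getName();
                if (name.endsWith(".siddhiext")) {
                    found.add(name);
                }
            }
        }
        return found;
    }

    // Scans every regular file in the given directories, reporting "file!entry" pairs.
    static List<String> scanDirectories(List<File> dirs) throws IOException {
        List<String> found = new ArrayList<>();
        for (File dir : dirs) {
            File[] files = dir.listFiles();
            if (files == null) {
                continue; // not a directory or not readable
            }
            for (File f : files) {
                for (String entry : findSiddhiExtEntries(f)) {
                    found.add(f.getName() + "!" + entry);
                }
            }
        }
        return found;
    }
}
```

A loader built this way would still need to add matching archives to a class loader before instantiating the extensions; that step is outside this sketch.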

NPE on snapshot restore with window timebatch

If an execution plan that uses a timeBatch window is snapshotted, a NullPointerException is thrown when that snapshot is restored. I tested with versions 3.1.2 and 4.0.0-M1.

Exception in thread "main" java.lang.NullPointerException
	at org.wso2.siddhi.core.query.processor.stream.window.TimeBatchWindowProcessor.restoreState(TimeBatchWindowProcessor.java:249)
	at org.wso2.siddhi.core.util.snapshot.SnapshotService.restore(SnapshotService.java:112)
	at org.wso2.siddhi.core.ExecutionPlanRuntime.restore(ExecutionPlanRuntime.java:277)
	at test.Test.main(Test.java:48)

This is the test code. The exception is thrown on the second run, i.e. when plan.bytes written by the first run already exists and is restored.

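package test;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.util.Arrays;

import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.stream.output.StreamCallback;

public class Test {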

    public static void main(String[] args) throws Exception {
        StringBuilder plan = new StringBuilder();
        plan.append("@config(async = 'true')\n");
        plan.append("define stream inStream (temp float);\n");
        plan.append("from inStream#window.timeBatch(3000)\n");
        plan.append("select avg(temp) as temp\n");
        plan.append("insert into outStream\n");

        SiddhiManager siddhiManager = new SiddhiManager();

        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(plan.toString());

        executionPlanRuntime.addCallback("outStream", new StreamCallback() {
            @Override
            public void receive(Event[] events) {
                for (Event e : events) {
                    System.out.println(Arrays.toString(e.getData()));
                }
            }
        });

        File f = new File("plan.bytes");

        if (f.exists()) {
            byte[] buffer = new byte[8192];
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            int aux = 0;
            FileInputStream fis = new FileInputStream(f);
            while ((aux = fis.read(buffer)) > 0) {
                baos.write(buffer, 0, aux);
            }
            fis.close();
            executionPlanRuntime.restore(baos.toByteArray());
        }

        InputHandler inputHandler = executionPlanRuntime.getInputHandler("inStream");

        executionPlanRuntime.start();

        for (int i = 1; i <= 1000; i++) {
            inputHandler.send(new Object[] {randomTemp()});
        }
        Thread.sleep(3000);
        byte[] bytes = executionPlanRuntime.snapshot();
        FileOutputStream fos = new FileOutputStream(f);
        fos.write(bytes);
        fos.flush();
        fos.close();
        executionPlanRuntime.shutdown();
        siddhiManager.shutdown();
    }

    private static float randomTemp() {
        return (float) (Math.random() * 100);
    }
}

Thanks
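Independent of the NPE itself, the file handling in this reproduction can be reduced to a small helper. The sketch below uses only java.nio.file; SnapshotStore is a hypothetical name, and writing to a sibling ".tmp" file before replacing the old snapshot is a design assumption, not something Siddhi requires.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper for persisting snapshot bytes between runs.
class SnapshotStore {
    private final Path file;

    SnapshotStore(Path file) {
        this.file = file;
    }

    // Returns the previously saved snapshot, or null if none exists yet (first run).
    byte[] load() throws IOException {
        return Files.exists(file) ? Files.readAllBytes(file) : null;
    }

    // Writes to a sibling temp file first and then replaces the old snapshot, so a
    // failure while writing the new bytes leaves the previous snapshot untouched.
    void save(byte[] snapshot) throws IOException {
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        Files.write(tmp, snapshot);
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING);
    }
}
```

In the test code, the restore step would then become `byte[] state = store.load(); if (state != null) executionPlanRuntime.restore(state);` and the final step `store.save(executionPlanRuntime.snapshot());`, using the same two Siddhi calls as the original reproduction.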

Support sharing of stream representations

Reporter: Leo Romanoff
Jira URL: https://wso2.org/jira/browse/SIDDHI-10

Description :

In some of my test cases I wanted to avoid using a single stream for all tenants (because it is very slow), so I created one stream per tenant (e.g. 300000 streams). All these streams are structurally identical but have different names. I noticed that they consume quite a lot of memory, because stream definitions are not shared, even though, as far as I understand, they are immutable.

Maybe it would be a good idea to share stream definitions when they are the same? StreamDefinition has two fields, "String name" and "StreamRepresentation streamRep"; the representation part could be shared by all streams with the same structure.

An even better idea could be to allow custom types. Then you could do something like:
define type MyType (field1 string, field2 int, field3 float)
define stream MyStream1 MyType
define stream MyStream2 MyType
define stream MyStream3 MyType
define stream MyStream4 MyType
...

Plus, if custom types could be defined, they could also be used inside stream/type definitions, e.g.:
define type MySecondType (field1 string, field2 int, field3 float, field4 MyType)
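The sharing idea is essentially interning (the flyweight pattern): structurally identical schemas map to one shared immutable object, and each stream keeps only its own name. A minimal sketch, assuming a simplified schema made of "name type" strings; StreamSchema and NamedStream are hypothetical classes, not Siddhi's StreamDefinition or StreamRepresentation.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical immutable schema: attribute list only, no stream name.
final class StreamSchema {
    private static final Map<List<String>, StreamSchema> CACHE = new ConcurrentHashMap<>();

    private final List<String> attributes; // e.g. "field1 string"

    private StreamSchema(List<String> attributes) {
        this.attributes = attributes;
    }

    // Returns one shared instance per distinct attribute list, so many structurally
    // identical streams hold references to a single schema object.
    static StreamSchema of(String... attributes) {
        List<String> key = Collections.unmodifiableList(Arrays.asList(attributes.clone()));
        return CACHE.computeIfAbsent(key, StreamSchema::new);
    }

    List<String> attributes() {
        return attributes;
    }
}

// Hypothetical stream definition: a unique name plus a shared schema.
final class NamedStream {
    final String name;
    final StreamSchema schema;

    NamedStream(String name, StreamSchema schema) {
        this.name = name;
        this.schema = schema;
    }
}
```

The cache in this sketch never evicts entries; with a bounded number of distinct structures (as in the per-tenant case) that is usually acceptable, but it is a trade-off to keep in mind.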

Disk backed window

Reporter: Manoj Gunawardena
Jira URL: https://wso2.org/jira/browse/SIDDHI-4

Description :
For large time frames and large numbers of events, memory-based event queues may run out of memory. The idea of a disk-backed window is that when the number of events in the incoming queue grows, part of the queue is written to disk; before those events need to be emitted, they are read back from disk and loaded into memory in advance.
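A minimal sketch of the spill-and-prefetch idea, assuming events are reduced to long values (for example timestamps) so each has a fixed 8-byte size on disk. SpillingLongQueue is a hypothetical class; a real window would need to serialize full events and handle expiry, both omitted here, and it is not thread-safe.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayDeque;

// Hypothetical disk-backed FIFO queue of long values (e.g. event timestamps).
// Invariant: every value held in memory is older than every value spilled to disk.
class SpillingLongQueue implements AutoCloseable {
    private final ArrayDeque<Long> memory = new ArrayDeque<>();
    private final int memoryCapacity;
    private final RandomAccessFile disk;
    private long readPos = 0;  // byte offset of the oldest unread value on disk
    private long writePos = 0; // byte offset where the next spilled value goes

    SpillingLongQueue(int memoryCapacity, File spillFile) throws IOException {
        this.memoryCapacity = memoryCapacity;
        this.disk = new RandomAccessFile(spillFile, "rw");
        this.disk.setLength(0);
    }

    void add(long value) throws IOException {
        boolean diskEmpty = readPos == writePos;
        if (diskEmpty && memory.size() < memoryCapacity) {
            memory.addLast(value);
        } else {
            // Memory is full, or older values are already on disk: spill to keep FIFO order.
            disk.seek(writePos);
            disk.writeLong(value);
            writePos += Long.BYTES;
        }
    }

    // Returns the oldest value, or null if the queue is empty.
    Long poll() throws IOException {
        if (memory.isEmpty()) {
            refill();
        }
        Long head = memory.pollFirst();
        // Prefetch: top memory back up from disk before it runs dry.
        if (memory.size() <= memoryCapacity / 2) {
            refill();
        }
        return head;
    }

    int size() {
        return memory.size() + (int) ((writePos - readPos) / Long.BYTES);
    }

    private void refill() throws IOException {
        disk.seek(readPos);
        while (readPos < writePos && memory.size() < memoryCapacity) {
            memory.addLast(disk.readLong());
            readPos += Long.BYTES;
        }
        if (readPos == writePos) {
            readPos = 0; // disk fully drained: reuse the file from the start
            writePos = 0;
        }
    }

    @Override
    public void close() throws IOException {
        disk.close();
    }
}
```

Prefetching once memory drops to half capacity is one possible policy for "loading in advance"; the threshold is an arbitrary choice in this sketch.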

Almost no Javadoc in the source code?

Are there any plans to add Javadoc to the source code?
It would definitely help users better understand how Siddhi works, and would likely attract more contributions.

Question about counting patterns

I want to use a counting pattern with a time constraint, but the result confuses me. The example I wrote is a brute-force login-success detection; the execution plan is below:

@Plan:name('TestExecutionPlan')

define stream rawStream ( catBehavior string, catOutcome string, srcAddress string, 
             deviceCat string, srcUsername string, 
             catObject string, destAddress string, appProtocol string ); 
 
@info(name = 'condition1') 
from rawStream[ catBehavior == '/Authentication/Verify' 
         and catOutcome == 'FAIL' and not( srcAddress is null ) ]#window.time(1 min) 
select srcAddress, deviceCat, srcUsername, destAddress, appProtocol 
insert into e1_OutputStream;

@info(name = 'condition2') 
from rawStream[ catBehavior == '/Authentication/Verify' 
         and catOutcome == 'OK' and not( srcAddress is null ) ]#window.time(1 min)  
select srcAddress, deviceCat, srcUsername, destAddress, appProtocol 
insert into e2_OutputStream;

@info(name = 'result') 
from every ( e1 = e1_OutputStream<9:> ) -> e2 = e2_OutputStream[ e1.srcAddress == srcAddress 
                                and e1.deviceCat == deviceCat 
                                and e1.srcUsername == srcUsername 
                                and e1.destAddress == destAddress 
                                and e1.appProtocol == appProtocol ]
within 10 second 
select 'relationEvent' as event, e1.srcAddress, 
e1.deviceCat, e1.srcUsername, e1.destAddress, e1.appProtocol 
insert into resultOutputStream;

The event stream is below; it includes 9 FAIL events and 1 OK event:

rawStream=[/Authentication/Verify,FAIL,1.1.1.1,deviceCat,srcUsername,catObject,destAddress,appProtocol]
delay(2000)
rawStream=[/Authentication/Verify,FAIL,1.1.1.1,deviceCat,srcUsername,catObject,destAddress,appProtocol]
delay(2000)
rawStream=[/Authentication/Verify,FAIL,1.1.1.1,deviceCat,srcUsername,catObject,destAddress,appProtocol]
delay(2000)
rawStream=[/Authentication/Verify,FAIL,1.1.1.1,deviceCat,srcUsername,catObject,destAddress,appProtocol]
delay(2000)
rawStream=[/Authentication/Verify,FAIL,1.1.1.1,deviceCat,srcUsername,catObject,destAddress,appProtocol]
delay(2000)
rawStream=[/Authentication/Verify,FAIL,1.1.1.1,deviceCat,srcUsername,catObject,destAddress,appProtocol]
delay(2000)
rawStream=[/Authentication/Verify,FAIL,1.1.1.1,deviceCat,srcUsername,catObject,destAddress,appProtocol]
delay(2000)
rawStream=[/Authentication/Verify,FAIL,1.1.1.1,deviceCat,srcUsername,catObject,destAddress,appProtocol]
delay(2000)
rawStream=[/Authentication/Verify,FAIL,1.1.1.1,deviceCat,srcUsername,catObject,destAddress,appProtocol]
delay(2000)
rawStream=[/Authentication/Verify,OK,1.1.1.1,deviceCat,srcUsername,catObject,destAddress,appProtocol]

9 failed logins followed by 1 successful login should trigger the rule, so I wrote e1 = e1_OutputStream<9:>. I also want the detection to happen within a time period, so I wrote within 10 second. The events I sent should not trigger the rule (the delays add up to more than 10 seconds), but the rule was triggered. So I'm wondering: is the time constraint I defined measured from the first event or from the last event?
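The timing in question can be checked with simple arithmetic. Assuming the first FAIL event is sent at t = 0 and each later event follows after a 2000 ms delay, the sketch below computes the gap between the OK event and the first and the last FAIL event. WithinCheck is a hypothetical class; it does not say which anchor Siddhi's within uses, it only shows that the two interpretations give opposite answers for this input.

```java
// Hypothetical timing check for the event sequence above (not Siddhi code).
class WithinCheck {
    // Send time (ms) of each of `count` events, starting at 0 with a fixed delay between sends.
    static long[] sendTimes(int count, long delayMs) {
        long[] times = new long[count];
        for (int i = 0; i < count; i++) {
            times[i] = i * delayMs;
        }
        return times;
    }

    public static void main(String[] args) {
        long[] times = sendTimes(10, 2000); // events 0..8 are FAIL, event 9 is OK
        long withinMs = 10_000;             // "within 10 second"
        long fromFirstFail = times[9] - times[0];
        long fromLastFail = times[9] - times[8];
        System.out.println("first FAIL -> OK: " + fromFirstFail + " ms, within limit: " + (fromFirstFail <= withinMs));
        System.out.println("last FAIL -> OK:  " + fromLastFail + " ms, within limit: " + (fromLastFail <= withinMs));
    }
}
```

Under these assumptions the first FAIL is 18000 ms before the OK event and the last FAIL only 2000 ms before it, so a constraint measured from the last matched event would be satisfied while one measured from the first would not. The nine FAIL events alone already span 16000 ms.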

Has distinctCount been deprecated?

I found it in the documentation, but when I used it in my code, an error occurred. I also cannot find DistinctcountAttributeAggregator.java in the latest Siddhi version.
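Whatever the status of the built-in function in a given release, the bookkeeping a distinct-count aggregate needs over a sliding window is small: count occurrences per value, increment when an event enters the window, decrement when it expires, and report the number of keys. A minimal sketch; DistinctCounter is a hypothetical class and does not follow Siddhi's attribute-aggregator interface.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical distinct-count bookkeeping for a sliding window (not Siddhi's aggregator API).
class DistinctCounter<T> {
    private final Map<T, Integer> occurrences = new HashMap<>();

    // An event entered the window: bump its value's count; return the distinct count.
    int add(T value) {
        occurrences.merge(value, 1, Integer::sum);
        return occurrences.size();
    }

    // An event expired from the window: drop the value once its count reaches zero.
    int remove(T value) {
        occurrences.computeIfPresent(value, (k, count) -> count == 1 ? null : count - 1);
        return occurrences.size();
    }
}
```

Tracking counts rather than a plain set is what makes expiry correct: a value stays "distinct" until its last occurrence leaves the window.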

Plugin plugin:2.1.2 or one of its dependencies could not be resolved

Hi, when I try to build the Siddhi project using mvn package, it gives the following error. Can you help me resolve it?
Thank you

`[INFO] ------------------------------------------------------------------------
[ERROR] Plugin org.apache.maven.plugins:maven-source-plugin:2.1.2 or one of its dependencies could not be resolved: Failed to read artifact descriptor for org.apache.maven.plugins:maven-source-plugin:jar:2.1.2: 1 problem was encountered while building the effective model
[ERROR] [FATAL] Non-readable POM C:\Users\Roledene\.m2\repository\org\apache\maven\plugins\maven-source-plugin\2.1.2\maven-source-plugin-2.1.2.pom: no more data available - expected end tag to close start tag from line 22, parser stopped on START_DOCUMENT seen ...mlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLoc... @22:119 @
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:

[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/PluginResolutionException`

ExternalTimeBatch window to support rounding

Currently the external time batch window is triggered by events, which is correct and expected. However, there is no rounding of the batch start time.

Would it be possible to support a feature like
from stream#window.externalTimeBatch(timestamp, 5 min, 1 second), where the optional third parameter specifies the rounding? That is, if 1 second is given, each time window should start on a whole second. (Currently the window may start at an event's timestamp, which is not necessarily aligned to a whole second.)
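The requested rounding amounts to flooring the first event's timestamp to a multiple of the rounding interval. A minimal sketch, assuming millisecond timestamps as used by externalTimeBatch; the third parameter and the RoundedBatch helper names are hypothetical, not an existing Siddhi option.

```java
// Hypothetical helpers for a rounded externalTimeBatch window (not Siddhi code).
class RoundedBatch {
    // Start of the batch that contains tsMs when batch boundaries are aligned to roundMs.
    // Math.floorDiv keeps the result correct for negative timestamps as well.
    static long alignedStart(long tsMs, long roundMs) {
        return Math.floorDiv(tsMs, roundMs) * roundMs;
    }

    // Exclusive end of a batch of length windowMs that starts at the aligned start.
    static long batchEnd(long tsMs, long roundMs, long windowMs) {
        return alignedStart(tsMs, roundMs) + windowMs;
    }
}
```

For example, with 1 second rounding and a 5-minute window, a first event at 1366335804341 ms would open the batch [1366335804000, 1366336104000) instead of starting it at 1366335804341.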

ExecutionPlanRuntime is not shutting down

Given the test class below, it appears that ExecutionPlanRuntime is not shutting down.

package eu.ferari.examples.distributedcount.states;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.util.EventPrinter;

public class STState {
    private static final String eventStreamDefinition =
        "define stream cdrEventStream (nodeId string, phone string, timeStamp long); ";
    private static final String query =
        "@info(name = 'query1') from cdrEventStream#window.externalTime(timeStamp,10 sec) " + "select nodeId, phone, timeStamp, count(timeStamp) as callCount group by phone " + "insert all events into outputStream;";

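    // Note: these field initializers create a second SiddhiManager and ExecutionPlanRuntime
    // that are never started or shut down in this class.
    private final SiddhiManager siddhiManager = new SiddhiManager();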
    private final ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(eventStreamDefinition + query);
    private final InputHandler inputHandler = executionPlanRuntime.getInputHandler("cdrEventStream");

    private int inEventCount;
    private int removeEventCount;

    public void externalTimeWindowTest1() throws InterruptedException {

        SiddhiManager siddhiManager = new SiddhiManager();

        String cseEventStream = "" +
            "define stream LoginEvents (timeStamp long, ip string, phone string) ;";
        String query = "" +
            "@info(name = 'query1') " +
            "from LoginEvents#window.externalTime(timeStamp,5 sec) " +
            "select timeStamp, count(ip) as cccount, phone group by phone " +
            "insert all events into uniqueIps ;";

        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);

        executionPlanRuntime.addCallback("query1", new QueryCallback() {
            @Override
            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
                // Events are ignored in this reproduction.
            }
        });

        InputHandler inputHandler = executionPlanRuntime.getInputHandler("LoginEvents");
        executionPlanRuntime.start();

        inputHandler.send(new Object[] {
            1366335804341L, "192.10.1.3", "1"
        });

        executionPlanRuntime.shutdown();
        System.out.println("Done");

    }

    public static void main(String[] args) throws InterruptedException {
        STState s = new STState();
        s.externalTimeWindowTest1();
    }
}
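When a process does not exit after shutdown() returns, a useful first check is which non-daemon threads are still alive, since the JVM does not exit normally while any of them is running. The sketch below uses only Thread.getAllStackTraces(); LiveThreads is a hypothetical name, and it cannot tell in advance which Siddhi threads, if any, will remain.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic helper: which threads could be keeping the JVM alive?
class LiveThreads {
    // Names of live non-daemon threads other than the calling thread. The JVM does not
    // exit normally while any non-daemon thread is still running.
    static List<String> nonDaemonThreadNames() {
        List<String> names = new ArrayList<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.isAlive() && !t.isDaemon() && t != Thread.currentThread()) {
                names.add(t.getName());
            }
        }
        return names;
    }
}
```

Printing `LiveThreads.nonDaemonThreadNames()` right after `System.out.println("Done")` in STState would show which threads survive the shutdown call.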

Use of group by with time window

Using Siddhi 3.0.3 as a Java library

I am seeing some unexpected behavior when using group by with a time window. The general issue is that I get the correct result aggregations for the group, but I receive one aggregated result per event, not one per group.

Specifically,

Using the following query, with no time window:

@info(name = 'RealTimeQuery') from MyEventStream select EventName as EventName, count() as Count, avg(ElapsedTime) as Avg_ElapsedTime group by EventName output snapshot every 30 seconds insert into CEP_Result;

I get one result per unique EventName, with the proper counts and avg. So, in my example, there are 15 unique EventNames, and each unique EventName occurs 10 times for a total of 150 events. I get 15 results, one for each EventName, with each having a count of 10 and the correct average, as I would expect.

However, if I add a time window to this query:

@info(name = 'RealTimeQuery') from MyEventStream#window.time(2 minutes) select EventName as EventName, count() as Count, avg(ElapsedTime) as Avg_ElapsedTime group by EventName output snapshot every 30 seconds insert into CEP_Result;

The result for each EventName is correct, as it has the correct count and average. But, instead of one result per EventName, I get 10, for a total of 150 results.

We're looking to replace Esper with Siddhi, so I ran similar queries through Esper, and in both cases Esper produced the expected output of one result per EventName.
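Until the query itself emits one row per group, the per-event rows can be collapsed on the receiving side. A minimal client-side sketch, assuming each row is the Object[] returned by Event.getData() and the group key (EventName) sits at index 0 as in the select clause above; LatestPerGroup is a hypothetical helper. Since each reported row already carries the correct aggregates, keeping one row per key is enough; the sketch keeps the last one. This is a workaround, not an explanation of how output snapshot interacts with the time window.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical client-side workaround: one row per group key.
class LatestPerGroup {
    // Keeps only the last row seen for each key, preserving first-seen key order.
    static List<Object[]> collapse(List<Object[]> rows, int keyIndex) {
        Map<Object, Object[]> latest = new LinkedHashMap<>();
        for (Object[] row : rows) {
            latest.put(row[keyIndex], row); // re-putting an existing key keeps its position
        }
        return new ArrayList<>(latest.values());
    }
}
```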

How to get the filtered results per sliding time window

I recently adopted the Siddhi CEP engine 3.1.2 for my own research. However, one problem keeps me from going further. For example, I would like to use a 1-minute sliding time window to process an event stream. Can I access the filtered events specific to each sliding window? That is, I want to collect all filtered events for sliding window 1, sliding window 2, and so on, rather than the final combined output.
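If "sliding window 1, sliding window 2, ..." means windows that start at fixed intervals (hopping windows), the filtered events can be assigned to each window explicitly. A minimal sketch outside Siddhi, assuming millisecond timestamps of already-filtered events and a hypothetical slide interval; Siddhi's time window expires events individually rather than in fixed steps, so this is a different windowing model, and SlidingWindows is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: assign event timestamps to fixed-step (hopping) windows.
class SlidingWindows {
    // Window [start, start + sizeMs) for every start that is a multiple of slideMs.
    // Each timestamp is added to every window that contains it; result is ordered by start.
    static Map<Long, List<Long>> assign(List<Long> timestamps, long sizeMs, long slideMs) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : timestamps) {
            long latestStart = Math.floorDiv(ts, slideMs) * slideMs; // last window starting at or before ts
            for (long start = latestStart; start > ts - sizeMs; start -= slideMs) {
                windows.computeIfAbsent(start, k -> new ArrayList<>()).add(ts);
            }
        }
        return windows;
    }
}
```

With a 60 s window sliding every 30 s, an event at 30000 ms lands in the windows starting at 0 and 30000; windows that begin before the first event are included as well.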

HA usage as a library

Hello!

I am trying to use Siddhi as a library in my own project, but I don't know how to configure the SiddhiManager programmatically to run in HA mode without using the whole WSO2 CEP framework.
Is this possible? Could you provide an example? I am really lost with Siddhi 3.0, and I can't find any useful documentation or guide for using it as an independent library.

Thank you very much!!!

Problems with Siddhi group by with a time window

I have problems using group by with a time window in Siddhi: the aggregations for each group are correct, but I receive one aggregated result per event, not one per group.

Here is the query:
define stream bootCorrelationStream (logLevel string, message string, similarityId string, timestamp long, uuid string);

@info(name = 'bootCorrelation')
from bootCorrelationStream[logLevel=='ERROR']#window.externalTime(timestamp, 2 min)
select similarityId, adanos:first(message) as message, min(timestamp) as startTime, max(timestamp) as endTime
group by uuid, similarityId
insert into tempStream;

Input data:
logLevel="ERROR", message="ERROR first message", timestamp="2016-05-21 01:22:07.579", uuid="94350d43-8b6b-4669-8561-2f956b05a341", similarityId="b9b35d4a-03b9-4b36-9c02-302c63a2e382",uuid=94350d43-8b6b-4669-8561-2f956b05a341
logLevel="ERROR", message="ERROR second message", timestamp="2016-05-21 01:22:08.314", uuid="94350d43-8b6b-4669-8561-2f956b05a341", similarityId="9ced5924-11b5-4008-9cf0-ea4a32311ce6",uuid=94350d43-8b6b-4669-8561-2f956b05a341
logLevel="ERROR", message="ERROR second message", timestamp="2016-05-21 01:22:15.017", uuid="94350d43-8b6b-4669-8561-2f956b05a341", similarityId="9ced5924-11b5-4008-9cf0-ea4a32311ce6",uuid=94350d43-8b6b-4669-8561-2f956b05a341
logLevel="ERROR", message="ERROR second message", timestamp="2016-05-21 01:25:07.017", uuid="94350d43-8b6b-4669-8561-2f956b05a341", similarityId="9ced5924-11b5-4008-9cf0-ea4a32311ce6",uuid=94350d43-8b6b-4669-8561-2f956b05a341

Here is the output:
Count: 1; message=ERROR first message, timestamp=2016-05-21 01:22:07.579, similarityId=b9b35d4a-03b9-4b36-9c02-302c63a2e382, uuid=94350d43-8b6b-4669-8561-2f956b05a341
Count: 1; message=ERROR second message, timestamp=2016-05-21 01:22:08.314, similarityId=9ced5924-11b5-4008-9cf0-ea4a32311ce6, uuid=94350d43-8b6b-4669-8561-2f956b05a341
Count: 1; message=ERROR second message, timestamp=2016-05-21 01:22:08.314, similarityId=9ced5924-11b5-4008-9cf0-ea4a32311ce6, uuid=94350d43-8b6b-4669-8561-2f956b05a341
Count: 1; message=ERROR second message, timestamp=2016-05-21 01:25:07.017, similarityId=9ced5924-11b5-4008-9cf0-ea4a32311ce6, uuid=94350d43-8b6b-4669-8561-2f956b05a341

Here is the major code:
register callback
`
this.siddhiRuntime = new SiddhiRuntimeHolder();
this.siddhiRuntime.siddhiManager = new SiddhiManager();
ExecutionPlanRuntime executionPlanRuntime = this.siddhiRuntime.siddhiManager.createExecutionPlanRuntime(this.getExecutionPlan());
this.siddhiRuntime.inputHandler = executionPlanRuntime.getInputHandler(this.getStreamName());

final SiddhiBolt siddhiBolt = this;
this.siddhiRuntime.queryCallback = new QueryCallback() {
    @Override
    public void receive(long timeStamp, org.wso2.siddhi.core.event.Event[] inEvents,
            org.wso2.siddhi.core.event.Event[] removeEvents) {
        siddhiBolt.queryCallback(timeStamp, inEvents, removeEvents);
    }
};
executionPlanRuntime.addCallback("query", this.siddhiRuntime.queryCallback);
executionPlanRuntime.start();

`

query details
StringBuilder executionPlane = new StringBuilder(" define stream bootCorrelationStream ");
executionPlane.append("(logLevel string, message string, similarityId string, timestamp long, uuid string); ");
executionPlane.append(" @info(name = 'bootCorrelation') ");
// externalTimeBatch(timestamp, 5 min), batch time window for specified timestamp
executionPlane.append("from bootCorrelationStream[logLevel=='ERROR']#window.externalTime(timestamp, 2 min) ");
executionPlane.append(" select similarityId, adanos:first(message) as message, uuid, ");
executionPlane.append(" min(timestamp) as startTime, max(timestamp) as endTime ");
executionPlane.append(" group by uuid, similarityId ");
executionPlane.append(" insert into tempStream; ");
appendLogs(executionPlane.toString());
return executionPlane.toString();

callback details
`
Map<String, List<BootCorrelationAggregationData>> tempMap = new HashMap<String, List<BootCorrelationAggregationData>>();
for (Event event : inEvents) {
    Object[] data = event.getData();
    BootCorrelationAggregationData aggregateData = new BootCorrelationAggregationData();
    aggregateData.setSimilarityId((String) data[0]);
    aggregateData.setMessage((String) data[1]);
    aggregateData.setUuid((String) data[2]);
    aggregateData.setTimestamp((Long) data[3]);
    if (!tempMap.containsKey(aggregateData.getUuid())) {
        tempMap.put(aggregateData.getUuid(), new ArrayList<BootCorrelationAggregationData>());
    }
    tempMap.get(aggregateData.getUuid()).add(aggregateData);
}

List<BootCorrelationAggregationData> emitList = new ArrayList<BootCorrelationAggregationData>();
Iterator<Entry<String, List<BootCorrelationAggregationData>>> it = tempMap.entrySet().iterator();
while (it.hasNext()) {
    Entry<String, List<BootCorrelationAggregationData>> entry = it.next();
    List<BootCorrelationAggregationData> temp = entry.getValue();
    Collections.sort(temp, new Comparator<BootCorrelationAggregationData>() {
        @Override
        public int compare(BootCorrelationAggregationData o1, BootCorrelationAggregationData o2) {
            // Long.compare avoids the int overflow of (int) (t1 - t2) for large gaps
            return Long.compare(o1.getTimestamp(), o2.getTimestamp());
        }
    });

    if (temp.size() > 0) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        String message = String.format("Count: %s; message=%s, timestamp=%s, similarityId=%s, uuid=%s", temp.size(),
                temp.get(0).getMessage(), sdf.format(new Date(temp.get(0).getTimestamp())),
                temp.get(0).getSimilarityId(), temp.get(0).getUuid());
        LOG.info(message);
        emitList.add(temp.get(0));
    }
}

if (emitList.size() > 0) {
    this.getOutputCollector().emit(new Values(emitList));
    LOG.info(String.format("BootCorrelationSiddhiBolt emit size: %s", emitList.size()));
    for (BootCorrelationAggregationData data : emitList) {
        LOG.info(String.format("BootCorrelationSiddhiBolt emits %s", ToStringBuilder.reflectionToString(data)));
    }
}

`

NullPointerException reported in TimeWindowProcessor

I am using a pattern query to detect the non-occurrence of an event. After testing for a while, I see "Dropping event chunk" errors. I am using version 3.1.0. Should there be a lock around the handling of pendingEventList in LogicalPreStateProcessor?

2017-03-13 16:31:36,671 | ERROR | heduler-thread-4 | AbstractStreamProcessor | 188 - siddhi-core - 3.1.0 | Dropping event chunk EventChunk{first=StreamEvent{timestamp=1489422696671, beforeWindowData=null, onAfterWindowData=null, outputData=[C006D20050220031, 1489422336000, 424660, AP_CAPABLE], type=CURRENT, next=null}}, error in processing org.wso2.siddhi.core.query.processor.stream.window.TimeWindowProcessor, null
java.lang.NullPointerException
at org.wso2.siddhi.core.query.input.stream.state.LogicalPreStateProcessor.processAndReturn(LogicalPreStateProcessor.java:116)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.query.input.SingleProcessStreamReceiver.processAndClear(SingleProcessStreamReceiver.java:56)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.process(ProcessStreamReceiver.java:77)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.receive(ProcessStreamReceiver.java:99)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.query.input.stream.state.receiver.PatternSingleProcessStreamReceiver.receive(PatternSingleProcessStreamReceiver.java:43)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:127)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:329)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.query.output.callback.InsertIntoStreamCallback.send(InsertIntoStreamCallback.java:46)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:78)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:40)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.query.selector.QuerySelector.processNoGroupBy(QuerySelector.java:123)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:86)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.query.processor.stream.window.TimeWindowProcessor.process(TimeWindowProcessor.java:123)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor.processEventChunk(WindowProcessor.java:57)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.query.processor.stream.AbstractStreamProcessor.process(AbstractStreamProcessor.java:101)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.query.input.stream.single.EntryValveProcessor.process(EntryValveProcessor.java:47)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.util.Scheduler$EventCaller.run(Scheduler.java:171)[188:siddhi-core:3.1.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)[:1.8.0_66]
at java.util.concurrent.FutureTask.run(FutureTask.java:266)[:1.8.0_66]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)[:1.8.0_66]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)[:1.8.0_66]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)[:1.8.0_66]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)[:1.8.0_66]

Another reported stack trace:
java.lang.NullPointerException
at java.util.LinkedList$ListItr.next(LinkedList.java:893)[:1.8.0_66]
at org.wso2.siddhi.core.query.input.stream.state.LogicalPreStateProcessor.processAndReturn(LogicalPreStateProcessor.java:115)
at org.wso2.siddhi.core.query.input.StateMultiProcessStreamReceiver.processAndClear(StateMultiProcessStreamReceiver.java:49)
at org.wso2.siddhi.core.query.input.MultiProcessStreamReceiver.process(MultiProcessStreamReceiver.java:75)
at org.wso2.siddhi.core.query.input.MultiProcessStreamReceiver.receive(MultiProcessStreamReceiver.java:112)
at org.wso2.siddhi.core.query.input.stream.state.receiver.PatternMultiProcessStreamReceiver.receive(PatternMultiProcessStreamReceiver.java:58)
at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:149)
at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:334)
at org.wso2.siddhi.core.stream.input.InputDistributor.send(InputDistributor.java:34)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.stream.input.InputEntryValve.send(InputEntryValve.java:41)[188:siddhi-core:3.1.0]
at org.wso2.siddhi.core.stream.input.InputHandler.send(InputHandler.java:57)[188:siddhi-core:3.1.0]
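The second trace fails inside LinkedList$ListItr.next, which is consistent with a LinkedList being modified by one thread while another iterates it (here a scheduler thread and an input thread both reach LogicalPreStateProcessor); LinkedList is not thread-safe. The sketch below shows the kind of locking the question proposes: one lock guards every add and every iterate-and-remove pass. GuardedPendingList is a hypothetical class, not Siddhi's LogicalPreStateProcessor, and whether Siddhi's own fix took this form is not established here.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;

// Hypothetical pending-event list shared by an input thread and a scheduler thread.
// LinkedList itself is not thread-safe, so every access goes through one lock.
class GuardedPendingList<E> {
    private final LinkedList<E> pending = new LinkedList<>();
    private final ReentrantLock lock = new ReentrantLock();

    void add(E event) {
        lock.lock();
        try {
            pending.add(event);
        } finally {
            lock.unlock();
        }
    }

    // Iterates and removes under the lock, so no other thread can modify the list mid-iteration.
    List<E> removeMatching(Predicate<E> matches) {
        List<E> removed = new ArrayList<>();
        lock.lock();
        try {
            Iterator<E> it = pending.iterator();
            while (it.hasNext()) {
                E event = it.next();
                if (matches.test(event)) {
                    it.remove();
                    removed.add(event);
                }
            }
        } finally {
            lock.unlock();
        }
        return removed;
    }

    int size() {
        lock.lock();
        try {
            return pending.size();
        } finally {
            lock.unlock();
        }
    }
}
```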

How can I execute a query over two streams?

What is the problem with the syntax of the query below?
<sw:siddhi-query
selector="from LocationStream#window.time( 60 min ) as e1, e2 = LocationStream[(latitude != e1.latitude or longitude != e1.longitude) and e1.assignment == assignment] as e3 join MeasurementStream[mxname=='speed']#window.time( 60 min ) as e2 on e1.assignment==e2.assignment select e1.assignment, e1.latit$
<sw:groovy-stream-processor scriptPath="siddhiExecuteCommand.groovy"
stream="LocationChanged"/>
</sw:siddhi-query>

Siddhi CEP 3.1.0 hits a deadlock during pattern query execution

I recently moved to Siddhi CEP 3.1.0 and execute this plan at a rate of 400 events/second. After 3 hours, events were no longer being processed because the Siddhi threads were deadlocked.

"@plan:name('cepexecutionplan') " +
"define stream deviceCheckInStream (serialNum string,currentCheckInTime long, sequenceNum long); " +
"define stream swCompCheckInStream (serialNum string, timestamp long, sequenceNum long, info1 object, info2 object, streamType string); " +

     "@info(name = 'query1') " +
     " from deviceCheckInStream#window.time(6 minutes) " + 
     "select * "+                   
     " insert expired events into delayedDeviceCheckInStream; " + 

     "@info(name = 'query2') " + 
     " from (every e1=deviceCheckInStream -> nonOccurringEvent = deviceCheckInStream[sequenceNum > e1.sequenceNum and serialNum == e1.serialNum] or"
     + " delayedEvent=delayedDeviceCheckInStream[e1.sequenceNum ==  sequenceNum and serialNum == e1.serialNum]) within 370 sec "+ 
     " select e1.serialNum as serialNum, e1.currentCheckInTime as lastSuccessfulCheckInTime,  "
     + "nonOccurringEvent.currentCheckInTime as nonOccurringId, e1.sequenceNum as lastSuccessfulCheckInSeqNum  "+ 
     " having (nonOccurringId is null) "+ 
     " insert into devCheckInNonOccurrenceStream; " +

     "@info(name = 'query3') " +  
     " from (every e1=swCompCheckInStream -> e2 = swCompCheckInStream[sequenceNum != e1.sequenceNum and serialNum == e1.serialNum]) within 6 min "+ 
     " select e2.sequenceNum as currSeq, e1.serialNum, e2.timestamp, e1.streamType as prevStreamType, e2.streamType as currStreamType, "
     + "e1.info1 as previnfo1, e2.info1 as currinfo1, "
     + "e1.info2 as previnfo2, e2.info2 as currinfo2 "
     + " insert into swCompStatusOutputStream; ";

The lock obtained in PatternMultiProcessStreamReceiver is stuck in getLastEvent; the thread dump traces are below. Any idea when this could happen? Any help would be appreciated.

    at org.wso2.siddhi.core.event.ComplexEventChunk.getLastEvent(ComplexEventChunk.java:107)
    at org.wso2.siddhi.core.event.ComplexEventChunk.add(ComplexEventChunk.java:100)
    at org.wso2.siddhi.core.query.input.stream.state.LogicalPreStateProcessor.processAndReturn(LogicalPreStateProcessor.java:124)
    at org.wso2.siddhi.core.query.input.StateMultiProcessStreamReceiver.processAndClear(StateMultiProcessStreamReceiver.java:49)
    at org.wso2.siddhi.core.query.input.MultiProcessStreamReceiver.process(MultiProcessStreamReceiver.java:75)
    at org.wso2.siddhi.core.query.input.MultiProcessStreamReceiver.receive(MultiProcessStreamReceiver.java:112)
    - locked <0x0000000084260b08> (a org.wso2.siddhi.core.query.input.stream.state.receiver.PatternMultiProcessStreamReceiver)
    at org.wso2.siddhi.core.query.input.stream.state.receiver.PatternMultiProcessStreamReceiver.receive(PatternMultiProcessStreamReceiver.java:58)
    at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:149)
    at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:334)
    at org.wso2.siddhi.core.stream.input.InputDistributor.send(InputDistributor.java:34)
    at org.wso2.siddhi.core.stream.input.InputEntryValve.send(InputEntryValve.java:41)
    at org.wso2.siddhi.core.stream.input.InputHandler.send(InputHandler.java:57)
    at
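In the trace, the thread holds the receiver lock taken in `MultiProcessStreamReceiver.receive` and is executing `getLastEvent`. Assuming (this is not confirmed by the trace) that a chunk is a singly linked list that `getLastEvent` walks from head to tail, a walk that never returns is consistent with the list containing a cycle, for example when the same event object is appended to a chunk twice. The sketch below uses a hypothetical `EventNode` type, not Siddhi's `ComplexEvent`, to show how such a cycle forms and how Floyd's tortoise-and-hare check detects it without walking forever.

```java
/** Hypothetical linked-list node standing in for an event in a chunk. */
final class EventNode {
    final String id;
    EventNode next;
    EventNode(String id) { this.id = id; }
}

final class ChunkDebug {
    /** Appends by walking to the current tail, the way a naive chunk add would. */
    static void append(EventNode head, EventNode node) {
        EventNode tail = head;
        while (tail.next != null) {   // never terminates if the list has a cycle
            tail = tail.next;
        }
        tail.next = node;
    }

    /** Floyd's cycle detection: true if following next pointers would loop forever. */
    static boolean hasCycle(EventNode head) {
        EventNode slow = head;
        EventNode fast = head;
        while (fast != null && fast.next != null) {
            slow = slow.next;
            fast = fast.next.next;
            if (slow == fast) {
                return true;
            }
        }
        return false;
    }
}
```

Re-appending the head (`append(a, a)` after `append(a, b)`) links `b` back to `a`, after which any further tail walk would spin while still holding the lock.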

Improving performance by building a tree from the input query for early detection with a given filter

Reporter: Lahiru Gunathilaka
Jira URL: https://wso2.org/jira/browse/SIDDHI-6

Description:
According to this article [1], Siddhi expects users to write their queries efficiently, and it evaluates the filters in the order given. If Siddhi could instead rearrange the filters (useful for complex filtering logic) and build a high-performance tree that detects failure or success early, that would be great, and users would not have to worry about it.

[1] http://wso2.com/library/articles/2013/06/understanding-siddhi-powers-wso2-cep-2x/
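A simpler relative of the requested tree is short-circuit evaluation with the most selective conditions first. A minimal sketch, assuming each condition of a conjunctive filter has an estimated pass rate (the `FilterCondition` and `OrderedFilter` types and the rates are hypothetical, not Siddhi APIs): sorting the conditions by ascending pass rate lets the filter reject most events after the first check.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.DoublePredicate;

/** Hypothetical filter condition with an estimated fraction of events it lets through. */
final class FilterCondition {
    final String name;
    final DoublePredicate test;
    final double passRate;

    FilterCondition(String name, DoublePredicate test, double passRate) {
        this.name = name;
        this.test = test;
        this.passRate = passRate;
    }
}

final class OrderedFilter {
    private final List<FilterCondition> ordered;
    int evaluations; // counts condition checks, to show the effect of ordering

    OrderedFilter(List<FilterCondition> conditions) {
        ordered = new ArrayList<>(conditions);
        // Most selective (lowest pass rate) first, so failures are detected early.
        ordered.sort(Comparator.comparingDouble(c -> c.passRate));
    }

    /** Conjunction of all conditions, stopping at the first one that fails. */
    boolean accept(double price) {
        for (FilterCondition c : ordered) {
            evaluations++;
            if (!c.test.test(price)) {
                return false;
            }
        }
        return true;
    }

    String firstCondition() {
        return ordered.get(0).name;
    }
}
```

With `price > 700` (rare) checked before `price > 0` (almost always true), a typical event costs one check instead of two; a real optimizer would also need selectivity estimates, which this sketch takes as given.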

mvn package fails on fresh clone of Master

I cloned the master branch and tried to create packages. The build stopped at failed tests:

Failed tests:
testSnapshotOutputRateLimitQuery1(org.wso2.siddhi.core.query.ratelimit.SnapshotOutputRateLimitTestCase)
testSnapshotOutputRateLimitQuery2(org.wso2.siddhi.core.query.ratelimit.SnapshotOutputRateLimitTestCase)
testSnapshotOutputRateLimitQuery19(org.wso2.siddhi.core.query.ratelimit.SnapshotOutputRateLimitTestCase)
testSnapshotOutputRateLimitQuery20(org.wso2.siddhi.core.query.ratelimit.SnapshotOutputRateLimitTestCase)
MultiThreadedTest2(org.wso2.siddhi.core.stream.JunctionTestCase)
MultiThreadedWithEventPoolTest(org.wso2.siddhi.core.stream.JunctionTestCase)
FilterTest1(org.wso2.siddhi.core.query.FilterTestCase)

Tests run: 457, Failures: 7, Errors: 0, Skipped: 0

[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Siddhi ............................................. SUCCESS [02:50 min]
[INFO] Siddhi Query API ................................... SUCCESS [ 59.150 s]
[INFO] Siddhi Query Compiler .............................. SUCCESS [01:12 min]
[INFO] Siddhi Core ........................................ FAILURE [08:57 min]
[INFO] Siddhi Extensions .................................. SKIPPED
[INFO] Siddhi Extension - Evaluate Scripts ................ SKIPPED
[INFO] Siddhi Extension - String .......................... SKIPPED
[INFO] Siddhi Extension - Geo ............................. SKIPPED
[INFO] Siddhi Extension - Math ............................ SKIPPED
[INFO] Siddhi Extension - Event Table ..................... SKIPPED
[INFO] Siddhi Extension - RegEx ........................... SKIPPED
[INFO] Siddhi Extension - Time ............................ SKIPPED
[INFO] Siddhi Extension - Time Series ..................... SKIPPED
[INFO] Siddhi Samples ..................................... SKIPPED
[INFO] Siddhi Performance Samples ......................... SKIPPED
[INFO] Siddhi Quick Start Samples ......................... SKIPPED
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 15:26 min
[INFO] Finished at: 2015-07-28T08:53:44-07:00
[INFO] Final Memory: 53M/226M
[INFO] ------------------------------------------------------------------------

Publish siddhi libraries in Maven Central

Reporter: Aitor Argomaniz
Jira URL: https://wso2.org/jira/browse/SIDDHI-5

Description:
Currently, integrating Siddhi into a Maven project requires setting up the pom.xml to use the WSO2 Maven repository, because no version of the CEP is published in the Maven Central Repository:
http://search.maven.org/#search%7Cga%7C1%7Csiddhi-core

It would be great if you published new CEP releases to the Maven Central repository, so that pom.xml files would not need to be modified to add custom repositories. It would also give the project more visibility and recognition for your work.

Siddhi goes OOM when having millions of partitions

If you define a partition over a field that has a large number (e.g. a million) of distinct values, CEP will go OOM.

Currently, with about 20K partitions, it goes OOM with 256MB. This has to happen at some point, but 20K is far too early. We need to make partitions as lightweight as possible, since they will be used as a way of grouping.
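For scale, and assuming for a rough upper bound that the whole 256MB heap is attributed to partition state (in practice the engine itself also uses part of it), the reported numbers put the average cost per partition at:

$$
\frac{256\ \text{MB}}{20\,000\ \text{partitions}} = \frac{256 \times 2^{20}\ \text{B}}{20\,000} = \frac{268\,435\,456\ \text{B}}{20\,000} \approx 13\,422\ \text{B} \approx 13\ \text{KB per partition}
$$

At that rate, one million partitions would need roughly 13.4 GB of heap.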

How to implement stock quote book demo by CEP/SiddhiQL

Background and Goal

I am new to CEP and Siddhi. I am trying to write a stock trading quote demo based on the Siddhi CEP engine.
After reading the Siddhi documentation and code examples, I still don't have a good idea of how to implement it; there is no example of a real case.
Basically, the demo idea is to build a market quote book from individual quotes coming from an exchange line handler.
If I wrote it in Java it would be a clear and easy example, but I really want to do it in SiddhiQL in a generic way.

If you could help with this or demo it, I would appreciate it very much.

Requirement

The quote event object from the exchange looks like:
Quote{uuid,productId,exchTime,bidPrice,bidSize,offerPrice,offerSize}

The quote book looks like this to the end user, keeping the top 3 best bid/offer prices:

productId, bid3Price,bid3Size,bid2Price,bid2Size,bid1Price,bid1Size,offer1Price,offer1Size,offer2Price,offer2Size,offer3Price,offer3Size
productId=IBM,bid3Price=100, bid3Size=5k(16k),bid2Price=101,bid2Size=10k(11k),bid1Price=102, bid1Size=1k(1k), offer1Price=103, offer1Size=2k(2k),offer2Price=104,offer2Size=1k(3k),offer3Price=105,offer3Size=1k(4k)

bid1 is the best bid price.
Accumulative bid2Size = bid2Size + bid1Size
Accumulative bid3Size = bid3Size + accumulative bid2Size

offer1 is the best offer price.
Accumulative offer2Size = offer2Size + offer1Size
Accumulative offer3Size = offer3Size + accumulative offer2Size

In Java/C++ code, the process looks like this:

  1. Listen to the exchange market quotes and receive a new quote event.
  2. Create a bid list and an offer list for this product if they are absent.
  3. If the new quote has a bid price, get the existing bid list of this product and add a new level or update the current one by bid price and size, or ignore the quote if it is worse than the current top 3 bid prices. The offer logic is similar.
  4. The end user's application is notified when the current quote book changes.
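Steps 2 and 3, together with the accumulative sizes defined above, can be sketched in plain Java for comparison with the SiddhiQL attempt. This is a minimal sketch under assumptions: one book per product, a price level is overwritten by the latest size quoted at that price, and a size of 0 removes the level. It keeps every level and only reads the best n, rather than discarding worse quotes; the `QuoteBook` class and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical per-product book keeping price levels sorted best-first. */
final class QuoteBook {
    // Bids: highest price is best. Offers: lowest price is best.
    private final TreeMap<Double, Double> bids = new TreeMap<>(Collections.reverseOrder());
    private final TreeMap<Double, Double> offers = new TreeMap<>();

    void onBid(double price, double size) { update(bids, price, size); }
    void onOffer(double price, double size) { update(offers, price, size); }

    private static void update(TreeMap<Double, Double> side, double price, double size) {
        if (size <= 0) {
            side.remove(price);      // assumption: size 0 clears the level
        } else {
            side.put(price, size);   // add a new level or overwrite an existing one
        }
    }

    /** Best n levels as {price, size, accumulativeSize}, best level first. */
    private static List<double[]> top(TreeMap<Double, Double> side, int n) {
        List<double[]> rows = new ArrayList<>();
        double cumulative = 0;
        for (Map.Entry<Double, Double> level : side.entrySet()) {
            if (rows.size() == n) {
                break;
            }
            cumulative += level.getValue();
            rows.add(new double[] {level.getKey(), level.getValue(), cumulative});
        }
        return rows;
    }

    List<double[]> topBids(int n) { return top(bids, n); }
    List<double[]> topOffers(int n) { return top(offers, n); }
}
```

Feeding the IBM example (bids 100/5k, 101/10k, 102/1k; offers 103/2k, 104/1k, 105/1k) yields accumulative bid sizes 1k, 11k, 16k and offer sizes 2k, 3k, 4k, matching the quote book shown above.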

The SiddhiQL process might look like this:

I tried to implement this logic in SiddhiQL as far as I know how:

  1. Define the quote stream:
    define stream Quote (uuid string, productId string, exchTime long,
    bidPrice double, bidSize double,
    offerPrice double, offerSize double);
  2. Define bid and offer streams for each individual product, e.g. (I think this is not a good idea, but I don't know how to optimize it in QL):
    define stream IBMBidQuote (uuid string, productId string, exchTime long,
    bidPrice double, bidSize double);
    define stream IBMOfferQuote (uuid string, productId string, exchTime long,
    offerPrice double, offerSize double);
  3. Define a quote book table to provide a query service for end users:
    define table QuoteBook (productId string,
    bid1Price double, bid1Size double, bid2Price double, bid2Size double, bid3Price double, bid3Size double,
    offer1Price double, offer1Size double, offer2Price double, offer2Size double, offer3Price double, offer3Size double);
  4. Separate each product's bid quotes and offer quotes in QL:
    From Quote[bidPrice > 0 and productId == 'IBM']
    Select uuid, productId, exchTime, bidPrice, bidSize
    Insert into IBMBidQuote
    ...(the offer side is similar to the bid side)
  5. Keep the top 10 best prices in the bid/offer quote lists, and eventually filter the top 3 into the quote book for users.
    ...(I don't know how to implement this in QL)
  6. End users add a listener for quote book changes in QL:
    From QuoteBook[productId in ('IBM','MSFT')]
    Select productId,bid1Price,.......

Last

I hope I have explained my thought clearly :)

Issue using an aggregate function with a length batch window

My code:

    SiddhiManager siddhiManager = new SiddhiManager();

    String executionPlan = "" +
            "define stream cseEventStream (symbol string, price float, volume long);" +
            "" +
            "@info(name = 'query1') " +
            "from cseEventStream[700 > price]#window.lengthBatch(4)" +
            "select symbol, price, avg(price) as avgPrice " +
            "insert expired events into outputStream;";

    ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
    executionPlanRuntime.addCallback("outputStream", new StreamCallback() {

        @Override
        public void receive(Event[] events) {
            System.out.println("receive events: " + events.length);
            for (Event e:events)
                System.out.println(e);
        }
    });


    InputHandler inputHandler = executionPlanRuntime.getInputHandler("cseEventStream");
    executionPlanRuntime.start();
    int i = 0;
    while (i < 10) {
        float p = i*10;
        inputHandler.send(new Object[]{"WSO2", p, 100L});
        System.out.println("\"WSO2\", " + p);
        inputHandler.send(new Object[] {"IBM", p, 100L});
        System.out.println("\"IBM\", " + p);
        Thread.sleep(1000);
        i++;
    }

I expected that when the events expire, I would get the average price of the event batch.
But the result is:
"WSO2", 0.0
"IBM", 0.0
"WSO2", 10.0
"IBM", 10.0
"WSO2", 20.0
"IBM", 20.0
"WSO2", 30.0
"IBM", 30.0
receive events: 1
Event{timestamp=1447933077892, data=[IBM, 10.0, 0.0], isExpired=false}
"WSO2", 40.0
"IBM", 40.0
"WSO2", 50.0
"IBM", 50.0
receive events: 1
Event{timestamp=1447933079894, data=[IBM, 30.0, 0.0], isExpired=false}

As you can see, the average price is 0.
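One plausible reading of this output, stated as an assumption rather than a confirmed diagnosis: with `insert expired events`, the expired events are emitted after the batch has been removed from the aggregator, so the average is taken over an empty set. The hypothetical `RunningAvg` below mimics an incremental aggregator with separate add and remove steps, and returns 0.0 for an empty set (also an assumption, chosen to match the observed output).

```java
/** Hypothetical incremental average with add/remove steps, like a windowed aggregator. */
final class RunningAvg {
    private double sum;
    private long count;

    double add(double value) {
        sum += value;
        count++;
        return current();
    }

    double remove(double value) {
        sum -= value;
        count--;
        return current();
    }

    /** Average of the values currently held; 0.0 when nothing is held. */
    double current() {
        return count == 0 ? 0.0 : sum / count;
    }
}
```

After adding 0, 10, 20 and 30 the average is 15.0, and after removing all four it is 0.0. If the intent is one average per batch, emitting current events (`insert into outputStream`, without `expired`) may be closer to what is wanted.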

External time sliding window does not remove expired events

In the external time sliding window, only AttributeAggregator.processAdd is ever called, never AttributeAggregator.processRemove. The cause is that expiredEventChunk is not reset before the expired events are removed, so the lastReturned event has no next event.
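For reference, the expected behaviour is that each arriving event evicts every held event whose own timestamp has fallen out of the window, and each eviction reaches the aggregator as a remove. A minimal sketch of that logic with a running sum, assuming events arrive in timestamp order; `ExternalTimeSlidingSum` is hypothetical and is not Siddhi's window processor.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sliding window driven by event timestamps, not the wall clock. */
final class ExternalTimeSlidingSum {
    private final long windowMillis;
    private final Deque<long[]> held = new ArrayDeque<>(); // {timestamp, value}
    private long sum;
    int removals; // number of processRemove-style calls, for inspection

    ExternalTimeSlidingSum(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Adds an event and returns the sum over the window (ts - windowMillis, ts]. */
    long add(long ts, long value) {
        // Expire first: events stamped at or before ts - windowMillis leave the window.
        while (!held.isEmpty() && held.peekFirst()[0] <= ts - windowMillis) {
            sum -= held.pollFirst()[1];
            removals++;
        }
        held.addLast(new long[] {ts, value});
        sum += value;
        return sum;
    }
}
```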

Very bad performance when a huge number of queries is used with a single input stream

Reporter: Leo Romanoff
Jira URL: https://wso2.org/jira/browse/SIDDHI-2

Description:
I created a few test cases to check experimentally how Siddhi behaves under load. I created a single SiddhiManager instance and added 10000 queries that all read from the same input stream, check whether a specific attribute of an event (namely, price) lies inside a given random interval ([price >= random_low and price <= random_high]), and output randomly into one of 100 streams. Then I measured the time required to process 1000000 events with this setup. I also ran exactly the same experiment with Esper.

My finding was that Siddhi is much, much slower than Esper in this setup. After looking into the internal implementations of both, I realized why. Siddhi processes all queries that read from the same input stream linearly, one after another. Even if many of the queries have almost the same condition, Siddhi makes no attempt to optimize. Esper detects that many queries have a condition on the same variable and creates a kind of decision tree. As a result, Esper's running time is O(log N), whereas Siddhi needs O(N).

I'm not saying that this test case is very typical or important, but maybe Siddhi should analyze the complete set of queries and apply optimizations where possible, i.e. a kind of global optimization. It could detect common sub-expressions or sub-conditions in the queries and evaluate them only once, instead of evaluating them over and over again for each query separately.

After getting these first results, I changed the setup so that each query uses one of many input streams (e.g. one of 300) instead of the same one. This greatly improved the situation, because the number of queries per input stream was much smaller and processing was much faster. But even in this setup, Siddhi is still about 5-6 times slower than Esper.
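The decision-tree idea can be illustrated for this specific workload, where every query is a closed range `[low, high]` on `price`. The sketch below is neither Esper's nor Siddhi's implementation: it is a hypothetical static index (`RangeFilterIndex`) that splits the price axis at every range endpoint, precomputes which queries match each piece, and answers a lookup with one binary search, so the per-event cost is O(log N) plus the number of matches. The precomputation is deliberately naive (quadratic) to keep it short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical static index over closed range filters [lows[q], highs[q]]. */
final class RangeFilterIndex {
    private final double[] points;              // sorted, distinct range endpoints
    private final List<List<Integer>> regions;  // matching query ids per region

    RangeFilterIndex(double[] lows, double[] highs) {
        TreeSet<Double> sorted = new TreeSet<>();
        for (int q = 0; q < lows.length; q++) {
            sorted.add(lows[q]);
            sorted.add(highs[q]);
        }
        points = new double[sorted.size()];
        int i = 0;
        for (double p : sorted) {
            points[i++] = p;
        }
        // Regions alternate: gap before points[0], points[0], gap, points[1], ...
        // Membership can only change at an endpoint, so one probe per region suffices.
        regions = new ArrayList<>();
        for (int r = 0; r < 2 * points.length + 1; r++) {
            double probe = representative(r);
            List<Integer> ids = new ArrayList<>();
            for (int q = 0; q < lows.length; q++) {
                if (lows[q] <= probe && probe <= highs[q]) {
                    ids.add(q);
                }
            }
            regions.add(ids);
        }
    }

    /** A value inside region r: odd r is an endpoint, even r is the open gap before points[r / 2]. */
    private double representative(int r) {
        int k = r / 2;
        if (r % 2 == 1) return points[k];
        if (points.length == 0) return 0;
        if (k == 0) return Math.nextDown(points[0]);
        if (k == points.length) return Math.nextUp(points[points.length - 1]);
        return (points[k - 1] + points[k]) / 2;
    }

    /** Ids of all queries whose range contains x: one binary search, then a list lookup. */
    List<Integer> match(double x) {
        int pos = Arrays.binarySearch(points, x);
        int region = pos >= 0 ? 2 * pos + 1 : 2 * (-pos - 1);
        return regions.get(region);
    }
}
```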

The maven project is attached, so that you can try to reproduce the issue.

ceptest 2.zip
ceptest.zip

Race condition in snapshot()

The snapshot/persist function has a race condition; see the code example below, and note the Thread.sleep and the comment after it.
In versions > 3.0.3, window.timeBatch is even more broken: snapshotting only works if there is no pending data, e.g. when the snapshot is taken after a query and before new events arrive. Otherwise it fails with the same NPE as shown below.

public static void main(String[] args) throws InterruptedException, IOException {
    ExecutionPlanRuntime executionPlanRuntime = setupTimePlan();
    executionPlanRuntime.start();
    executionPlanRuntime.getInputHandler("ts").send(new Object[] { "A", 1 });
    executionPlanRuntime.getInputHandler("ts").send(new Object[] { "B", 2 });
    executionPlanRuntime.getInputHandler("ts").send(new Object[] { "C", 3 });
    executionPlanRuntime.getInputHandler("ts").send(new Object[] { "D", 4 });

    Thread.sleep(100);
    // Without the sleep, "sumval" in the query is randomly 6 or 15,
    // and with version > 3.0.4 restore() fails with the NPE below.

    byte[] state = executionPlanRuntime.snapshot();
    executionPlanRuntime.shutdown();
    ExecutionPlanRuntime newRuntime = setupTimePlan();
    newRuntime.start();
    newRuntime.restore(state);
    newRuntime.getInputHandler("ts").send(new Object[] { "Z", 10 });
    newRuntime.shutdown();
}

private static ExecutionPlanRuntime setupTimePlan() {
    SiddhiManager siddhiManager = new SiddhiManager();

    String cseEventStream = "define stream ts (name string, val int);";
    String query = "@info(name = 'q1') from ts#window.time(1000) select name, sum(val) as sumval insert into OutputStream";

    ExecutionPlanRuntime executionPlanRuntime = siddhiManager
        .createExecutionPlanRuntime("@Plan:name('plan') " + cseEventStream + query);

    executionPlanRuntime.addCallback("q1", new QueryCallback() {
        @Override
        public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
            EventPrinter.print(timeStamp, inEvents, removeEvents);
        }
    });
    return executionPlanRuntime;
}

NPE stack trace:
at org.wso2.siddhi.core.event.ComplexEventChunk.getLastEvent(ComplexEventChunk.java:81)
at org.wso2.siddhi.core.event.ComplexEventChunk.add(ComplexEventChunk.java:75)
at org.wso2.siddhi.core.query.processor.stream.window.TimeWindowProcessor.restoreState(TimeWindowProcessor.java:151)
at org.wso2.siddhi.core.util.snapshot.SnapshotService.restore(SnapshotService.java:67)
at org.wso2.siddhi.core.ExecutionPlanRuntime.restore(ExecutionPlanRuntime.java:193)
at com.anokor.engine.siddhi.internal.SiddhiEngine.main(SiddhiEngine.java:54)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
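A general way to rule out this class of race, independent of Siddhi's internals, is to make state updates and snapshots mutually exclusive, so a snapshot never observes a half-applied event. A minimal sketch with a hypothetical `SumState` (the events held plus a running sum, mimicking `sum(val)` over a window) using `synchronized` methods; it illustrates the technique and is not how Siddhi's `SnapshotService` is implemented.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical window state whose updates and snapshots share one lock. */
final class SumState {
    private final List<Integer> values = new ArrayList<>();
    private long sum;

    synchronized long add(int value) {
        values.add(value);
        sum += value;
        return sum;
    }

    /** Copies the state while holding the lock, so no add() runs mid-copy. */
    synchronized List<Integer> snapshot() {
        return new ArrayList<>(values);
    }

    /** Replaces the state with a previously taken snapshot and recomputes the sum. */
    synchronized void restore(List<Integer> snapshot) {
        values.clear();
        values.addAll(snapshot);
        sum = 0;
        for (int v : snapshot) {
            sum += v;
        }
    }
}
```

Mirroring the report: after adding 1 to 4, snapshotting, restoring into a fresh state and adding 10, the sum is 20 regardless of timing.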

Test cases & Performance of Siddhi Joins

Reporter: @suhothayan
Jira URL: https://wso2.org/jira/browse/SIDDHI-8

Description:
There are a lot of cases to test in joins for CEP. The following are examples, and I think we need to write test cases for the ones we have not covered yet.

Dimensions

  1. Join on one stream only, or on both
  2. Event rate of both streams (both fast, one fast and the other slow, both slow)
  3. Time window size: small (1 s), medium (10 min), long (6 hours), very long (1 week)
  4. What portion of events match (1%, 10%, 70%)

So there are a lot of combinations!

We need to understand the performance of each case, and we need a Jira for the cases we have not covered.
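The four dimensions multiply: 2 join modes × 3 rate mixes × 4 window sizes × 3 match ratios = 72 cases. A small sketch that enumerates the matrix from the values listed above (the `JoinTestMatrix` class is hypothetical) could seed parameterized test cases.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical generator for the join test matrix described above. */
final class JoinTestMatrix {
    static final String[] JOIN_SIDES = {"one stream", "both streams"};
    static final String[] RATES = {"both fast", "one fast, other slow", "both slow"};
    static final String[] WINDOWS = {"1 sec", "10 min", "6 hours", "1 week"};
    static final String[] MATCH_RATIOS = {"1%", "10%", "70%"};

    /** Every combination of the four dimensions, one string per test case. */
    static List<String> cases() {
        List<String> all = new ArrayList<>();
        for (String side : JOIN_SIDES) {
            for (String rate : RATES) {
                for (String window : WINDOWS) {
                    for (String ratio : MATCH_RATIOS) {
                        all.add(side + " | " + rate + " | " + window + " | " + ratio);
                    }
                }
            }
        }
        return all;
    }
}
```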

Time Series Test case failing

Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.007 sec <<< FAILURE!
testRegression(org.wso2.siddhi.test.SimpleRegOutlierTestCase) Time elapsed: 0.003 sec <<< ERROR!
org.wso2.siddhi.core.exception.QueryCreationException: No extension exist for TransformExtension{extensionName='timeseries', functionName='outlier'}
at org.wso2.siddhi.core.util.SiddhiClassLoader.loadExtensionImplementation(SiddhiClassLoader.java:69)
at org.wso2.siddhi.core.util.SiddhiClassLoader.loadProcessor(SiddhiClassLoader.java:37)
at org.wso2.siddhi.core.util.parser.StreamParser.updateQueryEventSourceOutDefinition(StreamParser.java:498)
at org.wso2.siddhi.core.query.creator.BasicQueryCreator.updateQueryEventSourceList(BasicQueryCreator.java:46)
at org.wso2.siddhi.core.query.creator.QueryCreator.init(QueryCreator.java:66)
at org.wso2.siddhi.core.query.creator.BasicQueryCreator.<init>(BasicQueryCreator.java:40)

Can't #window.cron be used instead of #window.timeBatch?

I use #window.cron like this:
String query = "" + "@info(name = 'query1') " + "from cseEventStream[700 > price]#window.cron('*/5 * * * * ?') " + "select symbol, sum(price) as ap " + "group by symbol " + "insert into outputStream;";
I expect it to output a result every 5 seconds, grouped by symbol.
My test result:

"WSO2", 30.0
"IBM", 30.0
"WSO2", 40.0
"IBM", 40.0
"WSO2", 50.0
"IBM", 50.0
"WSO2", 60.0
"IBM", 60.0
"WSO2", 70.0
"IBM", 70.0
receive events: 10
Event{timestamp=1473152315472, data=[WSO2, 30.0], isExpired=false}
Event{timestamp=1473152315472, data=[IBM, 30.0], isExpired=false}
Event{timestamp=1473152316472, data=[WSO2, 70.0], isExpired=false}
Event{timestamp=1473152316472, data=[IBM, 70.0], isExpired=false}
Event{timestamp=1473152317472, data=[WSO2, 120.0], isExpired=false}
Event{timestamp=1473152317472, data=[IBM, 120.0], isExpired=false}
Event{timestamp=1473152318472, data=[WSO2, 180.0], isExpired=false}
Event{timestamp=1473152318472, data=[IBM, 180.0], isExpired=false}
Event{timestamp=1473152319472, data=[WSO2, 250.0], isExpired=false}
Event{timestamp=1473152319472, data=[IBM, 250.0], isExpired=false}

It is grouped per event.

I am expecting output like this (like the #window.timeBatch syntax):

Event{timestamp=1473152319472, data=[WSO2, 250.0], isExpired=false}
Event{timestamp=1473152319472, data=[IBM, 250.0], isExpired=false}

Can #window.cron not be used like this?
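The expected output above is batch semantics: events are accumulated per symbol, one row per group is emitted when the schedule fires, and the batch is then cleared. The observed output instead looks like a running sum emitted per event (30, 70, 120, 180, 250). The hypothetical `ScheduledGroupSum` below shows the batch behaviour in plain Java, with the schedule reduced to an explicit `flush()` call (for example, one triggered every 5 seconds by a scheduler); it is not the implementation of `#window.cron`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical per-group sum that is emitted and reset when the schedule fires. */
final class ScheduledGroupSum {
    private Map<String, Double> pending = new LinkedHashMap<>();

    /** Accumulates an event into the current batch without emitting anything. */
    synchronized void add(String symbol, double price) {
        pending.merge(symbol, price, Double::sum);
    }

    /** Called on each schedule tick: returns one sum per symbol and starts a new batch. */
    synchronized Map<String, Double> flush() {
        Map<String, Double> out = pending;
        pending = new LinkedHashMap<>();
        return out;
    }
}
```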

Unable to build the project -- test case failing ???

mvn clean install is failing

testWindowQuery10(org.wso2.siddhi.test.standard.WindowTestCase) Time elapsed: 2.512 sec <<< FAILURE!
junit.framework.AssertionFailedError: Event count expected:<2> but was:<1>
at junit.framework.Assert.fail(Assert.java:50)
at junit.framework.Assert.failNotEquals(Assert.java:287)
at junit.framework.Assert.assertEquals(Assert.java:67)
at junit.framework.Assert.assertEquals(Assert.java:199)
at org.wso2.siddhi.test.standard.WindowTestCase.testWindowQuery10(WindowTestCase.java:528)

OutOfMemoryError Java heap space: due to partitions

I am using variable partitions with a potentially infinite number of keys, and my application runs out of heap space. Checking the source code, I see that partitions are added to the map PartitionRuntime.partitionInstanceRuntimeMap, but there is no way to remove them.

I think that the CEP engine needs some mechanism to "garbage collect" partitions which are no longer needed. They can be added again if an event with the same key is received.

"Marking" partitions for GC could be done in a number of ways:

  • time to live: no new events for a certain time
  • all windows referenced by the partition are empty
  • extension of the query with commands to remove the partition

-- Ron
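The first option, time to live, can be sketched with an access-ordered `LinkedHashMap`, so the idle partitions are always at the front of the iteration order. This is a minimal sketch, not Siddhi's `PartitionRuntime`: the `PartitionRegistry` class, the per-key state factory, and the caller-supplied timestamps are hypothetical, and it assumes timestamps passed to `touch()` never decrease.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.function.Function;

/** Hypothetical registry of per-key partition state with time-to-live eviction. */
final class PartitionRegistry<S> {
    private static final class Slot<T> {
        final T state;
        long lastSeen;
        Slot(T state, long lastSeen) { this.state = state; this.lastSeen = lastSeen; }
    }

    // accessOrder = true: iteration runs from least to most recently accessed key.
    private final LinkedHashMap<String, Slot<S>> slots = new LinkedHashMap<>(16, 0.75f, true);
    private final long ttlMillis;
    private final Function<String, S> factory;
    int created; // how many partition instances have been built so far

    PartitionRegistry(long ttlMillis, Function<String, S> factory) {
        this.ttlMillis = ttlMillis;
        this.factory = factory;
    }

    /** Returns the state for key, creating it if absent or previously evicted. */
    S touch(String key, long now) {
        Slot<S> slot = slots.get(key);
        if (slot == null) {
            slot = new Slot<>(factory.apply(key), now);
            slots.put(key, slot);
            created++;
        }
        slot.lastSeen = now;
        return slot.state;
    }

    /** Drops partitions that have received no events for longer than the TTL. */
    void evictIdle(long now) {
        Iterator<Slot<S>> it = slots.values().iterator();
        while (it.hasNext()) {
            if (now - it.next().lastSeen <= ttlMillis) {
                break; // access order: every later entry was touched more recently
            }
            it.remove();
        }
    }

    int size() { return slots.size(); }
}
```

An evicted key is simply rebuilt on its next event, which matches the suggestion that partitions "can be added again if an event with the same key is received"; any window state the partition held is lost on eviction.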

Support aggregate functions "minby/maxby" syntax like Esper?

e.g:
define stream TempStream (key int, value1 int, value2 string); @info(name = 'query1') from TempStream#window.timeBatch(5 sec)

Within 5 seconds, 4 events arrive:
[1, 2, a]; <-- earliest event
[2, 3, b];
[1, 4, c];
[2, 5, d]; <-- latest event

I want to get "key" and "value2" of the earliest event, grouped by "key".
In Esper, I can use "select minby(time).value2" to get the result: [1,a], [2,b].
For the latest: "select maxby(time).value2", with the result: [1,c], [2,d].

Does Siddhi support this?
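The requested `minby`/`maxby` semantics for this example can be spelled out in plain Java: within one batch, keep the earliest (or latest) event per key and project `value2`. This is only a sketch of the desired behaviour with a hypothetical `Row` type, not a statement about what Siddhi supports; arrival order stands in for the time attribute.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical event row: (key, value1, value2) as in TempStream. */
final class Row {
    final int key;
    final int value1;
    final String value2;

    Row(int key, int value1, String value2) {
        this.key = key;
        this.value1 = value1;
        this.value2 = value2;
    }
}

final class MinMaxBy {
    /** value2 of the first-arriving event per key (like minby(time).value2). */
    static Map<Integer, String> earliestPerKey(List<Row> batch) {
        Map<Integer, String> out = new LinkedHashMap<>();
        for (Row r : batch) {
            out.putIfAbsent(r.key, r.value2);
        }
        return out;
    }

    /** value2 of the last-arriving event per key (like maxby(time).value2). */
    static Map<Integer, String> latestPerKey(List<Row> batch) {
        Map<Integer, String> out = new LinkedHashMap<>();
        for (Row r : batch) {
            out.put(r.key, r.value2);
        }
        return out;
    }
}
```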

Siddhi external time

Hello,

I wanted to write a simple query in Siddhi that gives me back the hits in a batch. I could easily do it with Esper, but I had some problems with Siddhi. Is there a way to use external time instead of internal time? I tried the External Time Window feature, but it can only be used for simple queries, so it is not enough for me.

Here is the PoC (Esper):

        Configuration con = new Configuration();
        //con.addEventType("LoginType", beanClass);
        con.addEventType("LoginType", LoginType.class);
        EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(con);
        String expression = "select count(loginType), timeStamp, userName,loginType,systemType,eventID, count(eventID) as ecntid from LoginType.win:time_batch(5 sec) where eventID='254' and userName='maci' ";
        //expression = "select a.timeStamp, a.userName,a.loginType,a.systemType,count(a.eventID ) as ecntid from pattern[every a=LoginType(eventID='254' and count(a.eventID) > 3) where timer:within(5 sec)";
        //expression = "select o.timeStamp, o.userName,o.systemType,o.eventID, count(o.eventID), b.timeStamp, b.eventID from pattern[every o=LoginType -> (timer:interval(5 sec) and b=LoginType(eventID=o.eventID) )]";

        expression = "select * from LoginType.win:time_batch(5 sec)  " +
                "   match_recognize ( " +
                "   measures A.eventID as aEID, B.eventID as bEID , A.timeStamp as firstStamp, B.timeStamp as secondStamp " +
                "   pattern ( A B ) " +
                "   define " +
                "   A as A.eventID = '254' and A.userName='maci' , " +
                "   B as B.eventID = '255' and prev(B.userName) = A.userName " +
                "  ) ";

        //expression = "select a.eventID,b.eventID,a.userName,b.userName,a.timeStamp,b.timeStamp from pattern[every a=LoginType(a.eventID='254') -> (timer:interval(5 sec) and b=LoginType(b.eventID='255'))]";

        /*
        expression = "select sorted(price desc).take(5) as highestprice " +
                " from LoginType.win:time(5 min)  ";
        */

        EPStatement statement = epService.getEPAdministrator().createEPL(expression);
        MyListener listener = new MyListener();
        statement.addListener(listener);

        Calendar c = Calendar.getInstance();
        EPRuntime runtime = epService.getEPRuntime();
        runtime.sendEvent(new TimerControlEvent(TimerControlEvent.ClockType.CLOCK_EXTERNAL));

        long start = new Date().getTime();
        int k = 0;
        for (k = 0; k <= 100000000; k++) {
            c.add(Calendar.SECOND, 1);
            String eventID = "254";
            if (k % 3 == 0) {
                eventID = "255";
            }
            runtime.sendEvent(new CurrentTimeEvent(c.getTime().getTime()));
            runtime.sendEvent(new LoginType(new Date(c.getTime().getTime()), "type1", "any name", eventID, "windows", new Random().nextInt(1000)));

        }
        long end = new Date().getTime();
        System.out.println(end - start);


I tried to use a similar query in Esper, but I was not able to use the timestamps of my logs.
Could you give me a similar example with Siddhi?
Here are my attempts to recreate the above example with Siddhi:

    public void externalTimeWindowTest1() throws InterruptedException {
        SiddhiManager siddhiManager = new SiddhiManager();
        String cseEventStream = "define stream LoginEvents (myTime long, ip string, phone string,price int) ;";
        String query = "@info(name = 'query1') from LoginEvents#window.timeBatch(5 sec)  "
                + "select myTime, phone, ip, price , max(price) as maxprice, min(price) as minprice, count(myTime) as cntip insert all events into OutPut ";


        /*String query = "@info(name='query1') from every a1 = LoginEvents  " +
                "            -> b1 = LoginEvents[b1.ip == a1.ip ]#window.externalTime(b1.myTime,5 second)   " +
                "       within 5 seconds  select a1.myTime, a1.phone, a1.ip, a1.price , max(a1.price) as maxprice, min(a1.price) as minprice, count(a1.myTime) as cntip insert current events into Output ";
        */
        final ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);

        executionPlanRuntime.addCallback("query1", new QueryCallback() {

            @Override
            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
                EventPrinter.print(inEvents);
                System.out.println(new Date(timeStamp));
                if (inEvents != null) {
                    System.out.println("======================== START ===============================");
                    for (Event e : inEvents) {
                        if (e.isExpired()) continue;

                        System.out.println("----------------------------");
                        System.out.println(new Date(e.getTimestamp()));
                        System.out.println("IP:" + e.getData(2));
                        System.out.println("Max price:" + e.getData(4));
                        System.out.println("Min price:" + e.getData(5));
                        System.out.println("IP siddhiCount:" + e.getData(6));
                        System.out.println("Expired :" + e.isExpired());
                        System.out.println("----------------------------");
                    }
                    System.out.println("======================== END  ===============================");
                }
            }
        });
        executionPlanRuntime.start();


        new Thread(new Runnable() {
            @Override
            public void run() {
                Calendar c = Calendar.getInstance();
                c.add(Calendar.HOUR, 1);
                c.add(Calendar.SECOND, 1);
                InputHandler inputHandler = executionPlanRuntime.getInputHandler("LoginEvents");
                int i = 0;
                for (i = 0; i <= 1000; i++) {
                    c.add(Calendar.SECOND, 1);
                    try {
                        inputHandler.send(c.getTime().getTime(), new Object[]{c.getTime().getTime(), new String("192.10.1.1"), "1", new Random().nextInt(1000)});
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();

        sleep(15000);
        executionPlanRuntime.shutdown();
        System.out.println("Done");
    }

Version :

✘~/Downloads/tmp/siddhi/siddhi  ➦ e18aaef  git branch | sed -n '/* /s///p'
(HEAD detached at v3.0.1)
~/Downloads/tmp/siddhi/siddhi  ➦ e18aaef  git status
HEAD detached at v3.0.1
nothing to commit, working directory clean
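What the Esper program gets from `CLOCK_EXTERNAL` is a batch whose boundaries come from event timestamps instead of the wall clock. A minimal sketch of that logic for tumbling batches, assuming events arrive in timestamp order; `ExternalTimeBatch` is hypothetical and is not a Siddhi window.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical tumbling batch whose boundaries come from event timestamps. */
final class ExternalTimeBatch {
    private final long batchMillis;
    private long batchStart = -1;             // -1 until the first event arrives
    private List<Long> current = new ArrayList<>();

    ExternalTimeBatch(long batchMillis) {
        this.batchMillis = batchMillis;
    }

    /**
     * Adds an event stamped ts. Returns the completed batch if ts is at or past
     * the end of the current batch, otherwise an empty list.
     */
    List<Long> add(long ts) {
        List<Long> completed = new ArrayList<>();
        if (batchStart < 0) {
            batchStart = ts;
        } else if (ts >= batchStart + batchMillis) {
            completed = current;
            current = new ArrayList<>();
            // Align the new batch start so that gaps longer than one batch are skipped.
            batchStart += ((ts - batchStart) / batchMillis) * batchMillis;
        }
        current.add(ts);
        return completed;
    }
}
```

With 5-second batches and one event per second from t = 0 to t = 12 s, the batches close when the events at 5 s and 10 s arrive, each holding 5 events; no wall-clock time needs to pass, which is what makes replaying historical logs possible.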

Maven Fails on JavaDoc-Plugin

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-javadoc-plugin:2.8:jar (attach-javadocs) on project siddhi-core: MavenReportException: Error while creating archive:
[ERROR] Exit code: 1 - /siddhi/modules/siddhicore/src/main/java/org/wso2/siddhi/core/executor/function/FunctionExecutor.java:80: warning: no description for @return
[ERROR] * @return
[ERROR] ^
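The message comes from the javadoc tool's lint checks (doclint, enabled by default since JDK 8), which flag a `@return` tag that has no description. A tag that carries a description passes that check; the method below is a hypothetical stand-in for the one at `FunctionExecutor.java:80`, not its actual signature.

```java
/** Hypothetical executor illustrating a fully documented method. */
final class DocumentedExecutor {
    /**
     * Doubles the given value.
     *
     * @param value the input value
     * @return twice the input value
     */
    static int execute(int value) {
        return value * 2;
    }
}
```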

How can I do pattern matching between two streams from timeBatch windows?

Here is the business logic I want to implement:
From a continuous stream of logs, if there are more than 9 login failure logs and 1 login success log in the stream within a time period, generate an event called "bruteForceLoginSuccess".

Because I want to group the aggregation by "srcAddress, srcUsername, destAddress, appProtocol", I wrote the query like this (the success query is similar):

@info(name = 'condition1')
from rawStream[ catBehavior == '/Authentication/Verify' and catOutcome == 'FAIL' ]#window.timeBatch(10 sec)
select srcAddress, catOutcome, deviceCat, srcUsername, destAddress, appProtocol, distinctcount( catObject ) as distinctMinCount, count() as groupCount
group by srcAddress, srcUsername, destAddress, appProtocol
having groupCount >= 9 and distinctMinCount >=1
insert into e1_OutputStream;

I use a timeBatch window because I want to check that the group size is at least 9, so I use a field called groupCount that must be at least 9. If I use a sliding time window, it generates 9 events into e1_OutputStream, and each event's groupCount is one more than the previous one's (the first event's groupCount is 1, the second's is 2, and the ninth's is 9).
But in the pattern-matching part, if I write a condition like groupCount >= 9, I lose the 8 previous events (their groupCount is 1 to 8, so they don't match the condition).
If I use timeBatch, it generates only one aggregate event per group into e1_OutputStream, and its groupCount is the number of events in the group. I can use this event as a representative event when doing the pattern matching.

And here is the problem: I use timeBatch in both the login failure condition and the success condition to generate e1_OutputStream and e2_OutputStream. Both outputs have the same time (even though the events in them have different times), and pattern matching requires a time-ordered relation, so I cannot trigger the pattern unless I sleep for a while before sending the login success log. (The pattern-matching query is below.)

@info(name = 'result')
from every ( e1 = e1_OutputStream ) -> every ( e2 = e2_OutputStream[ srcAddress == e1.srcAddress
                                                                 and deviceCat == e1.deviceCat
                                                                 and srcUsername == e1.srcUsername
                                                                 and destAddress == e1.destAddress
                                                                 and appProtocol == e1.appProtocol ] )
select 'relationEvent' as event, e1.srcAddress, e1.deviceCat, e1.srcUsername, e1.destAddress, e1.appProtocol
insert into resultOutputStream;

What should I do to accomplish this task? What is the most correct way to achieve it? I would really appreciate your response.
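One alternative, sketched here in plain Java rather than SiddhiQL, avoids lining up two `timeBatch` outputs: keep a sliding, event-time window of failure timestamps per group key, and raise the alert when a success arrives while that window holds enough failures. The key format, the 10-second window, and the `BruteForceDetector` class are assumptions for illustration; the `distinctcount(catObject)` condition is left out, and failure lists for keys that never see a success are not pruned.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-key detector: at least N failures, then a success, within a window. */
final class BruteForceDetector {
    private final int minFailures;
    private final long windowMillis;
    private final Map<String, Deque<Long>> failures = new HashMap<>();

    BruteForceDetector(int minFailures, long windowMillis) {
        this.minFailures = minFailures;
        this.windowMillis = windowMillis;
    }

    /** Records a failed login for key (e.g. srcAddress|srcUsername|destAddress|appProtocol). */
    void onFailure(String key, long ts) {
        failures.computeIfAbsent(key, k -> new ArrayDeque<>()).addLast(ts);
    }

    /** Returns true if this success follows enough recent failures for the same key. */
    boolean onSuccess(String key, long ts) {
        Deque<Long> times = failures.get(key);
        if (times == null) {
            return false;
        }
        // Drop failures older than the window, measured back from this success.
        while (!times.isEmpty() && times.peekFirst() < ts - windowMillis) {
            times.pollFirst();
        }
        boolean alert = times.size() >= minFailures;
        if (alert) {
            times.clear(); // report each burst once
        }
        return alert;
    }
}
```

Because each success is compared against the failures that preceded it, the two conditions never need to share a batch boundary, which is the ordering problem described above.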

NullPointerException in ValuePartitionExecutor, when null value in the partition key

Hi,
I am currently working with Siddhi v3.0.4.

Below is my test program. Note that I injected an event containing a null value (the third event). The null value is for symbol, which is also the key of the value partition in the query.

    import org.wso2.siddhi.core.ExecutionPlanRuntime;
    import org.wso2.siddhi.core.SiddhiManager;
    import org.wso2.siddhi.core.event.Event;
    import org.wso2.siddhi.core.stream.input.InputHandler;
    import org.wso2.siddhi.core.stream.output.StreamCallback;
    import org.wso2.siddhi.core.util.EventPrinter;

    public class NullPartitionKeyTest {   // class name is illustrative

        public static void main(String[] args) throws InterruptedException {
            // Creating Siddhi Manager
            SiddhiManager siddhiManager = new SiddhiManager();

            // Value partition on symbol; inside each partition, forward events with volume < 150
            String executionPlan = "" +
                "define stream cseEventStream (symbol string, price float, volume long); " +
                "partition with (symbol of cseEventStream)" +
                " begin " +
                " @info(name = 'query11') " +
                " from cseEventStream[volume < 150] " +
                " select symbol,price " +
                " insert into outputStream ;" +
                " end; ";

            //Generating runtime
            ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);

            // Print every event that reaches outputStream
            executionPlanRuntime.addCallback("outputStream", new StreamCallback() {
                @Override
                public void receive(Event[] inEvents) {
                    EventPrinter.print(0L, inEvents, null);
                }
            });

            //Retrieving InputHandler to push events into Siddhi
            InputHandler inputHandler = executionPlanRuntime.getInputHandler("cseEventStream");

            //Starting event processing
            executionPlanRuntime.start();

            //Sending events to Siddhi (the 3rd event has a null symbol, i.e. a null partition key)
            inputHandler.send(new Object[]{"IBM", 700f, 100L});
            inputHandler.send(new Object[]{"WSO2", 60.5f, 200L});
            inputHandler.send(new Object[]{null, 50f, 100L});
            inputHandler.send(new Object[]{"GOOG", 50f, 30L});
            inputHandler.send(new Object[]{"IBM", 76.6f, 400L});
            inputHandler.send(new Object[]{"WSO2", 45.6f, 50L});
            Thread.sleep(500);

            //Shutting down the runtime
            executionPlanRuntime.shutdown();

            //Shutting down Siddhi
            siddhiManager.shutdown();
        }
    }

The execution produces the following output:

 Jan 25, 2016 2:55:12 PM com.lmax.disruptor.FatalExceptionHandler handleEventException
 SEVERE: Exception processing: 5 IndexedEvent{streamIndex=0, event=Event{timestamp=1453701312805, data=[WSO2, 45.6, 50], isExpired=false}}
 java.lang.NullPointerException
    at org.wso2.siddhi.core.partition.executor.ValuePartitionExecutor.execute(ValuePartitionExecutor.java:32)
    at org.wso2.siddhi.core.partition.PartitionStreamReceiver.receive(PartitionStreamReceiver.java:189)
    at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:142)
    at org.wso2.siddhi.core.stream.StreamJunction.access$000(StreamJunction.java:44)
    at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:312)
    at org.wso2.siddhi.core.stream.input.InputDistributor.send(InputDistributor.java:39)
    at org.wso2.siddhi.core.stream.input.SingleThreadEntryValve.send(SingleThreadEntryValve.java:59)
    at org.wso2.siddhi.core.stream.input.SingleStreamEntryValve$SingleEntryValveHandler.sendEvents(SingleStreamEntryValve.java:166)
    at org.wso2.siddhi.core.stream.input.SingleStreamEntryValve$SingleEntryValveHandler.onEvent(SingleStreamEntryValve.java:149)
    at org.wso2.siddhi.core.stream.input.SingleStreamEntryValve$SingleEntryValveHandler.onEvent(SingleStreamEntryValve.java:119)
    at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Exception in thread "Siddhi-3aab83cb-5578-418d-9c2a-1091ac3e5491-executor-thread-0" java.lang.RuntimeException: java.lang.NullPointerException
    at com.lmax.disruptor.FatalExceptionHandler.handleEventException(FatalExceptionHandler.java:45)
    at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:147)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at org.wso2.siddhi.core.partition.executor.ValuePartitionExecutor.execute(ValuePartitionExecutor.java:32)
    at org.wso2.siddhi.core.partition.PartitionStreamReceiver.receive(PartitionStreamReceiver.java:189)
    at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:142)
    at org.wso2.siddhi.core.stream.StreamJunction.access$000(StreamJunction.java:44)
    at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:312)
    at org.wso2.siddhi.core.stream.input.InputDistributor.send(InputDistributor.java:39)
    at org.wso2.siddhi.core.stream.input.SingleThreadEntryValve.send(SingleThreadEntryValve.java:59)
    at org.wso2.siddhi.core.stream.input.SingleStreamEntryValve$SingleEntryValveHandler.sendEvents(SingleStreamEntryValve.java:166)
    at org.wso2.siddhi.core.stream.input.SingleStreamEntryValve$SingleEntryValveHandler.onEvent(SingleStreamEntryValve.java:149)
    at org.wso2.siddhi.core.stream.input.SingleStreamEntryValve$SingleEntryValveHandler.onEvent(SingleStreamEntryValve.java:119)
    at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128)
    ... 3 more


I found that PartitionStreamReceiver checks whether the currentKey returned by partitionExecutor.execute(nextEvent) is null. However, partitionExecutor.execute(nextEvent) itself throws a NullPointerException before that check is reached. The following is the relevant part of PartitionStreamReceiver.java (lines 189 to 191):

                String currentKey = partitionExecutor.execute(nextEvent);
                if (currentKey != null) {
                    if (key == null) {

So I tried changing ValuePartitionExecutor.java from

    public String execute(ComplexEvent event) {
        return expressionExecutor.execute(event).toString();
    }

to

    public String execute(ComplexEvent event) {
        String result = null;
        try {
            result = expressionExecutor.execute(event).toString();
        } catch (NullPointerException e) {
            result = null;
        }
        return result;
    }

After that change, no errors occurred, but the event containing the null value was not emitted either.
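For comparison, here is a minimal plain-Java sketch of the same idea written with an explicit null check instead of catching NullPointerException. It also mimics the `if (currentKey != null)` guard quoted above from PartitionStreamReceiver, which would explain why the null-keyed event is dropped rather than emitted. The `KeyExpression` interface and the `route` method are hypothetical stand-ins for Siddhi's ExpressionExecutor and partition receiver, not Siddhi's actual API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class NullSafePartitionKey {

    // Hypothetical stand-in for Siddhi's ExpressionExecutor: extracts the key attribute.
    interface KeyExpression {
        Object execute(Object[] event);
    }

    private final KeyExpression expression;

    NullSafePartitionKey(KeyExpression expression) {
        this.expression = expression;
    }

    // Returns the partition key as a String, or null when the key attribute is null,
    // instead of calling toString() on a null reference.
    String execute(Object[] event) {
        return Objects.toString(expression.execute(event), null);
    }

    // Mimics the receiver's `if (currentKey != null)` guard:
    // events whose key is null are skipped and never reach any partition.
    Map<String, List<Object[]>> route(List<Object[]> events) {
        Map<String, List<Object[]>> partitions = new LinkedHashMap<>();
        for (Object[] event : events) {
            String key = execute(event);
            if (key != null) {
                partitions.computeIfAbsent(key, k -> new ArrayList<>()).add(event);
            }
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Key is the first attribute (symbol), as in the partition above.
        NullSafePartitionKey partitioner = new NullSafePartitionKey(event -> event[0]);
        List<Object[]> events = List.of(
                new Object[]{"IBM", 700f, 100L},
                new Object[]{null, 50f, 100L},
                new Object[]{"WSO2", 45.6f, 50L});
        // Prints [IBM, WSO2]: the null-keyed event is silently dropped.
        System.out.println(partitioner.route(events).keySet());
    }
}
```

Returning null from the key function only avoids the exception; because the receiver skips null keys, the event is still lost, which matches the behaviour observed after the patch.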

I am not sure whether this NullPointerException should be regarded as a bug.
However, if Siddhi intentionally disallows null values for symbol (the partition key) in the above query, I will need to check every incoming event for null values.
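If null keys are indeed unsupported, the check described above can be done on the input side before events are sent. The sketch below is plain Java and assumes the key is at a fixed position in the event array; `NullKeyFilter` is hypothetical, and its `Consumer<Object[]>` sink stands in for a call such as `inputHandler.send(...)`, which in real code may need its checked exceptions handled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class NullKeyFilter {

    private final int keyIndex;              // position of the partition key (symbol) in the event
    private final Consumer<Object[]> sink;   // hypothetical stand-in for the input handler
    private final List<Object[]> rejected = new ArrayList<>();

    NullKeyFilter(int keyIndex, Consumer<Object[]> sink) {
        this.keyIndex = keyIndex;
        this.sink = sink;
    }

    // Forwards the event only if its key attribute is present and non-null;
    // otherwise keeps it aside so it can be logged or handled separately.
    boolean send(Object[] event) {
        if (event == null || keyIndex >= event.length || event[keyIndex] == null) {
            rejected.add(event);
            return false;
        }
        sink.accept(event);
        return true;
    }

    List<Object[]> rejected() {
        return rejected;
    }

    public static void main(String[] args) {
        List<Object[]> forwarded = new ArrayList<>();
        NullKeyFilter filter = new NullKeyFilter(0, forwarded::add);
        filter.send(new Object[]{"IBM", 700f, 100L});
        filter.send(new Object[]{null, 50f, 100L});
        filter.send(new Object[]{"GOOG", 50f, 30L});
        // Prints "2 forwarded, 1 rejected"
        System.out.println(forwarded.size() + " forwarded, " + filter.rejected().size() + " rejected");
    }
}
```

Keeping rejected events rather than discarding them makes the data loss visible, unlike the silent drop inside the partition.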

Correlations

Hey,
I have a scenario with n streams:

stream 1:
Person: name, security_id

stream 2:
Person: fullName, secId, email

stream 3:
Person: email

...
(there may be more)

I have equality criteria such as:

stream1.security_id = stream2.secId
||
stream2.email = stream3.email
...
(there may be more criteria)

This can't easily be solved with standard Java equality, because the matching relation is not transitive, i.e.:
stream1:e == stream2:e //true
stream2:e == stream3:e //true
but
stream1:e == stream3:e //false
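Because the match relation is not transitive, one common technique (a general approach, not a Siddhi feature) is to treat each record as a node, link two records whenever any rule matches, and take connected components with a union-find (disjoint-set) structure; records in the same component are then treated as the same person. In this minimal plain-Java sketch, the record IDs, the values, and the "key space" mapping that pairs `security_id` with `secId` are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

public class PersonCorrelator {

    // Disjoint-set forest over record indices.
    private final List<Integer> parent = new ArrayList<>();
    // Maps a field name to a shared key space; fields in the same space are compared for equality.
    private final Map<String, String> keySpaceOf;
    // First record index seen for each "space:value" key.
    private final Map<String, Integer> firstSeen = new HashMap<>();
    private final List<String> ids = new ArrayList<>();

    PersonCorrelator(Map<String, String> keySpaceOf) {
        this.keySpaceOf = keySpaceOf;
    }

    // Finds the root of x, halving the path as it goes.
    private int find(int x) {
        while (parent.get(x) != x) {
            parent.set(x, parent.get(parent.get(x)));
            x = parent.get(x);
        }
        return x;
    }

    private void union(int a, int b) {
        int ra = find(a), rb = find(b);
        if (ra != rb) {
            parent.set(ra, rb);
        }
    }

    // Adds a record and merges it with every earlier record sharing a value in the same key space.
    void add(String id, Map<String, String> fields) {
        int idx = ids.size();
        ids.add(id);
        parent.add(idx);
        for (Map.Entry<String, String> f : fields.entrySet()) {
            String space = keySpaceOf.get(f.getKey());
            if (space == null || f.getValue() == null) {
                continue; // field not covered by any rule
            }
            Integer other = firstSeen.putIfAbsent(space + ":" + f.getValue(), idx);
            if (other != null) {
                union(idx, other);
            }
        }
    }

    // Groups record ids by connected component (ordered by root index).
    List<TreeSet<String>> groups() {
        Map<Integer, TreeSet<String>> byRoot = new TreeMap<>();
        for (int i = 0; i < ids.size(); i++) {
            byRoot.computeIfAbsent(find(i), r -> new TreeSet<>()).add(ids.get(i));
        }
        return new ArrayList<>(byRoot.values());
    }

    public static void main(String[] args) {
        PersonCorrelator c = new PersonCorrelator(Map.of(
                "security_id", "sec", "secId", "sec", "email", "email"));
        c.add("a1", Map.of("name", "Ann", "security_id", "42"));                        // stream 1
        c.add("a2", Map.of("fullName", "Ann Lee", "secId", "42", "email", "ann@x.org")); // stream 2
        c.add("a3", Map.of("email", "ann@x.org"));                                       // stream 3
        c.add("b1", Map.of("name", "Bob", "security_id", "7"));                          // stream 1
        // Prints [[a1, a2, a3], [b1]]: a1 and a3 are linked through a2.
        System.out.println(c.groups());
    }
}
```

Since `add` only ever merges groups, the structure can be updated incrementally as events arrive, although without some expiry policy it grows with every record seen.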

I'm looking for a CEP-based approach that can perform this rule-based correlation/aggregation, and I was wondering whether Siddhi can help.

Thanks
