Coder Social home page Coder Social logo

Comments (4)

tpiperatgod avatar tpiperatgod commented on May 29, 2024 1

Overview

Conditions performs a logical operation on input.name to determine whether to trigger the subscriber based on the result of the operation.

Design

Struct

TriggerEnvConfig is used to receive and process events passed by the OpenFunction events framework.

TriggerMgr is used to store informations during the event triggering process, providing the ability for condition determination.

type TriggerEnvConfig struct {
	BusComponent string                 `json:"busComponent"`
	Inputs       []*Input               `json:"busTopic,omitempty"`
	Subscribers  map[string]*Subscriber `json:"subscribers,omitempty"`
	Port         string                 `json:"port,omitempty"`
}

type Input struct {
	Name        string `json:"name"`
	Namespace   string `json:"namespace,omitempty"`
	EventSource string `json:"eventSource"`
	Event       string `json:"event"`
}

type Subscriber struct {
	SinkComponent   string `json:"sinkComponent,omitempty"`
	DLSinkComponent string `json:"deadLetterSinkComponent,omitempty"`
	Topic           string `json:"topic,omitempty"`
	DLTopic         string `json:"deadLetterTopic,omitempty"`
}

type TriggerMgr struct {
  // key: topic, value: *InputStatus
	TopicInputMap          *sync.Map
	TopicEventMap          map[string]chan *common.TopicEvent
	CelEnv                 *cel.Env
  // key: condition, value: *Subscriber
	ConditionSubscriberMap *sync.Map
}

type InputStatus struct {
	Name        string
	LastMsgTime int64
	LastEvent   *common.TopicEvent
	Status      bool
}

Logic

  1. topic is associated with input on a one-to-one basis
  2. Create a goroutine and a channel for each input (i.e. topic)
  3. Incoming events are sent to the channel corresponding to the topic
  4. When the channel receives an event, it will:
    1. Reset timer ticker (Default 60s Timeout)
    2. Update the status of the input to true
    3. Check if any condition matches at this point
      1. If there is a matched condition, then get the subscriber configuration corresponding to the condition
      2. Sending event to the final function by the subscriber configuration
  5. Reset input status to false when timer ticker is end up

Sample

Suppose we have defined a Trigger according to the following configuration:

apiVersion: events.openfunction.io/v1alpha1
kind: Trigger
metadata:
  name: trigger-a
spec:
  eventBus: "default"
  inputs:
    - name: "A"
      eventSourceName: "my-es-a"
      eventName: "event-a"
    - name: "B"
      eventSourceName: "my-es-b"
      eventName: "event-b"
  subscribers:
  - condition: A || B
    sink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: function-sample-serving-ksvc
        namespace: default
  - condition: A && B
    topic: "metrics"

According to the topic naming rules in the event bus ({namespace}-{eventSourceName}-{eventName}), the topic names used for the two inputs are as follows:

Input name: A -> Topic name: default-my-es-a-event-a

Input name: B -> Topic name: default-my-es-b-event-b

Initilization

triggerManager.TopicInputMap:

&sync.map{
    "default-my-es-a-event-a": &InputStatus{
        LastMsgTime: 0, 
        LastEvent: nil, 
        Status: false, 
        Name: "A",
    },
    "default-my-es-b-event-b": &InputStatus{
        LastMsgTime: 0, 
        LastEvent: nil, 
        Status: false, 
        Name: "B",
    }
}

triggerManager.ConditionSubscriberMap:

&sync.map{
    "A || B": &Subscriber{
        SinkComponent: "http-sink", 
    },
    "A && B": &Subscriber{
        Topic: "metrics",
    }
}

When events incoming

Input A receives an event

triggerManager.TopicInputMap:

&sync.map{
    "default-my-es-a-event-a": &InputStatus{
        LastMsgTime: time.Now().Unix(), 
        LastEvent: event1, 
        Status: true, 
        Name: "A",
    },
    "default-my-es-b-event-b": &InputStatus{
        LastMsgTime: 0, 
        LastEvent: nil, 
        Status: false, 
        Name: "B",
    }
}

The condition "A || B" will be matched by cel and the event event1 will be sent to "http-sink".

Input B receives an event in 60s

triggerManager.TopicInputMap:

&sync.map{
    "default-my-es-a-event-a": &InputStatus{
        LastMsgTime: <Time when event1 is received>, 
        LastEvent: event1, 
        Status: true, 
        Name: "A",
    },
    "default-my-es-b-event-b": &InputStatus{
        LastMsgTime: time.Now().Unix(), 
        LastEvent: event2, 
        Status: true, 
        Name: "B",
    }
}

After cel has determined the condition, the "A || B" condition and "A && B" will both be matched and the events event1 and event2 will be sent to "http-sink" and "metrics"

Input B receives an event after 60s but Input A does not

triggerManager.TopicInputMap:

&sync.map{
    "default-my-es-a-event-a": &InputStatus{
        LastMsgTime: 0, 
        LastEvent: nil, 
        Status: false, 
        Name: "A",
    },
    "default-my-es-b-event-b": &InputStatus{
        LastMsgTime: time.Now().Unix(), 
        LastEvent: event2, 
        Status: true, 
        Name: "B",
    }
}

The condition "A || B" will be matched by cel and the event2 will be sent to "http-sink"

Performance

Add the domain name of the nats server to the ip address mapping in /etc/hosts on the node:

# nats
<svc address> nats.default nats-0.nats.default.svc.cluster.local

Import the configuration:

export CONFIG="eyJidXNDb21wb25lbnQiOiJ0cmlnZ2VyIiwiYnVzVG9waWMiOlt7Im5hbWUiOiJBIiwibmFtZXNwYWNlIjoiZGVmYXVsdCIsImV2ZW50U291cmNlIjoibXktZXZlbnRzb3VyY2UiLCJldmVudCI6InNhbXBsZS1vbmUifSx7Im5hbWUiOiJCIiwibmFtZXNwYWNlIjoiZGVmYXVsdCIsImV2ZW50U291cmNlIjoibXktZXZlbnRzb3VyY2UiLCJldmVudCI6InNhbXBsZS10d28ifV0sInN1YnNjcmliZXJzIjp7IkEgXHUwMDI2XHUwMDI2IEIiOnsidG9waWMiOiJtZXRyaWNzIn0sIkEgfHwgQiI6eyJzaW5rQ29tcG9uZW50IjoiaHR0cC1zaW5rIn19LCJwb3J0IjoiNTA1MCJ9"

Clone OpenFunction/events-handlers to local and go to trigger/handler.

Start the program using the dapr command line, specifying the --profile-port and --enable-profiling:

dapr run --app-id trigger-handler --enable-profiling --profile-port 7777 --app-protocol grpc --app-port 5050 --components-path ../example/deploy/ go run ./main.go

Now that the connection has been established, we can use pprof to profile the Dapr runtime.

The following example will create a cpu.pprof file containing samples from a profile session that lasts 120 seconds:

curl "http://localhost:7777/debug/pprof/profile?seconds=120" > cpu.pprof

Use the following command to display the profile (You need to install graphviz first) :

go tool pprof -http=":8081" cpu.pprof

You can refer to Profiling & Debugging to learn more.

from openfunction.

tpiperatgod avatar tpiperatgod commented on May 29, 2024

/assign @tpiperatgod

from openfunction.

tpiperatgod avatar tpiperatgod commented on May 29, 2024

Added the events-handlers repo

from openfunction.

tpiperatgod avatar tpiperatgod commented on May 29, 2024

pprof result of heap alloc-objects.pdf

from openfunction.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.