Overview
A condition is a logical expression over input names (input.name). The result of evaluating it determines whether the corresponding subscriber is triggered.
Design
Struct
TriggerEnvConfig is used to receive and process events passed by the OpenFunction events framework.
TriggerMgr stores the information gathered while events are being triggered and provides what is needed to evaluate conditions.
type TriggerEnvConfig struct {
	BusComponent string                 `json:"busComponent"`
	Inputs       []*Input               `json:"busTopic,omitempty"`
	Subscribers  map[string]*Subscriber `json:"subscribers,omitempty"`
	Port         string                 `json:"port,omitempty"`
}

type Input struct {
	Name        string `json:"name"`
	Namespace   string `json:"namespace,omitempty"`
	EventSource string `json:"eventSource"`
	Event       string `json:"event"`
}

type Subscriber struct {
	SinkComponent   string `json:"sinkComponent,omitempty"`
	DLSinkComponent string `json:"deadLetterSinkComponent,omitempty"`
	Topic           string `json:"topic,omitempty"`
	DLTopic         string `json:"deadLetterTopic,omitempty"`
}

type TriggerMgr struct {
	// key: topic, value: *InputStatus
	TopicInputMap *sync.Map
	TopicEventMap map[string]chan *common.TopicEvent
	CelEnv        *cel.Env
	// key: condition, value: *Subscriber
	ConditionSubscriberMap *sync.Map
}

type InputStatus struct {
	Name        string
	LastMsgTime int64
	LastEvent   *common.TopicEvent
	Status      bool
}
Logic
- Each topic is associated with exactly one input
- A goroutine and a channel are created for each input (i.e. topic)
- Incoming events are sent to the channel corresponding to their topic
- When the channel receives an event, the goroutine will:
  - Reset the timeout timer (60s by default)
  - Set the status of the input to true
  - Check whether any condition matches at this point
  - If a condition matches, look up the subscriber configuration for that condition
  - Send the event to the target function according to the subscriber configuration
- The input status is reset to false when the timer expires
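The per-input loop described above could be sketched roughly as follows. This is only an illustration under stated assumptions: watchInput, runDemo and isActive are hypothetical names, events are plain strings instead of *common.TopicEvent, and the timeout is shortened to 50ms so the demo finishes quickly.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// InputStatus mirrors the struct above; LastEvent is a plain string here
// (an assumption) so the sketch has no external dependencies.
type InputStatus struct {
	Name        string
	LastMsgTime int64
	LastEvent   string
	Status      bool
}

// watchInput (hypothetical) consumes the events of one topic. Every event
// restarts the timeout and marks the input active; when the timeout expires
// without a new event, the input is marked inactive again.
func watchInput(name, topic string, events <-chan string, statuses *sync.Map,
	timeout time.Duration, onActive func()) {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	for {
		select {
		case ev, ok := <-events:
			if !ok {
				return
			}
			// Stop the timer and drain a pending tick before restarting it.
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(timeout)
			statuses.Store(topic, &InputStatus{
				Name: name, LastMsgTime: time.Now().Unix(), LastEvent: ev, Status: true,
			})
			onActive() // this is where conditions would be checked
		case <-timer.C:
			statuses.Store(topic, &InputStatus{Name: name})
		}
	}
}

// isActive reports the current Status stored for a topic.
func isActive(statuses *sync.Map, topic string) bool {
	v, ok := statuses.Load(topic)
	return ok && v.(*InputStatus).Status
}

// runDemo sends one event and samples the status right after the event
// was handled and again after the timeout has passed.
func runDemo() (afterEvent, afterTimeout bool) {
	statuses := &sync.Map{}
	topic := "default-my-es-a-event-a"
	events := make(chan string)
	handled := make(chan struct{})
	go watchInput("A", topic, events, statuses, 50*time.Millisecond,
		func() { handled <- struct{}{} })
	events <- "event1"
	<-handled
	afterEvent = isActive(statuses, topic)
	time.Sleep(200 * time.Millisecond)
	afterTimeout = isActive(statuses, topic)
	close(events)
	return afterEvent, afterTimeout
}

func main() {
	afterEvent, afterTimeout := runDemo()
	fmt.Println("active after event:", afterEvent)
	fmt.Println("active after timeout:", afterTimeout)
}
```

Storing a fresh *InputStatus on every update, rather than mutating the stored one, keeps readers of the sync.Map free of data races.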
Sample
Suppose we have defined a Trigger according to the following configuration:
apiVersion: events.openfunction.io/v1alpha1
kind: Trigger
metadata:
  name: trigger-a
spec:
  eventBus: "default"
  inputs:
    - name: "A"
      eventSourceName: "my-es-a"
      eventName: "event-a"
    - name: "B"
      eventSourceName: "my-es-b"
      eventName: "event-b"
  subscribers:
    - condition: A || B
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: function-sample-serving-ksvc
          namespace: default
    - condition: A && B
      topic: "metrics"
According to the topic naming rules in the event bus ({namespace}-{eventSourceName}-{eventName}), the topic names used for the two inputs are as follows:
Input name: A -> Topic name: default-my-es-a-event-a
Input name: B -> Topic name: default-my-es-b-event-b
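As a small illustration of the naming rule, the helper below joins the three parts with "-". The function name topicName is hypothetical and not taken from the OpenFunction code base.

```go
package main

import "fmt"

// topicName (hypothetical helper) applies the
// {namespace}-{eventSourceName}-{eventName} rule quoted above.
func topicName(namespace, eventSourceName, eventName string) string {
	return fmt.Sprintf("%s-%s-%s", namespace, eventSourceName, eventName)
}

func main() {
	fmt.Println(topicName("default", "my-es-a", "event-a")) // default-my-es-a-event-a
	fmt.Println(topicName("default", "my-es-b", "event-b")) // default-my-es-b-event-b
}
```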
Initialization
triggerManager.TopicInputMap:
&sync.Map{
    "default-my-es-a-event-a": &InputStatus{
        LastMsgTime: 0,
        LastEvent: nil,
        Status: false,
        Name: "A",
    },
    "default-my-es-b-event-b": &InputStatus{
        LastMsgTime: 0,
        LastEvent: nil,
        Status: false,
        Name: "B",
    },
}
triggerManager.ConditionSubscriberMap:
&sync.Map{
    "A || B": &Subscriber{
        SinkComponent: "http-sink",
    },
    "A && B": &Subscriber{
        Topic: "metrics",
    },
}
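The two maps shown above are written as literals for readability; a sync.Map is actually filled with Store. A minimal sketch of how the initial state might be built from the inputs and subscribers follows. buildMaps and inputName are hypothetical helpers, and the structs are trimmed to the fields used here.

```go
package main

import (
	"fmt"
	"sync"
)

// Trimmed copies of the structs from the Design section.
type Input struct {
	Name, Namespace, EventSource, Event string
}

type Subscriber struct {
	SinkComponent, Topic string
}

type InputStatus struct {
	Name        string
	LastMsgTime int64
	Status      bool
}

// buildMaps (hypothetical) creates one inactive InputStatus per topic and
// one entry per condition.
func buildMaps(inputs []*Input, subscribers map[string]*Subscriber) (topicInput, conditionSubscriber *sync.Map) {
	topicInput = &sync.Map{}
	for _, in := range inputs {
		topic := fmt.Sprintf("%s-%s-%s", in.Namespace, in.EventSource, in.Event)
		topicInput.Store(topic, &InputStatus{Name: in.Name})
	}
	conditionSubscriber = &sync.Map{}
	for condition, sub := range subscribers {
		conditionSubscriber.Store(condition, sub)
	}
	return topicInput, conditionSubscriber
}

// inputName returns the input name stored for a topic, or "" if absent.
func inputName(m *sync.Map, topic string) string {
	if v, ok := m.Load(topic); ok {
		return v.(*InputStatus).Name
	}
	return ""
}

func main() {
	inputs := []*Input{
		{Name: "A", Namespace: "default", EventSource: "my-es-a", Event: "event-a"},
		{Name: "B", Namespace: "default", EventSource: "my-es-b", Event: "event-b"},
	}
	subscribers := map[string]*Subscriber{
		"A || B": {SinkComponent: "http-sink"},
		"A && B": {Topic: "metrics"},
	}
	topicInput, conditionSubscriber := buildMaps(inputs, subscribers)
	fmt.Println(inputName(topicInput, "default-my-es-a-event-a"))
	sub, _ := conditionSubscriber.Load("A && B")
	fmt.Println(sub.(*Subscriber).Topic)
}
```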
When events arrive
Input A receives an event
triggerManager.TopicInputMap:
&sync.Map{
    "default-my-es-a-event-a": &InputStatus{
        LastMsgTime: time.Now().Unix(),
        LastEvent: event1,
        Status: true,
        Name: "A",
    },
    "default-my-es-b-event-b": &InputStatus{
        LastMsgTime: 0,
        LastEvent: nil,
        Status: false,
        Name: "B",
    },
}
CEL evaluates the condition "A || B" as matched, and event1 is sent to "http-sink".
Input B receives an event within 60s
triggerManager.TopicInputMap:
&sync.Map{
    "default-my-es-a-event-a": &InputStatus{
        LastMsgTime: <Time when event1 is received>,
        LastEvent: event1,
        Status: true,
        Name: "A",
    },
    "default-my-es-b-event-b": &InputStatus{
        LastMsgTime: time.Now().Unix(),
        LastEvent: event2,
        Status: true,
        Name: "B",
    },
}
After CEL evaluates the conditions, both "A || B" and "A && B" match, and the events event1 and event2 are sent to "http-sink" and "metrics".
Input B receives an event after 60s but Input A does not
triggerManager.TopicInputMap:
&sync.Map{
    "default-my-es-a-event-a": &InputStatus{
        LastMsgTime: 0,
        LastEvent: nil,
        Status: false,
        Name: "A",
    },
    "default-my-es-b-event-b": &InputStatus{
        LastMsgTime: time.Now().Unix(),
        LastEvent: event2,
        Status: true,
        Name: "B",
    },
}
CEL evaluates the condition "A || B" as matched, and event2 is sent to "http-sink".
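The three scenarios above can be reproduced with a small, dependency-free evaluator. Note that this is a stand-in and not cel-go: it parses the condition with Go's own go/parser and supports only identifiers, parentheses, !, && and ||. The function names eval and matchConditions are hypothetical.

```go
package main

import (
	"fmt"
	"go/ast"
	"go/parser"
	"go/token"
)

// eval walks a parsed boolean expression, looking up each identifier
// (an input name) in the status map.
func eval(expr ast.Expr, status map[string]bool) (bool, error) {
	switch e := expr.(type) {
	case *ast.Ident:
		v, ok := status[e.Name]
		if !ok {
			return false, fmt.Errorf("unknown input %q", e.Name)
		}
		return v, nil
	case *ast.ParenExpr:
		return eval(e.X, status)
	case *ast.UnaryExpr:
		if e.Op != token.NOT {
			return false, fmt.Errorf("unsupported operator %s", e.Op)
		}
		v, err := eval(e.X, status)
		return !v, err
	case *ast.BinaryExpr:
		l, err := eval(e.X, status)
		if err != nil {
			return false, err
		}
		r, err := eval(e.Y, status)
		if err != nil {
			return false, err
		}
		switch e.Op {
		case token.LAND:
			return l && r, nil
		case token.LOR:
			return l || r, nil
		}
		return false, fmt.Errorf("unsupported operator %s", e.Op)
	}
	return false, fmt.Errorf("unsupported expression %T", expr)
}

// matchConditions returns the conditions that hold for the given statuses,
// in the order they were passed in.
func matchConditions(conditions []string, status map[string]bool) ([]string, error) {
	var matched []string
	for _, c := range conditions {
		expr, err := parser.ParseExpr(c)
		if err != nil {
			return nil, err
		}
		ok, err := eval(expr, status)
		if err != nil {
			return nil, err
		}
		if ok {
			matched = append(matched, c)
		}
	}
	return matched, nil
}

func main() {
	conditions := []string{"A || B", "A && B"}
	scenarios := []map[string]bool{
		{"A": true, "B": false}, // only A received an event
		{"A": true, "B": true},  // B follows within the timeout
		{"A": false, "B": true}, // A timed out, B received an event
	}
	for _, status := range scenarios {
		matched, err := matchConditions(conditions, status)
		if err != nil {
			panic(err)
		}
		fmt.Println(matched)
	}
}
```

Only the second scenario matches both conditions; the first and third match "A || B" alone.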
Performance
Add a mapping from the NATS server's domain names to its IP address in /etc/hosts on the node:
# nats
<svc address> nats.default nats-0.nats.default.svc.cluster.local
Import the configuration:
export CONFIG="eyJidXNDb21wb25lbnQiOiJ0cmlnZ2VyIiwiYnVzVG9waWMiOlt7Im5hbWUiOiJBIiwibmFtZXNwYWNlIjoiZGVmYXVsdCIsImV2ZW50U291cmNlIjoibXktZXZlbnRzb3VyY2UiLCJldmVudCI6InNhbXBsZS1vbmUifSx7Im5hbWUiOiJCIiwibmFtZXNwYWNlIjoiZGVmYXVsdCIsImV2ZW50U291cmNlIjoibXktZXZlbnRzb3VyY2UiLCJldmVudCI6InNhbXBsZS10d28ifV0sInN1YnNjcmliZXJzIjp7IkEgXHUwMDI2XHUwMDI2IEIiOnsidG9waWMiOiJtZXRyaWNzIn0sIkEgfHwgQiI6eyJzaW5rQ29tcG9uZW50IjoiaHR0cC1zaW5rIn19LCJwb3J0IjoiNTA1MCJ9"
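The CONFIG value appears to be base64-encoded JSON that unmarshals into TriggerEnvConfig. A minimal sketch of reading it is shown below; decodeConfig and sampleConfig are hypothetical helpers, and the fallback sample is only there so the program also runs when CONFIG is not set.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
	"os"
)

type Input struct {
	Name        string `json:"name"`
	Namespace   string `json:"namespace,omitempty"`
	EventSource string `json:"eventSource"`
	Event       string `json:"event"`
}

type Subscriber struct {
	SinkComponent   string `json:"sinkComponent,omitempty"`
	DLSinkComponent string `json:"deadLetterSinkComponent,omitempty"`
	Topic           string `json:"topic,omitempty"`
	DLTopic         string `json:"deadLetterTopic,omitempty"`
}

type TriggerEnvConfig struct {
	BusComponent string                 `json:"busComponent"`
	Inputs       []*Input               `json:"busTopic,omitempty"`
	Subscribers  map[string]*Subscriber `json:"subscribers,omitempty"`
	Port         string                 `json:"port,omitempty"`
}

// decodeConfig (hypothetical) turns a base64-encoded JSON document into a
// TriggerEnvConfig.
func decodeConfig(encoded string) (*TriggerEnvConfig, error) {
	raw, err := base64.StdEncoding.DecodeString(encoded)
	if err != nil {
		return nil, fmt.Errorf("decode base64: %w", err)
	}
	cfg := &TriggerEnvConfig{}
	if err := json.Unmarshal(raw, cfg); err != nil {
		return nil, fmt.Errorf("unmarshal config: %w", err)
	}
	return cfg, nil
}

// sampleConfig builds a small encoded config used when CONFIG is unset.
func sampleConfig() string {
	cfg := TriggerEnvConfig{
		BusComponent: "trigger",
		Inputs: []*Input{
			{Name: "A", Namespace: "default", EventSource: "my-eventsource", Event: "sample-one"},
		},
		Subscribers: map[string]*Subscriber{"A": {Topic: "metrics"}},
		Port:        "5050",
	}
	raw, _ := json.Marshal(cfg)
	return base64.StdEncoding.EncodeToString(raw)
}

func main() {
	encoded := os.Getenv("CONFIG")
	if encoded == "" {
		encoded = sampleConfig()
	}
	cfg, err := decodeConfig(encoded)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println("bus component:", cfg.BusComponent, "port:", cfg.Port)
	for _, in := range cfg.Inputs {
		fmt.Println("input:", in.Name, in.Namespace, in.EventSource, in.Event)
	}
	for condition, sub := range cfg.Subscribers {
		fmt.Printf("condition %q -> sink %q, topic %q\n", condition, sub.SinkComponent, sub.Topic)
	}
}
```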
Clone OpenFunction/events-handlers locally and change to the trigger/handler directory.
Start the program using the dapr command line, specifying --profile-port and --enable-profiling:
dapr run --app-id trigger-handler --enable-profiling --profile-port 7777 --app-protocol grpc --app-port 5050 --components-path ../example/deploy/ go run ./main.go
Now that the connection has been established, we can use pprof to profile the Dapr runtime.
The following example will create a cpu.pprof file containing samples from a profile session that lasts 120 seconds:
curl "http://localhost:7777/debug/pprof/profile?seconds=120" > cpu.pprof
Use the following command to display the profile (you need to install graphviz first):
go tool pprof -http=":8081" cpu.pprof
You can refer to Profiling & Debugging to learn more.
/assign @tpiperatgod
Added the events-handlers repo
pprof result of heap alloc-objects.pdf