Browse Source

Cleaning up a bit and making debugging pubsub a bit easier

main
Joop Kiefte 5 months ago committed by GitHub
parent
commit
99ba9783f8
  1. 4
      db.go
  2. 34
      sync.go

4
db.go

@ -51,12 +51,12 @@ func StartOLNListener() { @@ -51,12 +51,12 @@ func StartOLNListener() {
for {
msg, err := sub.Next()
if err != nil {
fmt.Println(err)
fmt.Println("Error reading from PubSub:", err)
return
}
msgs, err := message.MessagesFromIPFS(string(msg.Data))
if err != nil {
fmt.Println(err)
fmt.Println("Error reading from IPFS:", err)
return
}
LocalMessages.AddMany(msgs)

34
sync.go

@ -66,6 +66,8 @@ func ReadMessagesFromNetwork() { @@ -66,6 +66,8 @@ func ReadMessagesFromNetwork() {
// WriteMessagesToNetwork writes the messages in LocalMessages to the IPFS network
func WriteMessagesToNetwork() {
// Create an IPFS shell to later publish the messages to via PubSub
myIPFS := shell.NewShell(message.IPFSGateway)
// Add the messages to the IPFS network
cid, err := LocalMessages.AddToIPFS()
if err != nil {
@ -73,22 +75,43 @@ func WriteMessagesToNetwork() { @@ -73,22 +75,43 @@ func WriteMessagesToNetwork() {
} else {
fmt.Println("Messages synced to IPFS: ", cid)
}
// Publish the CID of LocalMessages to the OLN tag
err = myIPFS.PubSubPublish("OLN", cid)
if err != nil {
fmt.Println(err)
} else {
fmt.Println("Published CID", cid, "to the main network")
}
// Define what is considered a tag by regex
tagDefinition := []string{
// Hashtags
"#[a-zA-Z0-9]+",
// Mentions
"@[a-zA-Z0-9]+",
// Links
"https?://[a-zA-Z0-9./]+",
}
// Split each message on spaces and map the resulting strings
// to the message stamp
messages := make(map[string][]*message.Message)
LocalMessages.Each(func(m *message.Message) {
tags := strings.Split(m.Message, " ")
itertags:
for _, tag := range tags {
if messages[tag] == nil {
messages[tag] = []*message.Message{}
// Check if the tag matches any of the tag definitions
// if not, continue to the next tag
for _, regex := range tagDefinition {
if !strings.Contains(tag, regex) {
continue itertags
}
}
// If the tag matches any of the tag definitions,
// add the message to the messages map
messages[tag] = append(messages[tag], m)
}
})
// Create a new set of messages per tag and send them over PubSub
// Create an IPFS shell to later publish the messages to via PubSub
// Per message, invoke Add on the tags map entry
ipfs := shell.NewShell(message.IPFSGateway)
tags := make(map[string]*message.Messages)
for tag, messages := range messages {
if tags[tag] == nil {
@ -105,11 +128,12 @@ func WriteMessagesToNetwork() { @@ -105,11 +128,12 @@ func WriteMessagesToNetwork() {
fmt.Println("Messages synced to IPFS: ", cid)
}
// Publish the messages via PubSub on the IPFS shell
err = ipfs.PubSubPublish(tag, cid)
err = myIPFS.PubSubPublish("oln-"+tag, cid)
if err != nil {
fmt.Println(err)
} else {
fmt.Println("Messages published to the rest of the network for tag: ", tag)
}
}
}

Loading…
Cancel
Save