aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--main.go94
-rw-r--r--message/message.go71
2 files changed, 149 insertions, 16 deletions
diff --git a/main.go b/main.go
index 338c6f8..eda7c15 100644
--- a/main.go
+++ b/main.go
@@ -11,7 +11,7 @@ import (
"time"
"git.kiefte.eu/lapingvino/infodump/message"
- ipfs "github.com/ipfs/go-ipfs-api"
+ shell "github.com/ipfs/go-ipfs-api"
_ "modernc.org/sqlite"
)
@@ -78,6 +78,7 @@ func main() {
for {
// Present the user with a menu
Menu([]MenuElements{
+ {"Start OLN Listener", StartOLNListener},
{"Read Messages", ReadMessages},
{"Write Message", WriteMessage},
{"Sync Messages", SyncMessages},
@@ -93,6 +94,56 @@ func main() {
}
}
+// StartOLNListener starts a PubSub listener that listens for messages from the network
+// and adds them to LocalMessages
+// For this it uses the IPFS gateway and listens on the topic "OLN", as well as
+// the list of followed tags from the database
+func StartOLNListener() {
+ // Get the IPFS gateway
+ gateway := message.IPFSGateway
+ // Get the database
+ db := GetDatabase()
+ // Get the list of followed tags
+ followedTags := GetFollowedTags(db)
+ // Create a new IPFS client
+ myIPFS := shell.NewShell(gateway)
+ var subs []*shell.PubSubSubscription
+ olnsub, err := myIPFS.PubSubSubscribe("OLN")
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+ subs = append(subs, olnsub)
+ for _, tag := range followedTags {
+ tagssub, err := myIPFS.PubSubSubscribe("oln-tag-" + tag)
+ if err != nil {
+ fmt.Println(err)
+ } else {
+ subs = append(subs, tagssub)
+ }
+ }
+ // Start a goroutine for each of the subscriptions in subs,
+ // read the CID from the Next method, look up the CID on IPFS,
+ // read this in via message.MessagesFromIPFS and add the message to LocalMessages
+ for _, sub := range subs {
+ go func(sub *shell.PubSubSubscription) {
+ for {
+ msg, err := sub.Next()
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+ msgs, err := message.MessagesFromIPFS(string(msg.Data))
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+ LocalMessages.AddMany(msgs)
+ }
+ }(sub)
+ }
+}
+
// GetDatabase checks if DB is already set and opened, if not it Sets the database first
func GetDatabase() *sql.DB {
if DB == nil {
@@ -102,7 +153,7 @@ func GetDatabase() *sql.DB {
return DB
}
-func GetMessagesFromDatabase(db *sql.DB) message.Messages {
+func GetMessagesFromDatabase(db *sql.DB) *message.Messages {
// Get all messages from the database
rows, err := db.Query("SELECT hash, message, nonce, timestamp FROM messages")
if err != nil {
@@ -132,17 +183,17 @@ func GetMessagesFromDatabase(db *sql.DB) message.Messages {
}
// Add the message to the Messages object
fmt.Println("Adding message to Messages object...")
- msgs[hash] = &m
+ msgs.Add(&m)
}
- return msgs
+ return &msgs
}
func ReadMessages() {
// Show all messages in LocalMessages
fmt.Println("Local messages:")
- for _, m := range LocalMessages {
- fmt.Println(m)
- }
+ LocalMessages.Each(func(m *message.Message) {
+ fmt.Println(m.String())
+ })
}
func WriteMessage() {
@@ -163,14 +214,14 @@ func SyncMessages() {
// Update the database with the messages in LocalMessages
db := GetDatabase()
// Put the messages in the database
- for k, m := range LocalMessages {
+ LocalMessages.Each(func(m *message.Message) {
_, err := db.Exec("INSERT INTO messages(hash, message, nonce, timestamp) VALUES(?,?,?,?)", m.Stamp(), m.Message, m.Nonce, m.Timestamp)
if err != nil {
fmt.Println(err)
} else {
- fmt.Println("Message", k, "synced")
+ fmt.Println("Message", m.Stamp(), "synced")
}
- }
+ })
fmt.Println("Now loading messages from database...")
// Get the messages from the database
@@ -186,13 +237,14 @@ func SyncMessages() {
}
// Set LocalMessages to the messages that are in the database
- LocalMessages = messages
+ LocalMessages = message.Messages{}
+ LocalMessages.AddMany(messages)
}
func TestIPFSGateway(gateway string) error {
// Test the IPFS gateway
fmt.Println("Testing IPFS gateway...")
- ipfs := ipfs.NewShell(gateway)
+ ipfs := shell.NewShell(gateway)
_, err := ipfs.ID()
return err
}
@@ -285,3 +337,21 @@ func ConfigureFollowedTags() {
}
}
}
+
+func GetFollowedTags(db *sql.DB) []string {
+ // Get the tags from the database
+ rows, err := db.Query("SELECT tag FROM followed_tags")
+ if err != nil {
+ fmt.Println(err)
+ }
+ var tags []string
+ for rows.Next() {
+ var tag string
+ err = rows.Scan(&tag)
+ if err != nil {
+ fmt.Println(err)
+ }
+ tags = append(tags, tag)
+ }
+ return tags
+}
diff --git a/message/message.go b/message/message.go
index aa2a3e5..6d8ddcd 100644
--- a/message/message.go
+++ b/message/message.go
@@ -5,7 +5,9 @@ import (
"crypto/sha256"
"encoding/json"
"fmt"
+ "io"
"math/bits"
+ "sync"
"time"
ipfs "github.com/ipfs/go-ipfs-api"
@@ -90,21 +92,82 @@ func New(msg string, n int, timestamp int64) *Message {
}
// Map Messages maps the stamp of the message to the message itself
-type Messages map[string]*Message
+type Messages struct {
+ lock sync.RWMutex
+ msgs map[string]*Message
+}
+
+// MessagesFromIPFS takes a CID and returns a Messages map
+func MessagesFromIPFS(cid string) (*Messages, error) {
+ emptyMsgs := Messages{msgs: make(map[string]*Message)}
+ // Create an IPFS instance based on the IPFSGateway
+ myIPFS := ipfs.NewShell(IPFSGateway)
+ // Get the JSON from IPFS
+ jsonr, err := myIPFS.Cat(cid)
+ if err != nil {
+ return &emptyMsgs, err
+ }
+ jsonb, err := io.ReadAll(jsonr)
+ if err != nil {
+ return &emptyMsgs, err
+ }
+ // Unmarshal the JSON into a Messages map
+ var messages Messages
+ err = json.Unmarshal(jsonb, &(messages.msgs))
+ if err != nil {
+ return &emptyMsgs, err
+ }
+ return &messages, nil
+}
// Add a message to the Messages map
func (m *Messages) Add(msg *Message) {
- (*m)[msg.Stamp()] = msg
+ m.lock.Lock()
+ defer m.lock.Unlock()
+ if m.msgs == nil {
+ m.msgs = make(map[string]*Message)
+ }
+ m.msgs[msg.Stamp()] = msg
+}
+
+// AddMany adds another Messages map to the current Messages map
+func (m *Messages) AddMany(msgs *Messages) {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+ msgs.lock.RLock()
+ defer msgs.lock.RUnlock()
+ if m.msgs == nil {
+ m.msgs = make(map[string]*Message)
+ }
+ for _, msg := range msgs.msgs {
+ m.msgs[msg.Stamp()] = msg
+ }
}
// Remove a message from the Messages map by stamp
func (m *Messages) Remove(stamp string) {
- delete(*m, stamp)
+ m.lock.Lock()
+ defer m.lock.Unlock()
+ delete(m.msgs, stamp)
}
// Return a JSON representation of the Messages map
func (m *Messages) JSON() ([]byte, error) {
- return json.Marshal(m)
+ m.lock.RLock()
+ defer m.lock.RUnlock()
+ return json.Marshal(m.msgs)
+}
+
+// Do something with each message in the Messages map
+func (m *Messages) Each(f func(msg *Message)) {
+ m.lock.RLock()
+ defer m.lock.RUnlock()
+ if m.msgs == nil {
+ return
+ }
+ for _, msg := range m.msgs {
+ f(msg)
+ }
}
// Add the messages as a JSON object to IPFS