diff options
-rw-r--r-- | main.go | 94 | ||||
-rw-r--r-- | message/message.go | 71 |
2 files changed, 149 insertions, 16 deletions
@@ -11,7 +11,7 @@ import ( "time" "git.kiefte.eu/lapingvino/infodump/message" - ipfs "github.com/ipfs/go-ipfs-api" + shell "github.com/ipfs/go-ipfs-api" _ "modernc.org/sqlite" ) @@ -78,6 +78,7 @@ func main() { for { // Present the user with a menu Menu([]MenuElements{ + {"Start OLN Listener", StartOLNListener}, {"Read Messages", ReadMessages}, {"Write Message", WriteMessage}, {"Sync Messages", SyncMessages}, @@ -93,6 +94,56 @@ func main() { } } +// StartOLNListener starts a PubSub listener that listens for messages from the network +// and adds them to LocalMessages +// For this it uses the IPFS gateway and listens on the topic "OLN", as well as +// the list of followed tags from the database +func StartOLNListener() { + // Get the IPFS gateway + gateway := message.IPFSGateway + // Get the database + db := GetDatabase() + // Get the list of followed tags + followedTags := GetFollowedTags(db) + // Create a new IPFS client + myIPFS := shell.NewShell(gateway) + var subs []*shell.PubSubSubscription + olnsub, err := myIPFS.PubSubSubscribe("OLN") + if err != nil { + fmt.Println(err) + return + } + subs = append(subs, olnsub) + for _, tag := range followedTags { + tagssub, err := myIPFS.PubSubSubscribe("oln-tag-" + tag) + if err != nil { + fmt.Println(err) + } else { + subs = append(subs, tagssub) + } + } + // Start a goroutine for each of the subscriptions in subs, + // read the CID from the Next method, look up the CID on IPFS, + // read this in via message.MessagesFromIPFS and add the message to LocalMessages + for _, sub := range subs { + go func(sub *shell.PubSubSubscription) { + for { + msg, err := sub.Next() + if err != nil { + fmt.Println(err) + return + } + msgs, err := message.MessagesFromIPFS(string(msg.Data)) + if err != nil { + fmt.Println(err) + return + } + LocalMessages.AddMany(msgs) + } + }(sub) + } +} + // GetDatabase checks if DB is already set and opened, if not it Sets the database first func GetDatabase() *sql.DB { if DB == nil { @@ -102,7 +153,7 @@ func GetDatabase() *sql.DB { return DB } -func GetMessagesFromDatabase(db *sql.DB) message.Messages { +func GetMessagesFromDatabase(db *sql.DB) *message.Messages { // Get all messages from the database rows, err := db.Query("SELECT hash, message, nonce, timestamp FROM messages") if err != nil { @@ -132,17 +183,17 @@ func GetMessagesFromDatabase(db *sql.DB) message.Messages { } // Add the message to the Messages object fmt.Println("Adding message to Messages object...") - msgs[hash] = &m + msgs.Add(&m) } - return msgs + return &msgs } func ReadMessages() { // Show all messages in LocalMessages fmt.Println("Local messages:") - for _, m := range LocalMessages { - fmt.Println(m) - } + LocalMessages.Each(func(m *message.Message) { + fmt.Println(m.String()) + }) } func WriteMessage() { @@ -163,14 +214,14 @@ func SyncMessages() { // Update the database with the messages in LocalMessages db := GetDatabase() // Put the messages in the database - for k, m := range LocalMessages { + LocalMessages.Each(func(m *message.Message) { _, err := db.Exec("INSERT INTO messages(hash, message, nonce, timestamp) VALUES(?,?,?,?)", m.Stamp(), m.Message, m.Nonce, m.Timestamp) if err != nil { fmt.Println(err) } else { - fmt.Println("Message", k, "synced") + fmt.Println("Message", m.Stamp(), "synced") } - } + }) fmt.Println("Now loading messages from database...") // Get the messages from the database @@ -186,13 +237,14 @@ func SyncMessages() { } // Set LocalMessages to the messages that are in the database - LocalMessages = messages + LocalMessages = message.Messages{} + LocalMessages.AddMany(messages) } func TestIPFSGateway(gateway string) error { // Test the IPFS gateway fmt.Println("Testing IPFS gateway...") - ipfs := ipfs.NewShell(gateway) + ipfs := shell.NewShell(gateway) _, err := ipfs.ID() return err } @@ -285,3 +337,21 @@ func ConfigureFollowedTags() { } } } + +func GetFollowedTags(db *sql.DB) []string { + // Get the tags from the database + rows, err := db.Query("SELECT tag FROM followed_tags") + if err != nil { + fmt.Println(err) + } + var tags []string + for rows.Next() { + var tag string + err = rows.Scan(&tag) + if err != nil { + fmt.Println(err) + } + tags = append(tags, tag) + } + return tags +} diff --git a/message/message.go b/message/message.go index aa2a3e5..6d8ddcd 100644 --- a/message/message.go +++ b/message/message.go @@ -5,7 +5,9 @@ import ( "crypto/sha256" "encoding/json" "fmt" + "io" "math/bits" + "sync" "time" ipfs "github.com/ipfs/go-ipfs-api" @@ -90,21 +92,82 @@ func New(msg string, n int, timestamp int64) *Message { } // Map Messages maps the stamp of the message to the message itself -type Messages map[string]*Message +type Messages struct { + lock sync.RWMutex + msgs map[string]*Message +} + +// MessagesFromIPFS takes a CID and returns a Messages map +func MessagesFromIPFS(cid string) (*Messages, error) { + emptyMsgs := Messages{msgs: make(map[string]*Message)} + // Create an IPFS instance based on the IPFSGateway + myIPFS := ipfs.NewShell(IPFSGateway) + // Get the JSON from IPFS + jsonr, err := myIPFS.Cat(cid) + if err != nil { + return &emptyMsgs, err + } + jsonb, err := io.ReadAll(jsonr) + if err != nil { + return &emptyMsgs, err + } + // Unmarshal the JSON into a Messages map + var messages Messages + err = json.Unmarshal(jsonb, &(messages.msgs)) + if err != nil { + return &emptyMsgs, err + } + return &messages, nil +} // Add a message to the Messages map func (m *Messages) Add(msg *Message) { - (*m)[msg.Stamp()] = msg + m.lock.Lock() + defer m.lock.Unlock() + if m.msgs == nil { + m.msgs = make(map[string]*Message) + } + m.msgs[msg.Stamp()] = msg +} + +// AddMany adds another Messages map to the current Messages map +func (m *Messages) AddMany(msgs *Messages) { + m.lock.Lock() + defer m.lock.Unlock() + msgs.lock.RLock() + defer msgs.lock.RUnlock() + if m.msgs == nil { + m.msgs = make(map[string]*Message) + } + for _, msg := range msgs.msgs { + m.msgs[msg.Stamp()] = msg + } } // Remove a message from the Messages map by stamp func (m *Messages) Remove(stamp string) { - delete(*m, stamp) + m.lock.Lock() + defer m.lock.Unlock() + delete(m.msgs, stamp) } // Return a JSON representation of the Messages map func (m *Messages) JSON() ([]byte, error) { - return json.Marshal(m) + m.lock.RLock() + defer m.lock.RUnlock() + return json.Marshal(m.msgs) +} + +// Do something with each message in the Messages map +func (m *Messages) Each(f func(msg *Message)) { + m.lock.RLock() + defer m.lock.RUnlock() + if m.msgs == nil { + return + } + for _, msg := range m.msgs { + f(msg) + } } // Add the messages as a JSON object to IPFS |