diff options
Diffstat (limited to 'main.go')
-rw-r--r-- | main.go | 94 |
1 files changed, 82 insertions, 12 deletions
@@ -11,7 +11,7 @@ import ( "time" "git.kiefte.eu/lapingvino/infodump/message" - ipfs "github.com/ipfs/go-ipfs-api" + shell "github.com/ipfs/go-ipfs-api" _ "modernc.org/sqlite" ) @@ -78,6 +78,7 @@ func main() { for { // Present the user with a menu Menu([]MenuElements{ + {"Start OLN Listener", StartOLNListener}, {"Read Messages", ReadMessages}, {"Write Message", WriteMessage}, {"Sync Messages", SyncMessages}, @@ -93,6 +94,56 @@ func main() { } } +// StartOLNListener starts a PubSub listener that listens for messages from the network +// and adds them to LocalMessages +// For this it uses the IPFS gateway and listens on the topic "OLN", as well as +// the list of followed tags from the database +func StartOLNListener() { + // Get the IPFS gateway + gateway := message.IPFSGateway + // Get the database + db := GetDatabase() + // Get the list of followed tags + followedTags := GetFollowedTags(db) + // Create a new IPFS client + myIPFS := shell.NewShell(gateway) + var subs []*shell.PubSubSubscription + olnsub, err := myIPFS.PubSubSubscribe("OLN") + if err != nil { + fmt.Println(err) + return + } + subs = append(subs, olnsub) + for _, tag := range followedTags { + tagssub, err := myIPFS.PubSubSubscribe("oln-tag-" + tag) + if err != nil { + fmt.Println(err) + } else { + subs = append(subs, tagssub) + } + } + // Start a goroutine for each of the subscriptions in subs, + // read the CID from the Next method, look up the CID on IPFS, + // read this in via message.MessagesFromIPFS and add the message to LocalMessages + for _, sub := range subs { + go func(sub *shell.PubSubSubscription) { + for { + msg, err := sub.Next() + if err != nil { + fmt.Println(err) + return + } + msgs, err := message.MessagesFromIPFS(string(msg.Data)) + if err != nil { + fmt.Println(err) + return + } + LocalMessages.AddMany(msgs) + } + }(sub) + } +} + // GetDatabase checks if DB is already set and opened, if not it Sets the database first func GetDatabase() *sql.DB { if DB == nil { @@ -102,7 +153,7 @@ func GetDatabase() *sql.DB { return DB } -func GetMessagesFromDatabase(db *sql.DB) message.Messages { +func GetMessagesFromDatabase(db *sql.DB) *message.Messages { // Get all messages from the database rows, err := db.Query("SELECT hash, message, nonce, timestamp FROM messages") if err != nil { @@ -132,17 +183,17 @@ func GetMessagesFromDatabase(db *sql.DB) message.Messages { } // Add the message to the Messages object fmt.Println("Adding message to Messages object...") - msgs[hash] = &m + msgs.Add(&m) } - return msgs + return &msgs } func ReadMessages() { // Show all messages in LocalMessages fmt.Println("Local messages:") - for _, m := range LocalMessages { - fmt.Println(m) - } + LocalMessages.Each(func(m *message.Message) { + fmt.Println(m.String()) + }) } func WriteMessage() { @@ -163,14 +214,14 @@ func SyncMessages() { // Update the database with the messages in LocalMessages db := GetDatabase() // Put the messages in the database - for k, m := range LocalMessages { + LocalMessages.Each(func(m *message.Message) { _, err := db.Exec("INSERT INTO messages(hash, message, nonce, timestamp) VALUES(?,?,?,?)", m.Stamp(), m.Message, m.Nonce, m.Timestamp) if err != nil { fmt.Println(err) } else { - fmt.Println("Message", k, "synced") + fmt.Println("Message", m.Stamp(), "synced") } - } + }) fmt.Println("Now loading messages from database...") // Get the messages from the database @@ -186,13 +237,14 @@ func SyncMessages() { } // Set LocalMessages to the messages that are in the database - LocalMessages = messages + LocalMessages = message.Messages{} + LocalMessages.AddMany(messages) } func TestIPFSGateway(gateway string) error { // Test the IPFS gateway fmt.Println("Testing IPFS gateway...") - ipfs := ipfs.NewShell(gateway) + ipfs := shell.NewShell(gateway) _, err := ipfs.ID() return err } @@ -285,3 +337,21 @@ func ConfigureFollowedTags() { } } } + +func GetFollowedTags(db *sql.DB) []string { + // Get the tags from the database + rows, err := db.Query("SELECT tag FROM followed_tags") + if err != nil { + fmt.Println(err) + } + var tags []string + for rows.Next() { + var tag string + err = rows.Scan(&tag) + if err != nil { + fmt.Println(err) + } + tags = append(tags, tag) + } + return tags +} |