aboutsummaryrefslogtreecommitdiff
path: root/main.go
diff options
context:
space:
mode:
Diffstat (limited to 'main.go')
-rw-r--r--main.go94
1 files changed, 82 insertions, 12 deletions
diff --git a/main.go b/main.go
index 338c6f8..eda7c15 100644
--- a/main.go
+++ b/main.go
@@ -11,7 +11,7 @@ import (
"time"
"git.kiefte.eu/lapingvino/infodump/message"
- ipfs "github.com/ipfs/go-ipfs-api"
+ shell "github.com/ipfs/go-ipfs-api"
_ "modernc.org/sqlite"
)
@@ -78,6 +78,7 @@ func main() {
for {
// Present the user with a menu
Menu([]MenuElements{
+ {"Start OLN Listener", StartOLNListener},
{"Read Messages", ReadMessages},
{"Write Message", WriteMessage},
{"Sync Messages", SyncMessages},
@@ -93,6 +94,56 @@ func main() {
}
}
+// StartOLNListener starts a PubSub listener that listens for messages from the network
+// and adds them to LocalMessages
+// For this it uses the IPFS gateway and listens on the topic "OLN", as well as
+// the list of followed tags from the database
+func StartOLNListener() {
+ // Get the IPFS gateway
+ gateway := message.IPFSGateway
+ // Get the database
+ db := GetDatabase()
+ // Get the list of followed tags
+ followedTags := GetFollowedTags(db)
+ // Create a new IPFS client
+ myIPFS := shell.NewShell(gateway)
+ var subs []*shell.PubSubSubscription
+ olnsub, err := myIPFS.PubSubSubscribe("OLN")
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+ subs = append(subs, olnsub)
+ for _, tag := range followedTags {
+ tagssub, err := myIPFS.PubSubSubscribe("oln-tag-" + tag)
+ if err != nil {
+ fmt.Println(err)
+ } else {
+ subs = append(subs, tagssub)
+ }
+ }
+ // Start a goroutine for each of the subscriptions in subs,
+ // read the CID from the Next method, look up the CID on IPFS,
+ // read this in via message.MessagesFromIPFS and add the message to LocalMessages
+ for _, sub := range subs {
+ go func(sub *shell.PubSubSubscription) {
+ for {
+ msg, err := sub.Next()
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+ msgs, err := message.MessagesFromIPFS(string(msg.Data))
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+ LocalMessages.AddMany(msgs)
+ }
+ }(sub)
+ }
+}
+
// GetDatabase checks if DB is already set and opened, if not it Sets the database first
func GetDatabase() *sql.DB {
if DB == nil {
@@ -102,7 +153,7 @@ func GetDatabase() *sql.DB {
return DB
}
-func GetMessagesFromDatabase(db *sql.DB) message.Messages {
+func GetMessagesFromDatabase(db *sql.DB) *message.Messages {
// Get all messages from the database
rows, err := db.Query("SELECT hash, message, nonce, timestamp FROM messages")
if err != nil {
@@ -132,17 +183,17 @@ func GetMessagesFromDatabase(db *sql.DB) message.Messages {
}
// Add the message to the Messages object
fmt.Println("Adding message to Messages object...")
- msgs[hash] = &m
+ msgs.Add(&m)
}
- return msgs
+ return &msgs
}
func ReadMessages() {
// Show all messages in LocalMessages
fmt.Println("Local messages:")
- for _, m := range LocalMessages {
- fmt.Println(m)
- }
+ LocalMessages.Each(func(m *message.Message) {
+ fmt.Println(m.String())
+ })
}
func WriteMessage() {
@@ -163,14 +214,14 @@ func SyncMessages() {
// Update the database with the messages in LocalMessages
db := GetDatabase()
// Put the messages in the database
- for k, m := range LocalMessages {
+ LocalMessages.Each(func(m *message.Message) {
_, err := db.Exec("INSERT INTO messages(hash, message, nonce, timestamp) VALUES(?,?,?,?)", m.Stamp(), m.Message, m.Nonce, m.Timestamp)
if err != nil {
fmt.Println(err)
} else {
- fmt.Println("Message", k, "synced")
+ fmt.Println("Message", m.Stamp(), "synced")
}
- }
+ })
fmt.Println("Now loading messages from database...")
// Get the messages from the database
@@ -186,13 +237,14 @@ func SyncMessages() {
}
// Set LocalMessages to the messages that are in the database
- LocalMessages = messages
+ LocalMessages = message.Messages{}
+ LocalMessages.AddMany(messages)
}
func TestIPFSGateway(gateway string) error {
// Test the IPFS gateway
fmt.Println("Testing IPFS gateway...")
- ipfs := ipfs.NewShell(gateway)
+ ipfs := shell.NewShell(gateway)
_, err := ipfs.ID()
return err
}
@@ -285,3 +337,21 @@ func ConfigureFollowedTags() {
}
}
}
+
+func GetFollowedTags(db *sql.DB) []string {
+ // Get the tags from the database
+ rows, err := db.Query("SELECT tag FROM followed_tags")
+ if err != nil {
+ fmt.Println(err)
+ }
+ var tags []string
+ for rows.Next() {
+ var tag string
+ err = rows.Scan(&tag)
+ if err != nil {
+ fmt.Println(err)
+ }
+ tags = append(tags, tag)
+ }
+ return tags
+}