1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
|
package main
import (
"database/sql"
"fmt"
"os"
_ "modernc.org/sqlite"
shell "github.com/ipfs/go-ipfs-api"
"git.kiefte.eu/lapingvino/infodump/message"
)
// Package-level database state shared by the functions in this file.
var (
	// DatabasePath is the path handed to the "sqlite" driver when the
	// database is opened. It starts as "infodump.db" and can be replaced
	// with user input by SetDatabase.
	DatabasePath = "infodump.db"

	// DB is the database handle currently in use. It is nil until
	// SetDatabase assigns an opened handle to it.
	DB *sql.DB
)
// StartOLNListener subscribes to IPFS PubSub topics and feeds the messages
// announced on them into LocalMessages.
//
// It creates a shell client for message.IPFSGateway and subscribes to the
// general "OLN" topic plus one "oln-<tag>" topic for every tag returned by
// GetFollowedTags. If the "OLN" subscription fails, the error is printed and
// nothing is started; a failing tag subscription is printed and skipped.
//
// One goroutine is started per subscription and the function returns right
// away. A goroutine stops when reading from its subscription fails. A payload
// that cannot be loaded through message.MessagesFromIPFS is reported and
// skipped, so a single bad announcement does not end the listener.
func StartOLNListener() {
	gateway := message.IPFSGateway
	db := GetDatabase()
	followedTags := GetFollowedTags(db)
	myIPFS := shell.NewShell(gateway)

	var subs []*shell.PubSubSubscription
	olnsub, err := myIPFS.PubSubSubscribe("OLN")
	if err != nil {
		fmt.Println(err)
		return
	}
	subs = append(subs, olnsub)

	for _, tag := range followedTags {
		tagssub, err := myIPFS.PubSubSubscribe("oln-" + tag)
		if err != nil {
			// A single tag failing should not prevent listening on the others.
			fmt.Println(err)
			continue
		}
		subs = append(subs, tagssub)
	}

	for _, sub := range subs {
		go func(sub *shell.PubSubSubscription) {
			for {
				msg, err := sub.Next()
				if err != nil {
					// The subscription itself is broken; nothing more to read.
					fmt.Println("Error reading from PubSub:", err)
					return
				}
				// The payload is passed on as the identifier to load from IPFS.
				msgs, err := message.MessagesFromIPFS(string(msg.Data))
				if err != nil {
					// Payloads come from the network; skip the bad one and
					// keep listening instead of dropping the whole topic.
					fmt.Println("Error reading from IPFS:", err)
					continue
				}
				LocalMessages.AddMany(msgs)
			}
		}(sub)
	}
}
// GetDatabase returns the package-level DB handle.
// When DB is still nil it prints a notice and runs SetDatabase first; the
// value of DB after that call is returned as-is, so the result is nil if
// SetDatabase did not assign a handle.
func GetDatabase() *sql.DB {
	if DB != nil {
		return DB
	}

	fmt.Println("Database not set, setting database...")
	SetDatabase()
	return DB
}
func GetMessagesFromDatabase(db *sql.DB) *message.Messages {
// Get all messages from the database
rows, err := db.Query("SELECT hash, message, nonce, timestamp FROM messages")
if err != nil {
fmt.Println(err)
}
defer rows.Close()
// Create a new Messages object
msgs := message.Messages{}
// Loop through all messages
fmt.Println("Getting messages from database...")
for rows.Next() {
var hash, msg string
var nonce int
var timestamp int64
// Get the values from the database
err := rows.Scan(&hash, &msg, &nonce, ×tamp)
if err != nil {
fmt.Println(err)
}
fmt.Println("Got message from database:", hash)
// Create a new message object
m := message.Message{
Message: msg,
Nonce: nonce,
Timestamp: timestamp,
}
// Add the message to the Messages object
fmt.Println("Adding message to Messages object...")
msgs.Add(&m)
}
return &msgs
}
// InitDatabase makes sure the tables used by this program exist in db.
//
// It issues one statement per table:
//   - "messages" with columns hash (TEXT, primary key), message (TEXT),
//     nonce (INTEGER) and timestamp (INTEGER);
//   - "followed_tags" with a single tag (TEXT) column.
//
// The statements use CREATE TABLE IF NOT EXISTS, so running this against a
// database that already has the tables does not report an error. Any error
// returned by db.Exec is printed; nothing is returned to the caller.
func InitDatabase(db *sql.DB) {
	schema := []string{
		"CREATE TABLE IF NOT EXISTS messages(hash TEXT PRIMARY KEY, message TEXT, nonce INTEGER, timestamp INTEGER)",
		"CREATE TABLE IF NOT EXISTS followed_tags(tag TEXT)",
	}
	for _, stmt := range schema {
		if _, err := db.Exec(stmt); err != nil {
			fmt.Println(err)
		}
	}
}
// SetDatabase interactively chooses and opens the database stored in DB.
//
// The steps run in this order:
//  1. DatabasePath is shown; answering "n" lets the user type a new path.
//  2. If DB is already set, the user is asked whether to overwrite it;
//     answering "n" aborts and leaves DB untouched.
//  3. If no file exists at DatabasePath, the user is asked whether to create
//     the database; answering "n" aborts.
//  4. The database is opened with the "sqlite" driver, a handle that is being
//     replaced is closed, the new handle is stored in DB and InitDatabase is
//     called on it to set up the tables.
//
// Any answer other than "n" is treated as consent. Errors are printed; the
// function returns nothing, so callers must check DB to see whether a
// database is available. A path entered in step 1 stays in DatabasePath even
// when a later step aborts.
func SetDatabase() {
	fmt.Println("Database path: ", DatabasePath)
	fmt.Println("Is this correct? (y/n)")
	if Readline() == "n" {
		fmt.Println("Enter the database path: ")
		DatabasePath = Readline()
	}

	if DB != nil {
		fmt.Println("Database already set, overwrite? (y/n)")
		if Readline() == "n" {
			return
		}
	}

	if _, err := os.Stat(DatabasePath); os.IsNotExist(err) {
		fmt.Println("Database does not exist, create? (y/n)")
		if Readline() == "n" {
			return
		}
	}

	// Open only after every confirmation so that a declined prompt does not
	// leave an unused, unclosed handle behind.
	db, err := sql.Open("sqlite", DatabasePath)
	if err != nil {
		fmt.Println(err)
		return
	}

	// Release the handle that is being replaced before dropping the last
	// reference to it.
	if DB != nil {
		if err := DB.Close(); err != nil {
			fmt.Println(err)
		}
	}

	DB = db
	InitDatabase(DB)
}
// TrimDatabase asks on standard input how many messages to keep and rewrites
// the "messages" table with the trimmed set.
//
// The stored messages are loaded with GetMessagesFromDatabase, reduced with
// Messages.Trim(num) (presumably keeping num messages; which ones is decided
// by the message package) and written back with m.Stamp() as the hash column.
//
// Nothing is changed when the input is not a non-negative integer, when no
// database is available, or when the messages could not be loaded. The DELETE
// and the re-inserts run in a single transaction that is rolled back on the
// first error, so a failure part-way through does not lose messages. Errors
// are printed; nothing is returned.
func TrimDatabase() {
	fmt.Println("How many messages to keep?")
	var num int
	if _, err := fmt.Scan(&num); err != nil {
		// A failed scan leaves num at 0; trimming with it would not be what
		// the user asked for.
		fmt.Println("Invalid number of messages:", err)
		return
	}
	if num < 0 {
		fmt.Println("Number of messages to keep cannot be negative")
		return
	}

	db := GetDatabase()
	if db == nil {
		fmt.Println("Database not available")
		return
	}

	msgs := GetMessagesFromDatabase(db)
	if msgs == nil {
		// Without a loaded message set there is nothing safe to write back.
		return
	}
	msgs.Trim(num)

	tx, err := db.Begin()
	if err != nil {
		fmt.Println(err)
		return
	}

	if _, err := tx.Exec("DELETE FROM messages"); err != nil {
		fmt.Println(err)
		if rbErr := tx.Rollback(); rbErr != nil {
			fmt.Println(rbErr)
		}
		return
	}

	// Each offers no way to stop early, so remember the first failure and
	// turn the remaining callbacks into no-ops.
	var insertErr error
	msgs.Each(func(m *message.Message) {
		if insertErr != nil {
			return
		}
		_, insertErr = tx.Exec("INSERT INTO messages(hash, message, nonce, timestamp) VALUES(?, ?, ?, ?)", m.Stamp(), m.Message, m.Nonce, m.Timestamp)
	})
	if insertErr != nil {
		fmt.Println(insertErr)
		if rbErr := tx.Rollback(); rbErr != nil {
			fmt.Println(rbErr)
		}
		return
	}

	if err := tx.Commit(); err != nil {
		fmt.Println(err)
	}
}
|